From 4c8da4777de09d427e20666eb4502bdf915bea22 Mon Sep 17 00:00:00 2001 From: Urko Date: Mon, 10 Apr 2023 11:43:13 +0200 Subject: [PATCH] refactor etl go: load from byte slice instead of interface - add benchmark - add profiling and trace --- Makefile | 30 +++++----- README.md | 3 + benchmark/nest/nest.go | 2 +- cmd/etl/main.go | 55 +++++++++++++------ cmd/server/main.go | 38 ++++++++++++- go.mod | 1 + go.sum | 2 + internal/etl/etl.go | 72 ++++++++++++++++++++++++ internal/etl/etl_benchmark_test.go | 73 +++++++++++++++++++++++++ internal/request/request.go | 33 +++++++---- internal/xml_loader/employee_wi.go | 28 ++++++---- internal/xml_loader/employee_wi_test.go | 22 ++++++++ 12 files changed, 302 insertions(+), 57 deletions(-) create mode 100644 internal/etl/etl.go create mode 100644 internal/etl/etl_benchmark_test.go create mode 100644 internal/xml_loader/employee_wi_test.go diff --git a/Makefile b/Makefile index ecd79c4..bddb515 100644 --- a/Makefile +++ b/Makefile @@ -1,16 +1,4 @@ COVERAGE_DIR=coverage -BINARY_DIR=bin -BINARY_NAME=ess-etl-go - -UNAME := $(shell uname -s) -ifeq ($(UNAME),Darwin) - OS = macos -else ifeq ($(UNAME),Linux) - OS = linux -else -$(error OS not supported by this Makefile) -endif -PACKAGE = $(shell head -1 go.mod | awk '{print $$2}') lint: golangci-lint run ./... @@ -24,8 +12,22 @@ test-coverage: go test -v -coverprofile ${COVERAGE_DIR}/cover.out ./... go tool cover -html ${COVERAGE_DIR}/cover.out -o ${COVERAGE_DIR}/cover.html benchmark: - go test -run none -bench . -benchtime 3s -benchmem + go test -run none -bench . -benchtime 3s -benchmem +benchmark_escape_analisys: + go test -gcflags "-m -m" -run none -bench . -benchtime 3s -benchmem -memprofile profile.out ./internal/etl +pprof: + go tool pprof -alloc_space profile.out +pprof_url:# top 40 -cum + go tool pprof -alloc_space http://localhost:5000/debug/pprof/allocs +benchmark_etl: + go test -gcflags "-m -m" -run none -bench . -benchtime 5s -benchmem -memprofile profile.out ./internal/etl/ +trace_etl: build_etl# go tool trace t.out + $(build_etl) + time ./etl -trace > t.out +build_etl: + env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -v -o etl ./cmd/etl/main.go build_server: env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -o ${BINARY_DIR}/${BINARY_NAME} ./main.go run_server: build_server - ./${BINARY_DIR}/${BINARY_NAME} \ No newline at end of file + ./${BINARY_DIR}/${BINARY_NAME} + diff --git a/README.md b/README.md index 0fd6172..ea27d10 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,6 @@ Start your server Run your tests `go test -race -v --bench ./. --benchmem ./benchmark ` + + + diff --git a/benchmark/nest/nest.go b/benchmark/nest/nest.go index b2bd182..ac2bed2 100644 --- a/benchmark/nest/nest.go +++ b/benchmark/nest/nest.go @@ -26,7 +26,7 @@ func doRequest(wg *sync.WaitGroup, host string, employeeID int, errChan chan err defer wg.Done() var err error query := GraphqlQuery{ - Query: `query ByEmployeeNumber($employeeNumber: Float!) { byEmployeeNumber(employeeNumber: $employeeNumber) { EmployeeNumber Date WorkInformation { EmployeeNumber Date Shifts { Start End ActualStart ActualEnd RoleCode ShiftCategoryCode _ReferenceId } DaysOff { DayOff { DayOffTypeCode Note _ReferenceId } } } Baselines { Shifts { DayOff { DayOffTypeCode } } BaselineType DaysOff { DayOff { DayOffTypeCode } } FullDayAbsences { FullDayAbsence { AbsenceTypeCode } } } CustomFields { CustomField { FullDayAbsence { AbsenceTypeCode } } } DataVersion FullDayAbsences { FullDayAbsence { AbsenceTypeCode Note _ReferenceId } } } }`, + Query: `query ByEmployeeNumber($employeeNumber: Float!) { byEmployeeNumber(employeeNumber: $employeeNumber) { EmployeeNumber Date WorkInformation { EmployeeNumber Date Shifts { Start End RoleCode } DaysOff { DayOff { DayOffTypeCode Note _ReferenceId } } } Baselines { Shifts { Start End RoleCode } BaselineType DaysOff { DayOff { DayOffTypeCode } } FullDayAbsences { FullDayAbsence { AbsenceTypeCode } } } CustomFields { CustomField { Name } } DataVersion FullDayAbsences { FullDayAbsence { _ReferenceId } } } } `, OperationName: "ByEmployeeNumber", Variables: ByEmployeeNumberQueryVariables{ EmployeeNumber: float64(employeeID), diff --git a/cmd/etl/main.go b/cmd/etl/main.go index 2101279..3aa08fa 100644 --- a/cmd/etl/main.go +++ b/cmd/etl/main.go @@ -2,11 +2,15 @@ package main import ( "context" + "flag" "fmt" "log" + "net/http" _ "net/http/pprof" "os" "os/signal" + "runtime/pprof" + "runtime/trace" "sync" "syscall" @@ -21,28 +25,45 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -// var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") +var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") +var traceflag = flag.Bool("trace", false, "write trace to file") func main() { - // flag.Parse() - // if *cpuprofile != "" { - // f, err := os.Create(*cpuprofile) - // if err != nil { - // log.Fatal(err) - // } - // pprof.StartCPUProfile(f) - // defer pprof.StopCPUProfile() - // } + flag.Parse() + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() - // // Add pprof endpoints - // go func() { - // log.Println(http.ListenAndServe("localhost:6060", nil)) - // }() + } + + // Add pprof endpoints + go func() { + if *cpuprofile != "" { + log.Println(http.ListenAndServe("localhost:6060", nil)) + } + }() + + if *traceflag { + log.Println("trace on") + trace.Start(os.Stdout) + } + + defer func() { + if *traceflag { + trace.Stop() + } + }() cr := crono.New() - defer cr.Table() - cfg := config.NewConfig(".env") + if !*traceflag { + defer cr.Table() + } + cfg := config.NewConfig(".env") ctx := context.Background() dbOpts := options.Client() @@ -105,7 +126,7 @@ func main() { go func() { for v := range ewiChan { - log.Println("len v", len(v)) + // log.Println("len v", len(v)) err := professionalRepo.InsertMany(ctx, v) if err != nil { errChan <- err diff --git a/cmd/server/main.go b/cmd/server/main.go index 7dcca91..10a7277 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,14 +2,18 @@ package main import ( "context" + "flag" "log" + "net/http" "os" "os/signal" + "runtime/pprof" + "runtime/trace" "syscall" "gitea.urkob.com/urko/crono" "gitea.urkob.com/urko/ess-etl-go/config" - "gitea.urkob.com/urko/ess-etl-go/internal/api/http" + apihttp "gitea.urkob.com/urko/ess-etl-go/internal/api/http" "gitea.urkob.com/urko/ess-etl-go/internal/services" "gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" @@ -17,7 +21,36 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) +var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") +var traceflag = flag.String("trace", "", "write trace to file") + func main() { + flag.Parse() + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + // Add pprof endpoints + go func() { + if *cpuprofile != "" { + log.Println(http.ListenAndServe("localhost:6060", nil)) + } + }() + + if *traceflag != "" { + log.Println("trace on") + trace.Start(os.Stdout) + defer func() { + log.Println("on stop") + trace.Stop() + }() + } + cr := crono.New() defer cr.Table() cfg := config.NewConfig(".env") @@ -41,13 +74,12 @@ func main() { employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) professionalRepo := employee_wi.NewRepo(employeeWICollection) - restServer := http. + restServer := apihttp. NewRestServer(cfg, cr). WithEmployeeWIHandler(services.NewEmployeeWIService(ctx, professionalRepo)). WithAMSHander() cr.MarkAndRestart("dependencies loaded") - log.Println(cfg) go func() { if err = restServer.Start(cfg.ApiPort, ""); err != nil { log.Fatalln("restServer.Start", err) diff --git a/go.mod b/go.mod index eea485a..b97e578 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( gitea.urkob.com/urko/crono v0.0.0-20230405153202-0554f3e53a4c gitea.urkob.com/urko/go-root-dir v0.0.0-20230311113851-2f6d4355888a + github.com/docker/go-units v0.5.0 github.com/gofiber/fiber/v2 v2.43.0 github.com/joho/godotenv v1.5.1 github.com/kelseyhightower/envconfig v1.4.0 diff --git a/go.sum b/go.sum index 55be086..15a7849 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/gofiber/fiber/v2 v2.43.0 h1:yit3E4kHf178B60p5CQBa/3v+WVuziWMa/G2ZNyLJB0= github.com/gofiber/fiber/v2 v2.43.0/go.mod h1:mpS1ZNE5jU+u+BA4FbM+KKnUzJ4wzTK+FT2tG3tU+6I= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= diff --git a/internal/etl/etl.go b/internal/etl/etl.go new file mode 100644 index 0000000..e505426 --- /dev/null +++ b/internal/etl/etl.go @@ -0,0 +1,72 @@ +package etl + +import ( + "context" + "log" + "runtime" + "sync" + + "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" + "gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" + "gitea.urkob.com/urko/ess-etl-go/pkg/domain" +) + +type Etl struct { + ewiLoader xml_loader.EmployeeWILoader + repo employee_wi.Repo +} + +func New(ewiLoader xml_loader.EmployeeWILoader, repo employee_wi.Repo) *Etl { + return &Etl{ + ewiLoader: ewiLoader, + repo: repo, + } +} + +func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to string) error { + g := runtime.GOMAXPROCS(0) + var wg sync.WaitGroup + wg.Add(g) + + employeeWIChan := make(chan []domain.EmployeeWorkInformation, g) + errChan := make(chan error, 1) + + for i := 0; i < g; i++ { + go func() { + defer func() { + wg.Done() + }() + for v := range employeeWIChan { + func() { + err := etl.repo.InsertMany(ctx, v) + if err != nil { + errChan <- err + return + } + }() + } + }() + } + go func() { + var wg2 sync.WaitGroup + wg2.Add(len(employeeNumber)) + for i := range employeeNumber { + go func(v string) { + defer wg2.Done() + wi, err := etl.ewiLoader.LoadEmployee(v, from, to) + if err != nil { + log.Println("err", err) + errChan <- err + return + } + employeeWIChan <- wi + }(employeeNumber[i]) + } + wg2.Wait() + close(employeeWIChan) + }() + + wg.Wait() + errChan <- nil + return <-errChan +} diff --git a/internal/etl/etl_benchmark_test.go b/internal/etl/etl_benchmark_test.go new file mode 100644 index 0000000..869517a --- /dev/null +++ b/internal/etl/etl_benchmark_test.go @@ -0,0 +1,73 @@ +package etl_test + +import ( + "context" + "log" + "testing" + + "gitea.urkob.com/urko/ess-etl-go/config" + "gitea.urkob.com/urko/ess-etl-go/internal/etl" + "gitea.urkob.com/urko/ess-etl-go/internal/request" + "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" + "gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func BenchmarkLoad(b *testing.B) { + cfg := config.NewConfig(".env") + ctx := context.Background() + + dbOpts := options.Client() + dbOpts.ApplyURI(cfg.DbAddress) + + client, err := mongo.NewClient(dbOpts) + require.NoError(b, err, "mongo.NewClient") + + require.NoError(b, client.Connect(ctx), "client.Connect") + + employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) + if err = employeeWICollection.Drop(ctx); err != nil { + log.Fatalln("employeeWICollection.Drop", err) + } + repo := employee_wi.NewRepo(employeeWICollection) + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + + ewiLoader := xml_loader.NewEmployeeWILoader(r) + from, to := "2023-01-01", "2023-01-31" + e := etl.New(ewiLoader, *repo) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + require.NoError(b, e.FanOut(ctx, cfg.EmployeeIdList, from, to)) + } +} + +func TestLoad(t *testing.T) { + cfg := config.NewConfig(".env") + ctx := context.Background() + + dbOpts := options.Client() + dbOpts.ApplyURI(cfg.DbAddress) + + client, err := mongo.NewClient(dbOpts) + require.NoError(t, err, "mongo.NewClient") + + require.NoError(t, client.Connect(ctx), "client.Connect") + + employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) + if err = employeeWICollection.Drop(ctx); err != nil { + log.Fatalln("employeeWICollection.Drop", err) + } + repo := employee_wi.NewRepo(employeeWICollection) + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + + ewiLoader := xml_loader.NewEmployeeWILoader(r) + from, to := "2023-01-01", "2023-01-31" + e := etl.New(ewiLoader, *repo) + + require.NoError(t, e.FanOut(ctx, cfg.EmployeeIdList, from, to)) +} diff --git a/internal/request/request.go b/internal/request/request.go index d1db0b1..af46129 100644 --- a/internal/request/request.go +++ b/internal/request/request.go @@ -1,7 +1,6 @@ package request import ( - "bytes" "fmt" "io" "net/http" @@ -38,12 +37,12 @@ func getPayload(employeeIDList []string) (string, error) { return employees.String(), nil } -func (r RequestService) EmployeeWorkInformation(employeeIDList []string, from, to string) (io.Reader, error) { +func (r RequestService) EmployeeWorkInformation(data *[]byte, employeeIDList []string, from, to string) error { url := r.api + "/EmployeeWorkInformation/Search/" + from + "/" + to + "/" stringPayload, err := getPayload(employeeIDList) if err != nil { - return nil, fmt.Errorf("getPayload: %s", err) + return fmt.Errorf("getPayload: %s", err) } payload := strings.NewReader(stringPayload) @@ -51,7 +50,7 @@ func (r RequestService) EmployeeWorkInformation(employeeIDList []string, from, t req, err := http.NewRequest("POST", url, payload) if err != nil { - return nil, fmt.Errorf("http.NewRequest: %s", err) + return fmt.Errorf("http.NewRequest: %s", err) } req.Header.Add("Cache-Control", "no-cache") req.Header.Add("Authorization", r.apiKey) @@ -59,16 +58,30 @@ func (r RequestService) EmployeeWorkInformation(employeeIDList []string, from, t res, err := client.Do(req) if err != nil { - return nil, fmt.Errorf("client.Do: %s", err) + return fmt.Errorf("client.Do: %s", err) } defer res.Body.Close() - body, err := io.ReadAll(res.Body) - if err != nil { - return nil, fmt.Errorf("ioutil.ReadAll: %s", err) + for { + if len(*data) == cap(*data) { + // Add more capacity (let append pick how much). + *data = append(*data, 0)[:len(*data)] + } + n, err := res.Body.Read((*data)[len(*data):cap(*data)]) + *data = (*data)[:len(*data)+n] + if err != nil { + if err == io.EOF { + err = nil + } + return err + } } - //log.Println("readed", string(body)) + // data, err = io.ReadAll(res.Body) + // if err != nil { + // return fmt.Errorf("ioutil.ReadAll: %s", err) + // } - return bytes.NewReader(body), nil + //log.Println("readed", string(body)) + return nil } diff --git a/internal/xml_loader/employee_wi.go b/internal/xml_loader/employee_wi.go index 2cfe7ac..db64e46 100644 --- a/internal/xml_loader/employee_wi.go +++ b/internal/xml_loader/employee_wi.go @@ -3,10 +3,10 @@ package xml_loader import ( "encoding/xml" "fmt" - "io" "gitea.urkob.com/urko/ess-etl-go/internal/request" "gitea.urkob.com/urko/ess-etl-go/pkg/domain" + "github.com/docker/go-units" ) type EmployeeWILoader struct { @@ -18,31 +18,35 @@ func NewEmployeeWILoader(r request.RequestService) EmployeeWILoader { } func (e EmployeeWILoader) LoadEmployeeList(employeeIDList []string, from, to string) ([]domain.EmployeeWorkInformation, error) { - reader, err := e.r.EmployeeWorkInformation(employeeIDList, from, to) + xmlBts := make([]byte, 0, units.MiB*5) + err := e.r.EmployeeWorkInformation(&xmlBts, employeeIDList, from, to) if err != nil { return nil, fmt.Errorf("r.EmployeeWorkInformation: %s", err) } + if len(xmlBts) <= 0 { + return nil, fmt.Errorf("couldn't load xml ") + } - return loadFromXML(reader) + return loadFromXML(xmlBts) } func (e EmployeeWILoader) LoadEmployee(employeeID, from, to string) ([]domain.EmployeeWorkInformation, error) { employeeIDList := []string{employeeID} - - reader, err := e.r.EmployeeWorkInformation(employeeIDList, from, to) + xmlBts := make([]byte, 0, units.MiB*5) + err := e.r.EmployeeWorkInformation(&xmlBts, employeeIDList, from, to) if err != nil { return nil, fmt.Errorf("r.EmployeeWorkInformation: %s", err) } - - return loadFromXML(reader) + if len(xmlBts) <= 0 { + return nil, fmt.Errorf("couldn't load xml ") + } + return loadFromXML(xmlBts) } -func loadFromXML(xmlFile io.Reader) ([]domain.EmployeeWorkInformation, error) { +func loadFromXML(xmlFile []byte) ([]domain.EmployeeWorkInformation, error) { var awi domain.ArrayOfEmployeeWorkInformation - - if err := xml.NewDecoder(xmlFile).Decode(&awi); err != nil { - return nil, fmt.Errorf("xml.NewDecoder.Decode: %s", err) + if err := xml.Unmarshal(xmlFile, &awi); err != nil { + return nil, fmt.Errorf("xml.Unmarshal: %s", err) } - return awi.EmployeeWorkInfos, nil } diff --git a/internal/xml_loader/employee_wi_test.go b/internal/xml_loader/employee_wi_test.go new file mode 100644 index 0000000..ad7aaf6 --- /dev/null +++ b/internal/xml_loader/employee_wi_test.go @@ -0,0 +1,22 @@ +package xml_loader_test + +import ( + "testing" + + "gitea.urkob.com/urko/ess-etl-go/config" + "gitea.urkob.com/urko/ess-etl-go/internal/request" + "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" + "github.com/stretchr/testify/require" +) + +func TestEmployeeWILoader_LoadEmployee(t *testing.T) { + cfg := config.NewConfig(".env") + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + + loader := xml_loader.NewEmployeeWILoader(r) + employeeID, from, to := cfg.EmployeeIdList[0], "2023-01-01", "2023-01-31" + got, err := loader.LoadEmployee(employeeID, from, to) + require.NoError(t, err) + require.Greater(t, len(got), 0) +}