From 4286af982b8afdcfd054ee94e239aa3405af0d7b Mon Sep 17 00:00:00 2001 From: Urko Date: Mon, 10 Apr 2023 14:52:05 +0200 Subject: [PATCH] feat: etl add more benchmarks WIP: improve algorithm --- Makefile | 2 +- cmd/etl/main.go | 61 ++------- internal/etl/etl.go | 116 +++++++++++++++- internal/etl/etl_benchmark_test.go | 210 +++++++++++++++++++++++++++-- internal/xml_loader/employee_wi.go | 21 +++ 5 files changed, 348 insertions(+), 62 deletions(-) diff --git a/Makefile b/Makefile index 63e10eb..86dc691 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ pprof_url:# top 40 -cum benchmark_server: go test -gcflags "-m -m" -run none -bench . -benchtime 30s -benchmem -memprofile profile.out ./benchmark benchmark_etl: - go test -gcflags "-m -m" -run none -bench . -benchtime 5s -benchmem -memprofile profile.out ./internal/etl/ + go test -gcflags "-m -m" -run none -bench . -benchtime 15s -benchmem -memprofile profile.out ./internal/etl/ > t.out trace_etl: build_etl# go tool trace t.out $(build_etl) time ./etl -trace > t.out diff --git a/cmd/etl/main.go b/cmd/etl/main.go index 3aa08fa..c2bf3a9 100644 --- a/cmd/etl/main.go +++ b/cmd/etl/main.go @@ -3,7 +3,6 @@ package main import ( "context" "flag" - "fmt" "log" "net/http" _ "net/http/pprof" @@ -11,15 +10,14 @@ import ( "os/signal" "runtime/pprof" "runtime/trace" - "sync" "syscall" "gitea.urkob.com/urko/crono" "gitea.urkob.com/urko/ess-etl-go/config" + "gitea.urkob.com/urko/ess-etl-go/internal/etl" "gitea.urkob.com/urko/ess-etl-go/internal/request" "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" "gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" - "gitea.urkob.com/urko/ess-etl-go/pkg/domain" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -84,62 +82,23 @@ func main() { if err = employeeWICollection.Drop(ctx); err != nil { log.Fatalln("employeeWICollection.Drop", err) } - professionalRepo := employee_wi.NewRepo(employeeWICollection) + repo := 
employee_wi.NewRepo(employeeWICollection) r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) - employeeIDList := cfg.EmployeeIdList - ewiLoader := xml_loader.NewEmployeeWILoader(r) from, to := "2023-01-01", "2023-01-31" cr.MarkAndRestart("dependencies loaded") - ctx, cancel := context.WithCancel(signalContext(context.Background())) - errChan := make(chan error, 1) - ewiChan := make(chan []domain.EmployeeWorkInformation, len(employeeIDList)) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - for _, v := range employeeIDList { - wg.Add(1) - go func(v string) { - cr.Restart() - defer wg.Done() - wi, err := ewiLoader.LoadEmployee(v, from, to) - if err != nil { - errChan <- err - return - } - ewiChan <- wi - cr.MarkAndRestart(fmt.Sprintf("ewiLoader.LoadEmployee | %s | from: %s to: %s", v, from, to)) - }(v) - } - }() + etlLoader := etl.New(ewiLoader, repo) + // err = etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to) + err = etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to) + //err = etlLoader.Main(signalContext(ctx), cr, cfg.EmployeeIdList, from, to) + if err != nil { + log.Fatalln("etlLoader.FanOut", err) + } + cr.MarkAndRestart("FanOut") - go func() { - if err := <-errChan; err != nil { - log.Fatalln("error while process", err) - cancel() - } - }() - - go func() { - for v := range ewiChan { - // log.Println("len v", len(v)) - err := professionalRepo.InsertMany(ctx, v) - if err != nil { - errChan <- err - return - } - cr.MarkAndRestart(fmt.Sprintf("database inserted: %d", len(v))) - } - - log.Println("cancel") - errChan <- nil - }() - wg.Wait() - // <-ctx.Done() log.Println("gracefully shutdown") } diff --git a/internal/etl/etl.go b/internal/etl/etl.go index e505426..f0f14c5 100644 --- a/internal/etl/etl.go +++ b/internal/etl/etl.go @@ -2,10 +2,12 @@ package etl import ( "context" + "fmt" "log" "runtime" "sync" + "gitea.urkob.com/urko/crono" "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" 
"gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" "gitea.urkob.com/urko/ess-etl-go/pkg/domain" @@ -13,10 +15,10 @@ import ( type Etl struct { ewiLoader xml_loader.EmployeeWILoader - repo employee_wi.Repo + repo *employee_wi.Repo } -func New(ewiLoader xml_loader.EmployeeWILoader, repo employee_wi.Repo) *Etl { +func New(ewiLoader xml_loader.EmployeeWILoader, repo *employee_wi.Repo) *Etl { return &Etl{ ewiLoader: ewiLoader, repo: repo, @@ -70,3 +72,113 @@ func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to st errChan <- nil return <-errChan } + +func (etl *Etl) FanOut2(ctx context.Context, employeeNumber []string, from, to string) error { + employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber)) + xmlChan := make(chan []byte, len(employeeNumber)) + errChan := make(chan error, 1) + + go func() { + for { + select { + case v, ok := <-employeeWIChan: + if !ok { + errChan <- nil + return + } + err := etl.repo.InsertMany(ctx, v) + if err != nil { + errChan <- err + return + } + } + } + }() + + go func() { + for { + select { + case bts, ok := <-xmlChan: + if !ok { + close(employeeWIChan) + return + } + wi, err := xml_loader.GoLoadFromXML(bts) + if err != nil { + errChan <- err + return + } + employeeWIChan <- wi + } + } + }() + + go func() { + var wg sync.WaitGroup + wg.Add(len(employeeNumber)) + for i := range employeeNumber { + go func(v string) { + defer wg.Done() + bts, err := etl.ewiLoader.GoLoadEmployee(v, from, to) + if err != nil { + errChan <- err + return + } + xmlChan <- bts + }(employeeNumber[i]) + } + wg.Wait() + close(xmlChan) + }() + return <-errChan +} + +func (etl *Etl) Main(ctx context.Context, cr *crono.Crono, employeeNumber []string, from, to string) error { + ctx, cancel := context.WithCancel(ctx) + + errChan := make(chan error, 1) + ewiChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber)) + var wg sync.WaitGroup + wg.Add(1 + len(employeeNumber)) + + go 
func() { + defer wg.Done() + for _, v := range employeeNumber { + go func(v string) { + cr.Restart() + defer wg.Done() + wi, err := etl.ewiLoader.LoadEmployee(v, from, to) + if err != nil { + errChan <- err + return + } + ewiChan <- wi + cr.MarkAndRestart(fmt.Sprintf("ewiLoader.LoadEmployee | %s | from: %s to: %s", v, from, to)) + }(v) + } + }() + + go func() { + if err := <-errChan; err != nil { + log.Fatalln("error while process", err) + cancel() + return + } + }() + + go func() { + for v := range ewiChan { + // log.Println("len v", len(v)) + err := etl.repo.InsertMany(ctx, v) + if err != nil { + errChan <- err + return + } + cr.MarkAndRestart(fmt.Sprintf("database inserted: %d", len(v))) + } + errChan <- nil + }() + + wg.Wait() + return nil +} diff --git a/internal/etl/etl_benchmark_test.go b/internal/etl/etl_benchmark_test.go index 869517a..f0a9d3e 100644 --- a/internal/etl/etl_benchmark_test.go +++ b/internal/etl/etl_benchmark_test.go @@ -3,8 +3,11 @@ package etl_test import ( "context" "log" + "os" + "runtime/trace" "testing" + "gitea.urkob.com/urko/crono" "gitea.urkob.com/urko/ess-etl-go/config" "gitea.urkob.com/urko/ess-etl-go/internal/etl" "gitea.urkob.com/urko/ess-etl-go/internal/request" @@ -15,7 +18,18 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -func BenchmarkLoad(b *testing.B) { +const traceFlag = false + +func BenchmarkETLFanout(b *testing.B) { + if traceFlag { + trace.Start(os.Stdout) + } + + defer func() { + if traceFlag { + trace.Stop() + } + }() cfg := config.NewConfig(".env") ctx := context.Background() @@ -28,23 +42,203 @@ func BenchmarkLoad(b *testing.B) { require.NoError(b, client.Connect(ctx), "client.Connect") employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) - if err = employeeWICollection.Drop(ctx); err != nil { - log.Fatalln("employeeWICollection.Drop", err) - } + require.NoError(b, employeeWICollection.Drop(ctx), "employeeWICollection.Drop") repo := 
employee_wi.NewRepo(employeeWICollection) r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) ewiLoader := xml_loader.NewEmployeeWILoader(r) from, to := "2023-01-01", "2023-01-31" - e := etl.New(ewiLoader, *repo) + etlLoader := etl.New(ewiLoader, repo) + + // err = etlLoader.Main(signalContext(ctx), cr, cfg.EmployeeIdList, from, to) - b.ResetTimer() for i := 0; i < b.N; i++ { - require.NoError(b, e.FanOut(ctx, cfg.EmployeeIdList, from, to)) + require.NoError(b, etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to)) } } +func BenchmarkETLFanout2(b *testing.B) { + if traceFlag { + trace.Start(os.Stdout) + } + + defer func() { + if traceFlag { + trace.Stop() + } + }() + cfg := config.NewConfig(".env") + ctx := context.Background() + + dbOpts := options.Client() + dbOpts.ApplyURI(cfg.DbAddress) + + client, err := mongo.NewClient(dbOpts) + require.NoError(b, err, "mongo.NewClient") + + require.NoError(b, client.Connect(ctx), "client.Connect") + + employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) + require.NoError(b, employeeWICollection.Drop(ctx), "employeeWICollection.Drop") + repo := employee_wi.NewRepo(employeeWICollection) + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + + ewiLoader := xml_loader.NewEmployeeWILoader(r) + from, to := "2023-01-01", "2023-01-31" + etlLoader := etl.New(ewiLoader, repo) + + for i := 0; i < b.N; i++ { + require.NoError(b, etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to)) + } +} + +func BenchmarkETLMain(b *testing.B) { + if traceFlag { + trace.Start(os.Stdout) + } + + defer func() { + if traceFlag { + trace.Stop() + } + }() + cfg := config.NewConfig(".env") + ctx := context.Background() + + dbOpts := options.Client() + dbOpts.ApplyURI(cfg.DbAddress) + + client, err := mongo.NewClient(dbOpts) + require.NoError(b, err, "mongo.NewClient") + + require.NoError(b, client.Connect(ctx), "client.Connect") + + employeeWICollection := 
client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) + require.NoError(b, employeeWICollection.Drop(ctx), "employeeWICollection.Drop") + repo := employee_wi.NewRepo(employeeWICollection) + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + + ewiLoader := xml_loader.NewEmployeeWILoader(r) + from, to := "2023-01-01", "2023-01-31" + etlLoader := etl.New(ewiLoader, repo) + + for i := 0; i < b.N; i++ { + require.NoError(b, etlLoader.Main(ctx, crono.New(), cfg.EmployeeIdList, from, to)) + } +} + +func TestETLFanout(t *testing.T) { + cr := crono.New() + if traceFlag { + trace.Start(os.Stdout) + } else { + defer cr.Table() + } + + defer func() { + if traceFlag { + trace.Stop() + } + }() + cfg := config.NewConfig(".env") + ctx := context.Background() + + dbOpts := options.Client() + dbOpts.ApplyURI(cfg.DbAddress) + + client, err := mongo.NewClient(dbOpts) + require.NoError(t, err, "mongo.NewClient") + + require.NoError(t, client.Connect(ctx), "client.Connect") + + employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) + require.NoError(t, employeeWICollection.Drop(ctx), "employeeWICollection.Drop") + repo := employee_wi.NewRepo(employeeWICollection) + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + + ewiLoader := xml_loader.NewEmployeeWILoader(r) + from, to := "2023-01-01", "2023-01-31" + etlLoader := etl.New(ewiLoader, repo) + + require.NoError(t, etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to)) +} + +func TestETLFanOut2(t *testing.T) { + cr := crono.New() + if traceFlag { + trace.Start(os.Stdout) + } else { + defer cr.Table() + } + + defer func() { + if traceFlag { + trace.Stop() + } + }() + cfg := config.NewConfig(".env") + ctx := context.Background() + + dbOpts := options.Client() + dbOpts.ApplyURI(cfg.DbAddress) + + client, err := mongo.NewClient(dbOpts) + require.NoError(t, err, "mongo.NewClient") + + require.NoError(t, client.Connect(ctx), "client.Connect") + + 
employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) + require.NoError(t, employeeWICollection.Drop(ctx), "employeeWICollection.Drop") + repo := employee_wi.NewRepo(employeeWICollection) + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + ewiLoader := xml_loader.NewEmployeeWILoader(r) + from, to := "2023-01-01", "2023-01-31" + etlLoader := etl.New(ewiLoader, repo) + + require.NoError(t, etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to)) +} + +func TestETLMain(t *testing.T) { + cr := crono.New() + if traceFlag { + trace.Start(os.Stdout) + } else { + defer cr.Table() + } + + defer func() { + if traceFlag { + trace.Stop() + } + }() + cfg := config.NewConfig(".env") + ctx := context.Background() + + dbOpts := options.Client() + dbOpts.ApplyURI(cfg.DbAddress) + + client, err := mongo.NewClient(dbOpts) + require.NoError(t, err, "mongo.NewClient") + + require.NoError(t, client.Connect(ctx), "client.Connect") + + employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) + require.NoError(t, employeeWICollection.Drop(ctx), "employeeWICollection.Drop") + repo := employee_wi.NewRepo(employeeWICollection) + + r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) + + ewiLoader := xml_loader.NewEmployeeWILoader(r) + from, to := "2023-01-01", "2023-01-31" + etlLoader := etl.New(ewiLoader, repo) + require.NoError(t, etlLoader.Main(ctx, cr, cfg.EmployeeIdList, from, to)) +} + func TestLoad(t *testing.T) { cfg := config.NewConfig(".env") ctx := context.Background() @@ -67,7 +261,7 @@ func TestLoad(t *testing.T) { ewiLoader := xml_loader.NewEmployeeWILoader(r) from, to := "2023-01-01", "2023-01-31" - e := etl.New(ewiLoader, *repo) + e := etl.New(ewiLoader, repo) require.NoError(t, e.FanOut(ctx, cfg.EmployeeIdList, from, to)) } diff --git a/internal/xml_loader/employee_wi.go b/internal/xml_loader/employee_wi.go index db64e46..404eef1 100644 --- 
a/internal/xml_loader/employee_wi.go +++ b/internal/xml_loader/employee_wi.go @@ -50,3 +50,24 @@ func loadFromXML(xmlFile []byte) ([]domain.EmployeeWorkInformation, error) { } return awi.EmployeeWorkInfos, nil } + +func (e EmployeeWILoader) GoLoadEmployee(employeeID, from, to string) ([]byte, error) { + employeeIDList := []string{employeeID} + xmlBts := make([]byte, 0, units.MiB*5) + err := e.r.EmployeeWorkInformation(&xmlBts, employeeIDList, from, to) + if err != nil { + return nil, fmt.Errorf("r.EmployeeWorkInformation: %s", err) + } + if len(xmlBts) <= 0 { + return nil, fmt.Errorf("couldn't load xml ") + } + return xmlBts, nil +} + +func GoLoadFromXML(xmlFile []byte) ([]domain.EmployeeWorkInformation, error) { + var awi domain.ArrayOfEmployeeWorkInformation + if err := xml.Unmarshal(xmlFile, &awi); err != nil { + return nil, fmt.Errorf("xml.Unmarshal: %s", err) + } + return awi.EmployeeWorkInfos, nil +}