diff --git a/Makefile b/Makefile index 86dc691..b87486e 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ trace_etl: build_etl# go tool trace t.out build_etl: env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -v -o etl ./cmd/etl/main.go build_server: - env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -o ${BINARY_DIR}/${BINARY_NAME} ./main.go + env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -o server ./cmd/server/main.go run_server: build_server - ./${BINARY_DIR}/${BINARY_NAME} + ./server diff --git a/cmd/etl/main.go b/cmd/etl/main.go index c2bf3a9..2e945f1 100644 --- a/cmd/etl/main.go +++ b/cmd/etl/main.go @@ -91,9 +91,10 @@ func main() { cr.MarkAndRestart("dependencies loaded") etlLoader := etl.New(ewiLoader, repo) - // err = etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to) - err = etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to) - //err = etlLoader.Main(signalContext(ctx), cr, cfg.EmployeeIdList, from, to) + err = etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to) + // err = etlLoader.FanOutV2(ctx, cfg.EmployeeIdList, from, to) + // err = etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to) + // err = etlLoader.Main(signalContext(ctx), cr, cfg.EmployeeIdList, from, to) if err != nil { log.Fatalln("etlLoader.FanOut", err) } diff --git a/internal/etl/etl.go b/internal/etl/etl.go index f0f14c5..f312351 100644 --- a/internal/etl/etl.go +++ b/internal/etl/etl.go @@ -73,6 +73,49 @@ func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to st return <-errChan } +func (etl *Etl) FanOutV2(ctx context.Context, employeeNumber []string, from, to string) error { + employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber)) + errChan := make(chan error, 1) + + go func() { + for { + select { + case v, ok := <-employeeWIChan: + if !ok { + errChan <- nil + return + } + err := etl.repo.InsertMany(ctx, v) + if err != nil { + errChan <- err + return + } + } + } + }() + + go func() { + var wg sync.WaitGroup + wg.Add(len(employeeNumber)) + for i := range employeeNumber { + go func(v string) { + defer wg.Done() + wi, err := etl.ewiLoader.LoadEmployee(v, from, to) + if err != nil { + log.Println("err", err) + errChan <- err + return + } + employeeWIChan <- wi + }(employeeNumber[i]) + } + wg.Wait() + close(employeeWIChan) + }() + + return <-errChan +} + func (etl *Etl) FanOut2(ctx context.Context, employeeNumber []string, from, to string) error { employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber)) xmlChan := make(chan []byte, len(employeeNumber))