package etl import ( "context" "fmt" "log" "runtime" "sync" "gitea.urkob.com/urko/crono" "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" "gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" "gitea.urkob.com/urko/ess-etl-go/pkg/domain" ) type Etl struct { ewiLoader xml_loader.EmployeeWILoader repo *employee_wi.Repo } func New(ewiLoader xml_loader.EmployeeWILoader, repo *employee_wi.Repo) *Etl { return &Etl{ ewiLoader: ewiLoader, repo: repo, } } func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to string) error { g := runtime.GOMAXPROCS(0) var wg sync.WaitGroup wg.Add(g) employeeWIChan := make(chan []domain.EmployeeWorkInformation, g) errChan := make(chan error, 1) for i := 0; i < g; i++ { go func() { defer func() { wg.Done() }() for v := range employeeWIChan { func() { err := etl.repo.InsertMany(ctx, v) if err != nil { errChan <- err return } }() } }() } go func() { var wg2 sync.WaitGroup wg2.Add(len(employeeNumber)) for i := range employeeNumber { go func(v string) { defer wg2.Done() wi, err := etl.ewiLoader.LoadEmployee(v, from, to) if err != nil { log.Println("err", err) errChan <- err return } employeeWIChan <- wi }(employeeNumber[i]) } wg2.Wait() close(employeeWIChan) }() wg.Wait() errChan <- nil return <-errChan } func (etl *Etl) FanOutV2(ctx context.Context, employeeNumber []string, from, to string) error { employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber)) errChan := make(chan error, 1) go func() { for { select { case v, ok := <-employeeWIChan: if !ok { errChan <- nil return } err := etl.repo.InsertMany(ctx, v) if err != nil { errChan <- err return } } } }() go func() { var wg sync.WaitGroup wg.Add(len(employeeNumber)) for i := range employeeNumber { go func(v string) { defer wg.Done() wi, err := etl.ewiLoader.LoadEmployee(v, from, to) if err != nil { log.Println("err", err) errChan <- err return } employeeWIChan <- wi }(employeeNumber[i]) } wg.Wait() close(employeeWIChan) }() return <-errChan } func (etl *Etl) FanOut2(ctx context.Context, employeeNumber []string, from, to string) error { employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber)) xmlChan := make(chan []byte, len(employeeNumber)) errChan := make(chan error, 1) go func() { for { select { case v, ok := <-employeeWIChan: if !ok { errChan <- nil return } err := etl.repo.InsertMany(ctx, v) if err != nil { errChan <- err return } } } }() go func() { for { select { case bts, ok := <-xmlChan: if !ok { close(employeeWIChan) return } wi, err := xml_loader.GoLoadFromXML(bts) if err != nil { errChan <- err return } employeeWIChan <- wi } } }() go func() { var wg sync.WaitGroup wg.Add(len(employeeNumber)) for i := range employeeNumber { go func(v string) { defer wg.Done() bts, err := etl.ewiLoader.GoLoadEmployee(v, from, to) if err != nil { errChan <- err return } xmlChan <- bts }(employeeNumber[i]) } wg.Wait() close(xmlChan) }() return <-errChan } func (etl *Etl) Main(ctx context.Context, cr *crono.Crono, employeeNumber []string, from, to string) error { ctx, cancel := context.WithCancel(ctx) errChan := make(chan error, 1) ewiChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber)) var wg sync.WaitGroup wg.Add(1 + len(employeeNumber)) go func() { defer wg.Done() for _, v := range employeeNumber { go func(v string) { cr.Restart() defer wg.Done() wi, err := etl.ewiLoader.LoadEmployee(v, from, to) if err != nil { errChan <- err return } ewiChan <- wi cr.MarkAndRestart(fmt.Sprintf("ewiLoader.LoadEmployee | %s | from: %s to: %s", v, from, to)) }(v) } }() go func() { if err := <-errChan; err != nil { log.Fatalln("error while process", err) cancel() return } }() go func() { for v := range ewiChan { // log.Println("len v", len(v)) err := etl.repo.InsertMany(ctx, v) if err != nil { errChan <- err return } cr.MarkAndRestart(fmt.Sprintf("database inserted: %d", len(v))) } errChan <- nil }() wg.Wait() return nil }