// ess-etl-go/internal/etl/etl.go
//
// 73 lines
// 1.4 KiB
// Go

package etl
import (
"context"
"log"
"runtime"
"sync"
"gitea.urkob.com/urko/ess-etl-go/internal/xml_loader"
"gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi"
"gitea.urkob.com/urko/ess-etl-go/pkg/domain"
)
// Etl bundles the two collaborators that FanOut drives: a loader that
// produces employee work information and a repository that stores it.
type Etl struct {
// ewiLoader is called by FanOut (LoadEmployee) once per employee number.
ewiLoader xml_loader.EmployeeWILoader
// repo is called by FanOut (InsertMany) once per successfully loaded batch.
repo employee_wi.Repo
}
// New returns a pointer to an Etl that uses ewiLoader as its loader and repo
// as its repository.
func New(ewiLoader xml_loader.EmployeeWILoader, repo employee_wi.Repo) *Etl {
	e := new(Etl)
	e.ewiLoader = ewiLoader
	e.repo = repo
	return e
}
// FanOut loads the work information of every employee in employeeNumber and
// stores each loaded batch through the repository.
//
// One loader goroutine is started per employee number; from and to are passed
// unchanged to the loader's LoadEmployee. Loaded batches are handed over a
// channel to runtime.GOMAXPROCS(0) inserter goroutines, each of which calls
// the repository's InsertMany with ctx.
//
// Processing is best-effort: a failed load or insert does not stop the
// remaining employees. FanOut blocks until every goroutine it started has
// finished and then returns the first error that was recorded, or nil when
// every load and insert succeeded. An empty employeeNumber returns nil
// without starting any goroutine.
func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to string) error {
	if len(employeeNumber) == 0 {
		return nil
	}

	var (
		errMu    sync.Mutex
		firstErr error
	)
	// recordErr keeps only the first failure. It never blocks, unlike a send
	// on a bounded error channel that nobody drains while work is in flight;
	// that pattern deadlocks as soon as more errors occur than the channel
	// can buffer.
	recordErr := func(err error) {
		errMu.Lock()
		defer errMu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}

	workers := runtime.GOMAXPROCS(0)
	employeeWIChan := make(chan []domain.EmployeeWorkInformation, workers)

	// Inserters drain the channel until it is closed, so loaders can always
	// make progress even when some inserts fail.
	var inserters sync.WaitGroup
	inserters.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer inserters.Done()
			for batch := range employeeWIChan {
				if err := etl.repo.InsertMany(ctx, batch); err != nil {
					recordErr(err)
				}
			}
		}()
	}

	var loaders sync.WaitGroup
	loaders.Add(len(employeeNumber))
	for _, number := range employeeNumber {
		// The employee number is passed as an argument so each goroutine has
		// its own copy regardless of the Go version's loop-variable semantics.
		go func(number string) {
			defer loaders.Done()
			wi, err := etl.ewiLoader.LoadEmployee(number, from, to)
			if err != nil {
				log.Println("err", err)
				recordErr(err)
				return
			}
			employeeWIChan <- wi
		}(number)
	}

	// Only close the channel once every sender is done; closing it is what
	// lets the inserters' range loops terminate.
	loaders.Wait()
	close(employeeWIChan)
	inserters.Wait()

	// All writers have finished (established by the two Wait calls above), so
	// firstErr can be read without holding the mutex.
	return firstErr
}