73 lines
1.4 KiB
Go
73 lines
1.4 KiB
Go
|
package etl
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"log"
|
||
|
"runtime"
|
||
|
"sync"
|
||
|
|
||
|
"gitea.urkob.com/urko/ess-etl-go/internal/xml_loader"
|
||
|
"gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi"
|
||
|
"gitea.urkob.com/urko/ess-etl-go/pkg/domain"
|
||
|
)
|
||
|
|
||
|
type Etl struct {
|
||
|
ewiLoader xml_loader.EmployeeWILoader
|
||
|
repo employee_wi.Repo
|
||
|
}
|
||
|
|
||
|
func New(ewiLoader xml_loader.EmployeeWILoader, repo employee_wi.Repo) *Etl {
|
||
|
return &Etl{
|
||
|
ewiLoader: ewiLoader,
|
||
|
repo: repo,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to string) error {
|
||
|
g := runtime.GOMAXPROCS(0)
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(g)
|
||
|
|
||
|
employeeWIChan := make(chan []domain.EmployeeWorkInformation, g)
|
||
|
errChan := make(chan error, 1)
|
||
|
|
||
|
for i := 0; i < g; i++ {
|
||
|
go func() {
|
||
|
defer func() {
|
||
|
wg.Done()
|
||
|
}()
|
||
|
for v := range employeeWIChan {
|
||
|
func() {
|
||
|
err := etl.repo.InsertMany(ctx, v)
|
||
|
if err != nil {
|
||
|
errChan <- err
|
||
|
return
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
go func() {
|
||
|
var wg2 sync.WaitGroup
|
||
|
wg2.Add(len(employeeNumber))
|
||
|
for i := range employeeNumber {
|
||
|
go func(v string) {
|
||
|
defer wg2.Done()
|
||
|
wi, err := etl.ewiLoader.LoadEmployee(v, from, to)
|
||
|
if err != nil {
|
||
|
log.Println("err", err)
|
||
|
errChan <- err
|
||
|
return
|
||
|
}
|
||
|
employeeWIChan <- wi
|
||
|
}(employeeNumber[i])
|
||
|
}
|
||
|
wg2.Wait()
|
||
|
close(employeeWIChan)
|
||
|
}()
|
||
|
|
||
|
wg.Wait()
|
||
|
errChan <- nil
|
||
|
return <-errChan
|
||
|
}
|