ess-etl-go/internal/etl/etl.go

185 lines
3.6 KiB
Go
Raw Normal View History

package etl
import (
"context"
"fmt"
"log"
"runtime"
"sync"
"gitea.urkob.com/urko/crono"
"gitea.urkob.com/urko/ess-etl-go/internal/xml_loader"
"gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi"
"gitea.urkob.com/urko/ess-etl-go/pkg/domain"
)
type Etl struct {
ewiLoader xml_loader.EmployeeWILoader
repo *employee_wi.Repo
}
func New(ewiLoader xml_loader.EmployeeWILoader, repo *employee_wi.Repo) *Etl {
return &Etl{
ewiLoader: ewiLoader,
repo: repo,
}
}
func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to string) error {
g := runtime.GOMAXPROCS(0)
var wg sync.WaitGroup
wg.Add(g)
employeeWIChan := make(chan []domain.EmployeeWorkInformation, g)
errChan := make(chan error, 1)
for i := 0; i < g; i++ {
go func() {
defer func() {
wg.Done()
}()
for v := range employeeWIChan {
func() {
err := etl.repo.InsertMany(ctx, v)
if err != nil {
errChan <- err
return
}
}()
}
}()
}
go func() {
var wg2 sync.WaitGroup
wg2.Add(len(employeeNumber))
for i := range employeeNumber {
go func(v string) {
defer wg2.Done()
wi, err := etl.ewiLoader.LoadEmployee(v, from, to)
if err != nil {
log.Println("err", err)
errChan <- err
return
}
employeeWIChan <- wi
}(employeeNumber[i])
}
wg2.Wait()
close(employeeWIChan)
}()
wg.Wait()
errChan <- nil
return <-errChan
}
func (etl *Etl) FanOut2(ctx context.Context, employeeNumber []string, from, to string) error {
employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
xmlChan := make(chan []byte, len(employeeNumber))
errChan := make(chan error, 1)
go func() {
for {
select {
case v, ok := <-employeeWIChan:
if !ok {
errChan <- nil
return
}
err := etl.repo.InsertMany(ctx, v)
if err != nil {
errChan <- err
return
}
}
}
}()
go func() {
for {
select {
case bts, ok := <-xmlChan:
if !ok {
close(employeeWIChan)
return
}
wi, err := xml_loader.GoLoadFromXML(bts)
if err != nil {
errChan <- err
return
}
employeeWIChan <- wi
}
}
}()
go func() {
var wg sync.WaitGroup
wg.Add(len(employeeNumber))
for i := range employeeNumber {
go func(v string) {
defer wg.Done()
bts, err := etl.ewiLoader.GoLoadEmployee(v, from, to)
if err != nil {
errChan <- err
return
}
xmlChan <- bts
}(employeeNumber[i])
}
wg.Wait()
close(xmlChan)
}()
return <-errChan
}
func (etl *Etl) Main(ctx context.Context, cr *crono.Crono, employeeNumber []string, from, to string) error {
ctx, cancel := context.WithCancel(ctx)
errChan := make(chan error, 1)
ewiChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
var wg sync.WaitGroup
wg.Add(1 + len(employeeNumber))
go func() {
defer wg.Done()
for _, v := range employeeNumber {
go func(v string) {
cr.Restart()
defer wg.Done()
wi, err := etl.ewiLoader.LoadEmployee(v, from, to)
if err != nil {
errChan <- err
return
}
ewiChan <- wi
cr.MarkAndRestart(fmt.Sprintf("ewiLoader.LoadEmployee | %s | from: %s to: %s", v, from, to))
}(v)
}
}()
go func() {
if err := <-errChan; err != nil {
log.Fatalln("error while process", err)
cancel()
return
}
}()
go func() {
for v := range ewiChan {
// log.Println("len v", len(v))
err := etl.repo.InsertMany(ctx, v)
if err != nil {
errChan <- err
return
}
cr.MarkAndRestart(fmt.Sprintf("database inserted: %d", len(v)))
}
errChan <- nil
}()
wg.Wait()
return nil
}