228 lines
4.4 KiB
Go
228 lines
4.4 KiB
Go
package etl
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"runtime"
|
|
"sync"
|
|
|
|
"gitea.urkob.com/urko/crono"
|
|
"gitea.urkob.com/urko/ess-etl-go/internal/xml_loader"
|
|
"gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi"
|
|
"gitea.urkob.com/urko/ess-etl-go/pkg/domain"
|
|
)
|
|
|
|
type Etl struct {
|
|
ewiLoader xml_loader.EmployeeWILoader
|
|
repo *employee_wi.Repo
|
|
}
|
|
|
|
func New(ewiLoader xml_loader.EmployeeWILoader, repo *employee_wi.Repo) *Etl {
|
|
return &Etl{
|
|
ewiLoader: ewiLoader,
|
|
repo: repo,
|
|
}
|
|
}
|
|
|
|
func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to string) error {
|
|
g := runtime.GOMAXPROCS(0)
|
|
var wg sync.WaitGroup
|
|
wg.Add(g)
|
|
|
|
employeeWIChan := make(chan []domain.EmployeeWorkInformation, g)
|
|
errChan := make(chan error, 1)
|
|
|
|
for i := 0; i < g; i++ {
|
|
go func() {
|
|
defer func() {
|
|
wg.Done()
|
|
}()
|
|
for v := range employeeWIChan {
|
|
func() {
|
|
err := etl.repo.InsertMany(ctx, v)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
}
|
|
go func() {
|
|
var wg2 sync.WaitGroup
|
|
wg2.Add(len(employeeNumber))
|
|
for i := range employeeNumber {
|
|
go func(v string) {
|
|
defer wg2.Done()
|
|
wi, err := etl.ewiLoader.LoadEmployee(v, from, to)
|
|
if err != nil {
|
|
log.Println("err", err)
|
|
errChan <- err
|
|
return
|
|
}
|
|
employeeWIChan <- wi
|
|
}(employeeNumber[i])
|
|
}
|
|
wg2.Wait()
|
|
close(employeeWIChan)
|
|
}()
|
|
|
|
wg.Wait()
|
|
errChan <- nil
|
|
return <-errChan
|
|
}
|
|
|
|
func (etl *Etl) FanOutV2(ctx context.Context, employeeNumber []string, from, to string) error {
|
|
employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
|
|
errChan := make(chan error, 1)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case v, ok := <-employeeWIChan:
|
|
if !ok {
|
|
errChan <- nil
|
|
return
|
|
}
|
|
err := etl.repo.InsertMany(ctx, v)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(employeeNumber))
|
|
for i := range employeeNumber {
|
|
go func(v string) {
|
|
defer wg.Done()
|
|
wi, err := etl.ewiLoader.LoadEmployee(v, from, to)
|
|
if err != nil {
|
|
log.Println("err", err)
|
|
errChan <- err
|
|
return
|
|
}
|
|
employeeWIChan <- wi
|
|
}(employeeNumber[i])
|
|
}
|
|
wg.Wait()
|
|
close(employeeWIChan)
|
|
}()
|
|
|
|
return <-errChan
|
|
}
|
|
|
|
func (etl *Etl) FanOut2(ctx context.Context, employeeNumber []string, from, to string) error {
|
|
employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
|
|
xmlChan := make(chan []byte, len(employeeNumber))
|
|
errChan := make(chan error, 1)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case v, ok := <-employeeWIChan:
|
|
if !ok {
|
|
errChan <- nil
|
|
return
|
|
}
|
|
err := etl.repo.InsertMany(ctx, v)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case bts, ok := <-xmlChan:
|
|
if !ok {
|
|
close(employeeWIChan)
|
|
return
|
|
}
|
|
wi, err := xml_loader.GoLoadFromXML(bts)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
employeeWIChan <- wi
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(employeeNumber))
|
|
for i := range employeeNumber {
|
|
go func(v string) {
|
|
defer wg.Done()
|
|
bts, err := etl.ewiLoader.GoLoadEmployee(v, from, to)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
xmlChan <- bts
|
|
}(employeeNumber[i])
|
|
}
|
|
wg.Wait()
|
|
close(xmlChan)
|
|
}()
|
|
return <-errChan
|
|
}
|
|
|
|
func (etl *Etl) Main(ctx context.Context, cr *crono.Crono, employeeNumber []string, from, to string) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
errChan := make(chan error, 1)
|
|
ewiChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
|
|
var wg sync.WaitGroup
|
|
wg.Add(1 + len(employeeNumber))
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
for _, v := range employeeNumber {
|
|
go func(v string) {
|
|
cr.Restart()
|
|
defer wg.Done()
|
|
wi, err := etl.ewiLoader.LoadEmployee(v, from, to)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
ewiChan <- wi
|
|
cr.MarkAndRestart(fmt.Sprintf("ewiLoader.LoadEmployee | %s | from: %s to: %s", v, from, to))
|
|
}(v)
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
if err := <-errChan; err != nil {
|
|
log.Fatalln("error while process", err)
|
|
cancel()
|
|
return
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for v := range ewiChan {
|
|
// log.Println("len v", len(v))
|
|
err := etl.repo.InsertMany(ctx, v)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
cr.MarkAndRestart(fmt.Sprintf("database inserted: %d", len(v)))
|
|
}
|
|
errChan <- nil
|
|
}()
|
|
|
|
wg.Wait()
|
|
return nil
|
|
}
|