feat: WIP add new algorythms
This commit is contained in:
parent
4286af982b
commit
4956815ec6
4
Makefile
4
Makefile
|
@ -29,7 +29,7 @@ trace_etl: build_etl# go tool trace t.out
|
||||||
build_etl:
|
build_etl:
|
||||||
env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -v -o etl ./cmd/etl/main.go
|
env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -v -o etl ./cmd/etl/main.go
|
||||||
build_server:
|
build_server:
|
||||||
env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -o ${BINARY_DIR}/${BINARY_NAME} ./main.go
|
env GOOS=linux CGO_ENABLED=0 GOARCH=amd64 go build -o server ./cmd/server/main.go
|
||||||
run_server: build_server
|
run_server: build_server
|
||||||
./${BINARY_DIR}/${BINARY_NAME}
|
./server
|
||||||
|
|
||||||
|
|
|
@ -91,8 +91,9 @@ func main() {
|
||||||
cr.MarkAndRestart("dependencies loaded")
|
cr.MarkAndRestart("dependencies loaded")
|
||||||
|
|
||||||
etlLoader := etl.New(ewiLoader, repo)
|
etlLoader := etl.New(ewiLoader, repo)
|
||||||
// err = etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to)
|
err = etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to)
|
||||||
err = etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to)
|
// err = etlLoader.FanOutV2(ctx, cfg.EmployeeIdList, from, to)
|
||||||
|
// err = etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to)
|
||||||
// err = etlLoader.Main(signalContext(ctx), cr, cfg.EmployeeIdList, from, to)
|
// err = etlLoader.Main(signalContext(ctx), cr, cfg.EmployeeIdList, from, to)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("etlLoader.FanOut", err)
|
log.Fatalln("etlLoader.FanOut", err)
|
||||||
|
|
|
@ -73,6 +73,49 @@ func (etl *Etl) FanOut(ctx context.Context, employeeNumber []string, from, to st
|
||||||
return <-errChan
|
return <-errChan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (etl *Etl) FanOutV2(ctx context.Context, employeeNumber []string, from, to string) error {
|
||||||
|
employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case v, ok := <-employeeWIChan:
|
||||||
|
if !ok {
|
||||||
|
errChan <- nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := etl.repo.InsertMany(ctx, v)
|
||||||
|
if err != nil {
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(employeeNumber))
|
||||||
|
for i := range employeeNumber {
|
||||||
|
go func(v string) {
|
||||||
|
defer wg.Done()
|
||||||
|
wi, err := etl.ewiLoader.LoadEmployee(v, from, to)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("err", err)
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
employeeWIChan <- wi
|
||||||
|
}(employeeNumber[i])
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
close(employeeWIChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return <-errChan
|
||||||
|
}
|
||||||
|
|
||||||
func (etl *Etl) FanOut2(ctx context.Context, employeeNumber []string, from, to string) error {
|
func (etl *Etl) FanOut2(ctx context.Context, employeeNumber []string, from, to string) error {
|
||||||
employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
|
employeeWIChan := make(chan []domain.EmployeeWorkInformation, len(employeeNumber))
|
||||||
xmlChan := make(chan []byte, len(employeeNumber))
|
xmlChan := make(chan []byte, len(employeeNumber))
|
||||||
|
|
Loading…
Reference in New Issue