// Command main loads employee work information for a configured list of
// employee IDs through the request/xml_loader services and stores the results
// in a MongoDB collection, timing each step with crono.
package main

import (
	"context"
	"fmt"
	"log"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"gitea.urkob.com/urko/crono"
	"gitea.urkob.com/urko/ess-etl-go/config"
	"gitea.urkob.com/urko/ess-etl-go/internal/request"
	"gitea.urkob.com/urko/ess-etl-go/internal/xml_loader"
	"gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi"
	"gitea.urkob.com/urko/ess-etl-go/pkg/domain"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

// main wires the dependencies, drops the target collection, then loads every
// configured employee concurrently and inserts each loaded batch into MongoDB.
// It returns only after every loaded batch has been inserted; the first load
// or insert error terminates the process via log.Fatalln.
func main() {
	// flag.Parse()
	// if *cpuprofile != "" {
	// 	f, err := os.Create(*cpuprofile)
	// 	if err != nil {
	// 		log.Fatal(err)
	// 	}
	// 	pprof.StartCPUProfile(f)
	// 	defer pprof.StopCPUProfile()
	// }

	// // Add pprof endpoints
	// go func() {
	// 	log.Println(http.ListenAndServe("localhost:6060", nil))
	// }()

	cr := crono.New()
	defer cr.Table()

	cfg := config.NewConfig(".env")
	ctx := context.Background()

	dbOpts := options.Client()
	dbOpts.ApplyURI(cfg.DbAddress)
	client, err := mongo.NewClient(dbOpts)
	if err != nil {
		log.Fatalln("mongo.NewClient", err)
	}
	if err = client.Connect(ctx); err != nil {
		log.Fatalln("client.Connect", err)
	}
	// Log only once Connect has actually succeeded.
	log.Println("mongodb client is connected")
	defer func() {
		// Use a fresh context: the working context is already cancelled by
		// the time deferred calls run.
		if err := client.Disconnect(context.Background()); err != nil {
			log.Println("client.Disconnect", err)
		}
	}()

	employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection)
	if err = employeeWICollection.Drop(ctx); err != nil {
		log.Fatalln("employeeWICollection.Drop", err)
	}

	professionalRepo := employee_wi.NewRepo(employeeWICollection)
	r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey)
	employeeIDList := cfg.EmployeeIdList
	ewiLoader := xml_loader.NewEmployeeWILoader(r)
	from, to := "2023-01-01", "2023-01-31"
	cr.MarkAndRestart("dependencies loaded")

	ctx, cancel := context.WithCancel(signalContext(ctx))
	defer cancel()

	// Each loader sends at most one batch or one error and the inserter sends
	// at most one error, so with these capacities no sender can ever block.
	ewiChan := make(chan []domain.EmployeeWorkInformation, len(employeeIDList))
	errChan := make(chan error, len(employeeIDList)+1)

	// Producers: one goroutine per employee ID.
	var loaders sync.WaitGroup
	for _, id := range employeeIDList {
		loaders.Add(1)
		go func(id string) {
			cr.Restart()
			defer loaders.Done()
			wi, err := ewiLoader.LoadEmployee(id, from, to)
			if err != nil {
				errChan <- err
				return
			}
			ewiChan <- wi
			cr.MarkAndRestart(fmt.Sprintf("ewiLoader.LoadEmployee | %s | from: %s to: %s", id, from, to))
		}(id)
	}

	// Close the batch channel once every loader is done so the inserter's
	// range loop terminates instead of blocking forever.
	go func() {
		loaders.Wait()
		close(ewiChan)
	}()

	// Consumer: inserts batches until the channel is drained or an insert fails.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for batch := range ewiChan {
			log.Println("len v", len(batch))
			if err := professionalRepo.InsertMany(ctx, batch); err != nil {
				errChan <- err
				return
			}
			cr.MarkAndRestart(fmt.Sprintf("database inserted: %d", len(batch)))
		}
	}()

	fail := func(err error) {
		// Cancel before exiting: log.Fatalln never returns.
		cancel()
		log.Fatalln("error while process", err)
	}

	// Fail fast on the first error, otherwise wait for the inserter to finish.
	select {
	case err := <-errChan:
		fail(err)
	case <-done:
	}

	// Every error is buffered before done is closed, so a non-blocking check
	// here catches an error that lost the race in the select above.
	select {
	case err := <-errChan:
		fail(err)
	default:
	}

	log.Println("gracefully shutdown")
}

// signalContext returns a context derived from ctx that is cancelled when the
// process receives an interrupt (SIGINT) or SIGTERM, or when ctx itself is done.
func signalContext(ctx context.Context) context.Context {
	ctx, cancel := context.WithCancel(ctx)

	sigs := make(chan os.Signal, 1)
	// os.Interrupt is SIGINT, so it does not need to be registered twice.
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

	go func() {
		log.Println("listening for shutdown signal")
		select {
		case <-sigs:
			log.Println("shutdown signal received")
		case <-ctx.Done():
			// Parent context ended first; stop listening so this goroutine
			// does not outlive the context it serves.
		}
		signal.Stop(sigs)
		cancel()
	}()

	return ctx
}