package main import ( "context" "flag" "fmt" "log" "net/http" _ "net/http/pprof" "os" "os/signal" "runtime/pprof" "runtime/trace" "sync" "syscall" "gitea.urkob.com/urko/crono" "gitea.urkob.com/urko/ess-etl-go/config" "gitea.urkob.com/urko/ess-etl-go/internal/request" "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" "gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" "gitea.urkob.com/urko/ess-etl-go/pkg/domain" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var traceflag = flag.Bool("trace", false, "write trace to file") func main() { flag.Parse() if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { log.Fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } // Add pprof endpoints go func() { if *cpuprofile != "" { log.Println(http.ListenAndServe("localhost:6060", nil)) } }() if *traceflag { log.Println("trace on") trace.Start(os.Stdout) } defer func() { if *traceflag { trace.Stop() } }() cr := crono.New() if !*traceflag { defer cr.Table() } cfg := config.NewConfig(".env") ctx := context.Background() dbOpts := options.Client() dbOpts.ApplyURI(cfg.DbAddress) client, err := mongo.NewClient(dbOpts) if err != nil { log.Fatalln("mongo.NewClient", err) } log.Println("mongodb client is connected") if err = client.Connect(ctx); err != nil { log.Fatalln("client.Connect", err) } employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) if err = employeeWICollection.Drop(ctx); err != nil { log.Fatalln("employeeWICollection.Drop", err) } professionalRepo := employee_wi.NewRepo(employeeWICollection) r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) employeeIDList := cfg.EmployeeIdList ewiLoader := xml_loader.NewEmployeeWILoader(r) from, to := "2023-01-01", "2023-01-31" cr.MarkAndRestart("dependencies loaded") ctx, cancel := context.WithCancel(signalContext(context.Background())) errChan := make(chan error, 1) ewiChan := make(chan []domain.EmployeeWorkInformation, len(employeeIDList)) wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() for _, v := range employeeIDList { wg.Add(1) go func(v string) { cr.Restart() defer wg.Done() wi, err := ewiLoader.LoadEmployee(v, from, to) if err != nil { errChan <- err return } ewiChan <- wi cr.MarkAndRestart(fmt.Sprintf("ewiLoader.LoadEmployee | %s | from: %s to: %s", v, from, to)) }(v) } }() go func() { if err := <-errChan; err != nil { log.Fatalln("error while process", err) cancel() } }() go func() { for v := range ewiChan { // log.Println("len v", len(v)) err := professionalRepo.InsertMany(ctx, v) if err != nil { errChan <- err return } cr.MarkAndRestart(fmt.Sprintf("database inserted: %d", len(v))) } log.Println("cancel") errChan <- nil }() wg.Wait() // <-ctx.Done() log.Println("gracefully shutdown") } func signalContext(ctx context.Context) context.Context { ctx, cancel := context.WithCancel(ctx) sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { log.Println("listening for shutdown signal") <-sigs log.Println("shutdown signal received") signal.Stop(sigs) close(sigs) cancel() }() return ctx }