package main import ( "context" "flag" "log" "net/http" _ "net/http/pprof" "os" "os/signal" "runtime/pprof" "runtime/trace" "syscall" "gitea.urkob.com/urko/crono" "gitea.urkob.com/urko/ess-etl-go/config" "gitea.urkob.com/urko/ess-etl-go/internal/etl" "gitea.urkob.com/urko/ess-etl-go/internal/request" "gitea.urkob.com/urko/ess-etl-go/internal/xml_loader" "gitea.urkob.com/urko/ess-etl-go/pkg/adapter/repository/mongodb/employee_wi" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var traceflag = flag.Bool("trace", false, "write trace to file") func main() { flag.Parse() if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { log.Fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } // Add pprof endpoints go func() { if *cpuprofile != "" { log.Println(http.ListenAndServe("localhost:6060", nil)) } }() if *traceflag { log.Println("trace on") trace.Start(os.Stdout) } defer func() { if *traceflag { trace.Stop() } }() cr := crono.New() if !*traceflag { defer cr.Table() } cfg := config.NewConfig(".env") ctx := context.Background() dbOpts := options.Client() dbOpts.ApplyURI(cfg.DbAddress) client, err := mongo.NewClient(dbOpts) if err != nil { log.Fatalln("mongo.NewClient", err) } log.Println("mongodb client is connected") if err = client.Connect(ctx); err != nil { log.Fatalln("client.Connect", err) } employeeWICollection := client.Database(cfg.DbName).Collection(cfg.EmployeeWorkInformationCollection) if err = employeeWICollection.Drop(ctx); err != nil { log.Fatalln("employeeWICollection.Drop", err) } repo := employee_wi.NewRepo(employeeWICollection) r := request.NewRequestService(cfg.AmsApi, cfg.AmsApiKey) ewiLoader := xml_loader.NewEmployeeWILoader(r) from, to := "2023-01-01", "2023-01-31" cr.MarkAndRestart("dependencies loaded") etlLoader := etl.New(ewiLoader, repo) err = etlLoader.FanOut(ctx, cfg.EmployeeIdList, from, to) // err = etlLoader.FanOutV2(ctx, cfg.EmployeeIdList, from, to) // err = etlLoader.FanOut2(ctx, cfg.EmployeeIdList, from, to) // err = etlLoader.Main(signalContext(ctx), cr, cfg.EmployeeIdList, from, to) if err != nil { log.Fatalln("etlLoader.FanOut", err) } cr.MarkAndRestart("FanOut") log.Println("gracefully shutdown") } func signalContext(ctx context.Context) context.Context { ctx, cancel := context.WithCancel(ctx) sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { log.Println("listening for shutdown signal") <-sigs log.Println("shutdown signal received") signal.Stop(sigs) close(sigs) cancel() }() return ctx }