diff --git a/.gitignore b/.gitignore index 297bf68..179fa93 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .env +.env.* .vscode coverage .notes diff --git a/cmd/check.go b/cmd/check.go new file mode 100644 index 0000000..3c1a57d --- /dev/null +++ b/cmd/check.go @@ -0,0 +1,134 @@ +package cmd + +import ( + "bytes" + "context" + "errors" + "fmt" + "log" + "net/smtp" + "os" + "time" + + "gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze" + "gitea.urkob.com/urko/backblaze-backup/internal/services/email" + "gitea.urkob.com/urko/backblaze-backup/kit" + "gitea.urkob.com/urko/backblaze-backup/kit/config" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var Check = &cobra.Command{ + Use: "check", + Short: "Compares local backup files with those in a Backblaze B2 bucket and sends a summary email.", + Long: `This command compares the list of files in a local backup directory against the files in a specified Backblaze B2 bucket. + The operation is performed concurrently and is time-bound, set by default to 5 minutes. + If discrepancies are found, i.e., some files exist only locally or only on the cloud, these are logged and sent via email as attachments. + + The email contains two text attachments: + - 'Local-Files-Not-In-B2.txt': Lists files that are in the local backup directory but not in the B2 bucket. + - 'B2-Files-Not-In-Local.txt': Lists files that are in the B2 bucket but not in the local backup directory. 
+ + The command requires two flags: + - '--dir': The path of the local backup directory + - '--bucket': The name of the Backblaze B2 bucket + + An environment variable 'BACKBLAZE_ENV' can be set to 'dev' to load environment variables from a .env file in the root directory.`, + Run: func(cmd *cobra.Command, args []string) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) // or any appropriate time + defer cancel() + + log.SetFlags(log.Ldate | log.Lmicroseconds) + backupDir, err := cmd.Flags().GetString("dir") + if err != nil { + panic(fmt.Errorf("dir %w", err)) + } + bucketName, err := cmd.Flags().GetString("bucket") + if err != nil { + panic(fmt.Errorf("bucket %w", err)) + } + + envFile := "" + if os.Getenv("BACKBLAZE_ENV") == "dev" { + envFile = kit.RootDir() + "/.env" + } + + cfg := config.NewConfig(envFile) + mailSrv := email.NewMailService(email.MailServiceConfig{ + Auth: smtp.PlainAuth("", cfg.MailUser, cfg.MailPassword, cfg.MailHost), + Host: cfg.MailHost, + Port: cfg.MailPort, + From: cfg.MailUser, + }, + ) + logger := logrus.New() + logger.SetLevel(logrus.Level(cfg.LogLevel)) + + b, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey) + if err != nil { + panic(fmt.Errorf("NewBackBlaze %w", err)) + } + + logger.Info("start check") + defer logger.Info("end check") + errs := make(chan backblaze.B2LocalErr) + go b.CompareConcurrent(ctx, backupDir, bucketName, errs) + + // Create two buffers for two kinds of errors + cloudNotInLocalBuffer := new(bytes.Buffer) + cloudNotInLocalBuffer.WriteString(fmt.Sprintf("List of B2 files within %s bucket not found in local path %s \n", bucketName, backupDir)) + countNotInLocalBuffer := 0 + + localNotInCloudBuffer := new(bytes.Buffer) + localNotInCloudBuffer.WriteString(fmt.Sprintf("List of local files in %s not found in B2 bucket %s \n", backupDir, bucketName)) + countNotInCloudBuffer := 0 + + loop: + for { + select { + case <-ctx.Done(): + // Handle the timeout or cancellation + // 
Release any resources here + if ctx.Err() == context.DeadlineExceeded { + logger.Error("Operation timed out") + } else if ctx.Err() == context.Canceled { + logger.Error("Operation canceled") + } + break loop + case err := <-errs: + if errors.Is(err.Err, backblaze.ErrCloudNotInLocal) { + logger.Debug(err.File + ": B2 file not found in local") + cloudNotInLocalBuffer.WriteString(err.File + "\n") + countNotInLocalBuffer++ + } + if errors.Is(err.Err, backblaze.ErrLocalNotInCloud) { + logger.Debug(err.File + ": local file not found in B2") + localNotInCloudBuffer.WriteString(err.File + "\n") + countNotInCloudBuffer++ + } + if err.Err == nil { + break loop + } + } + } + + var attachments []email.EmailAttachment + if countNotInLocalBuffer > 0 { + attachments = append(attachments, email.EmailAttachment{File: cloudNotInLocalBuffer, Title: "B2-Files-Not-In-Local.txt"}) + } + if countNotInCloudBuffer > 0 { + attachments = append(attachments, email.EmailAttachment{File: localNotInCloudBuffer, Title: "Local-Files-Not-In-B2.txt"}) + } + + if err := mailSrv.SendOK(email.EmailWithAttachments{ + To: cfg.MailTo, + Bucket: bucketName, + BackupDir: backupDir, + CountLocal: countNotInLocalBuffer, + CountCloud: countNotInCloudBuffer, + Attachments: attachments, + }); err != nil { + panic(fmt.Errorf("error while send email: %w", err)) + } + }, +} diff --git a/cmd/cleanup.go b/cmd/cleanup.go index e00f532..3ec8c1d 100644 --- a/cmd/cleanup.go +++ b/cmd/cleanup.go @@ -2,11 +2,14 @@ package cmd import ( "context" + "fmt" "log" "os" - "gitea.urkob.com/urko/backblaze-backup/internal/services" + "gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze" + "gitea.urkob.com/urko/backblaze-backup/kit" "gitea.urkob.com/urko/backblaze-backup/kit/config" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -20,18 +23,23 @@ var Cleanup = &cobra.Command{ bucketName, err := cmd.Flags().GetString("bucket") if err != nil { - log.Fatalln("bucket %w", err) + panic(fmt.Errorf("bucket %w", 
err)) } envFile := "" if os.Getenv("BACKBLAZE_ENV") == "dev" { - envFile = ".env" + envFile = kit.RootDir() + "/.env" } cfg := config.NewConfig(envFile) + logger := logrus.New() + logger.SetLevel(logrus.Level(cfg.LogLevel)) - bbService := services.NewBackBlaze(cfg.BbId, cfg.BbKey) + bbService, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey) + if err != nil { + panic(fmt.Errorf("NewBackBlaze %w", err)) + } if err := bbService.CleanUp(ctx, cancel, bucketName); err != nil { - log.Fatalln("bbService.CleanUp()", err) + panic(fmt.Errorf("bbService.CleanUp %w", err)) } }, } diff --git a/cmd/duplicate_versions.go b/cmd/duplicate_versions.go index 30b04e2..4ad020b 100644 --- a/cmd/duplicate_versions.go +++ b/cmd/duplicate_versions.go @@ -2,13 +2,16 @@ package cmd import ( "context" + "fmt" "log" "os" "os/signal" "syscall" - "gitea.urkob.com/urko/backblaze-backup/internal/services" + "gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze" + "gitea.urkob.com/urko/backblaze-backup/kit" "gitea.urkob.com/urko/backblaze-backup/kit/config" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -22,13 +25,18 @@ var Versions = &cobra.Command{ envFile := "" if os.Getenv("BACKBLAZE_ENV") == "dev" { - envFile = ".env" + envFile = kit.RootDir() + "/.env" } cfg := config.NewConfig(envFile) + logger := logrus.New() + logger.SetLevel(logrus.Level(cfg.LogLevel)) - bbService := services.NewBackBlaze(cfg.BbId, cfg.BbKey) + bbService, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey) + if err != nil { + panic(fmt.Errorf("NewBackBlaze %w", err)) + } if err := bbService.ListDuplicateVersions(ctx, cancel); err != nil { - log.Fatalln("bbService.ListDuplicateVersions()", err) + panic(fmt.Errorf("bbService.ListDuplicateVersions %w", err)) } }, } diff --git a/cmd/sync.go b/cmd/sync.go index 1be49c3..4f5bb07 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -2,11 +2,14 @@ package cmd import ( "context" + "fmt" "log" "os" - 
"gitea.urkob.com/urko/backblaze-backup/internal/services" + "gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze" + "gitea.urkob.com/urko/backblaze-backup/kit" "gitea.urkob.com/urko/backblaze-backup/kit/config" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -21,26 +24,34 @@ var Sync = &cobra.Command{ log.SetFlags(log.Ldate | log.Lmicroseconds) filePath, err := cmd.Flags().GetString("file") if err != nil { - log.Fatalln("file %w", err) + panic(fmt.Errorf("file %w", err)) } dir, err := cmd.Flags().GetString("dir") if err != nil { - log.Fatalln("dir %w", err) + panic(fmt.Errorf("dir %w", err)) } bucketName, err := cmd.Flags().GetString("bucket") if err != nil { - log.Fatalln("bucket %w", err) + panic(fmt.Errorf("bucket %w", err)) } envFile := "" if os.Getenv("BACKBLAZE_ENV") == "dev" { - envFile = ".env" + envFile = kit.RootDir() + "/.env" } - cfg := config.NewConfig(envFile) - bbService := services.NewBackBlaze(cfg.BbId, cfg.BbKey).WithBucket(bucketName).WithDir(dir).WithFile(filePath) + cfg := config.NewConfig(envFile) + logger := logrus.New() + logger.SetLevel(logrus.Level(cfg.LogLevel)) + + bbService, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey) + if err != nil { + panic(fmt.Errorf("NewBackBlaze %w", err)) + } + + bbService = bbService.WithBucket(bucketName).WithDir(dir).WithFile(filePath) if err := bbService.Sync(ctx); err != nil { - log.Fatalln("bbService.Sync()", err) + panic(fmt.Errorf("bbService.Sync %w", err)) } }, } diff --git a/go.mod b/go.mod index 41df227..f1b3ad0 100644 --- a/go.mod +++ b/go.mod @@ -9,18 +9,22 @@ require ( github.com/kurin/blazer v0.5.3 github.com/rclone/rclone v1.63.1 github.com/spf13/cobra v1.7.0 + github.com/stretchr/testify v1.8.3 golang.org/x/sync v0.3.0 ) require ( + cloud.google.com/go/compute/metadata v0.2.3 // indirect github.com/Max-Sum/base32768 v0.0.0-20230304063302-18e6ce5945fd // indirect github.com/abbot/go-http-auth v0.4.0 // indirect github.com/beorn7/perks v1.0.1 // 
indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/coreos/go-semver v0.3.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-chi/chi/v5 v5.0.8 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/googleapis/gax-go/v2 v2.11.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jzelinskie/whirlpool v0.0.0-20201016144138-0675e54bb004 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect @@ -28,6 +32,7 @@ require ( github.com/mattn/go-isatty v0.0.16 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect @@ -41,11 +46,14 @@ require ( github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect - golang.org/x/crypto v0.7.0 // indirect - golang.org/x/net v0.8.0 // indirect + golang.org/x/crypto v0.9.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/oauth2 v0.8.0 // indirect golang.org/x/sys v0.8.0 // indirect - golang.org/x/term v0.7.0 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/term v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.3.0 // indirect + google.golang.org/api v0.126.0 // indirect google.golang.org/protobuf v1.30.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index eb77236..c249f4c 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bP cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk= 
cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= +cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= @@ -19,8 +20,9 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= -cloud.google.com/go/compute v1.19.0 h1:+9zda3WGgW1ZSTlVppLCYFIr48Pa35q1uG2N1itbCEQ= +cloud.google.com/go/compute v1.19.3 h1:DcTwsFgGev/wV5+q8o2fzgcHOaac+DKGC91ZlvpsQds= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -161,11 +163,13 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/s2a-go v0.1.4 
h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/googleapis/gax-go/v2 v2.8.0 h1:UBtEZqx1bjXtOQ5BVTkuYghXrr3N4V123VKJK67vJZc= +github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cUUI8Ki4= +github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= @@ -208,8 +212,10 @@ github.com/koofr/go-httpclient v0.0.0-20230225102643-5d51a2e9dea6 h1:uF5FHZ/L5gv github.com/koofr/go-koofrclient v0.0.0-20221207135200-cbd7fc9ad6a6 h1:FHVoZMOVRA+6/y4yRlbiR3WvsrOcKBd/f64H7YiWR2U= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kurin/blazer v0.5.3 h1:SAgYv0TKU0kN/ETfO5ExjNAPyMt2FocO2s/UlCHfjAk= github.com/kurin/blazer v0.5.3/go.mod h1:4FCXMUWo9DllR2Do4TtBd377ezyAJ51vB5uTBjt0pGU= @@ -339,8 +345,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod 
h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= +golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -402,8 +408,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -411,7 +417,8 @@ golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= -golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= +golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= +golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -470,8 +477,8 @@ golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= -golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -479,8 +486,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -546,7 +553,8 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= -google.golang.org/api v0.115.0 h1:6FFkVvStt4YqXSx3azKyzj7fXerGnVlLJ/eud01nBDE= +google.golang.org/api v0.126.0 h1:q4GJq+cAdMAC7XP7njvQ4tvohGLiSlytuL4BQxbIZ+o= +google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -583,7 +591,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc 
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -596,7 +605,7 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -615,6 +624,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/services/backblaze.go b/internal/services/backblaze.go deleted file mode 100644 index 01f3691..0000000 --- a/internal/services/backblaze.go +++ /dev/null @@ -1,364 +0,0 @@ -package services - -import ( - "context" - "errors" - "fmt" - "io" - "io/fs" - "log" - "os" - "path/filepath" - "runtime" - "strconv" - "strings" - "sync" - - rcloneb2 "github.com/rclone/rclone/backend/b2" - - "github.com/kurin/blazer/b2" - "github.com/rclone/rclone/fs/config/configmap" - "github.com/rclone/rclone/fs/operations" - "golang.org/x/sync/semaphore" -) - -const writers = 4 -const maxConcurrentWeight = 5 -const largeFileSize = 500 * 1024 * 1024 // 500 MB - -type BackBalze struct { - bucketName string - dir string - filePath string - maxWorkers int - bbID string - bbKey string -} - -func NewBackBlaze(bbID, bbKey string) *BackBalze { - log.Println("runtime.NumCPU()", runtime.NumCPU()) - return &BackBalze{ - bbID: bbID, - bbKey: bbKey, - maxWorkers: runtime.NumCPU(), - } -} -func (b *BackBalze) WithBucket(bucketName string) *BackBalze { - b.bucketName = bucketName - return b -} -func (b *BackBalze) WithDir(dir string) *BackBalze { - b.dir = dir - return b -} -func (b *BackBalze) WithFile(filePath string) *BackBalze { - b.filePath = filePath - return b -} -func (b *BackBalze) Sync(ctx context.Context) error { - if b.bucketName == "" && (b.filePath == "" || b.dir == "") { - 
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) - } - - if b.filePath != "" && b.dir != "" { - return errors.New("you must select just 1 option, dir or file") - } - - b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey) - if err != nil { - return fmt.Errorf("b2.NewClient %w", err) - } - log.Println("b2Client ok") - bc, err := b2Client.Bucket(ctx, b.bucketName) - if err != nil { - return fmt.Errorf("b2Client.Bucket %w", err) - } - - if bc == nil { - return fmt.Errorf("bucket doesn't exist %s", b.bucketName) - } - - log.Println("bucket found:", bc.Name()) - if b.filePath != "" { - log.Println("file:", b.filePath) - - if err := copyFile(ctx, bc, b.filePath); err != nil { - return fmt.Errorf("copyFile %w", err) - } - return nil - } - - if b.dir != "" { - oldFiles, err := bucketFiles(ctx, bc) - if err != nil { - return fmt.Errorf("bucketFiles %w", err) - } - log.Println(strings.Repeat("*", 40)) - log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t")) - log.Println(strings.Repeat("*", 40)) - - fileChan := make(chan string) - - var wg sync.WaitGroup - - for i := 0; i < b.maxWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for src := range fileChan { - err := copyFile(ctx, bc, src) - if err != nil { - log.Printf("error copying file %s: %v\n", src, err) - continue - } - } - }() - } - - // Walk the directory and send files to the channel for uploading - err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if !d.IsDir() { - fileChan <- path - } - return nil - }) - if err != nil { - return fmt.Errorf("error walking the directory: %v", err) - } - - // Close the channel (no more files to send) - close(fileChan) - wg.Wait() - - // Cleanup old files after backup is completed - if err := cleanBucket(ctx, bc, oldFiles); err != nil { - return fmt.Errorf("cleanBucket %w", err) - } - } - - log.Println("copied successfully") - return nil 
-} - -func (b *BackBalze) OldSync() error { - if b.bucketName == "" && (b.filePath == "" || b.dir == "") { - return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) - } - - if b.filePath != "" && b.dir != "" { - return errors.New("you must select just 1 option, dir or file") - } - - ctx := context.Background() - b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey) - if err != nil { - return fmt.Errorf("b2.NewClient %w", err) - } - log.Println("b2Client ok") - bc, err := b2Client.Bucket(ctx, b.bucketName) - if err != nil { - return fmt.Errorf("b2Client.Bucket %w", err) - } - - if bc == nil { - return fmt.Errorf("bucket doesn't exist %s", b.bucketName) - } - - log.Println("bucket found:", bc.Name()) - if b.filePath != "" { - log.Println("file:", b.filePath) - - if err := copyFile(ctx, bc, b.filePath); err != nil { - return fmt.Errorf("copyFile %w", err) - } - return nil - } - - if b.dir != "" { - oldFiles, err := bucketFiles(ctx, bc) - if err != nil { - return fmt.Errorf("bucketFiles %w", err) - } - log.Println(strings.Repeat("*", 40)) - log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t")) - log.Println(strings.Repeat("*", 40)) - - fileChan := make(chan string) - uploadSem := semaphore.NewWeighted(maxConcurrentWeight) - - var wg sync.WaitGroup - - for i := 0; i < b.maxWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for src := range fileChan { - info, err := os.Stat(src) - if err != nil { - log.Printf("error getting file info %s: %v\n", src, err) - continue - } - weight := int64(1) - if info.Size() > largeFileSize { - weight = 2 - } - if err := uploadSem.Acquire(ctx, weight); err == nil { - log.Println("start copying file", src) - if err := copyFile(ctx, bc, src); err != nil { - log.Printf("error copying file %s: %v\n", src, err) - } - uploadSem.Release(weight) - } else { - log.Printf("error acquiring semaphore: %v\n", err) - } - } - }() - } - - // Walk the directory and send files to the 
channel for uploading - err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if !d.IsDir() { - fileChan <- path - } - return nil - }) - if err != nil { - return fmt.Errorf("error walking the directory: %v", err) - } - - // Close the channel (no more files to send) - close(fileChan) - wg.Wait() - - // Cleanup old files after backup is completed - if err := cleanBucket(ctx, bc, oldFiles); err != nil { - return fmt.Errorf("cleanBucket %w", err) - } - } - - log.Println("copied successfully") - return nil -} - -func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { - f, err := os.Open(src) - if err != nil { - return err - } - defer f.Close() - fi, err := f.Stat() - if err != nil { - return err - } - w := bucket.Object(fi.Name()).NewWriter(ctx) - w.ConcurrentUploads = writers - w.UseFileBuffer = true - log.Println("start copying", fi.Name()) - if _, err := io.Copy(w, f); err != nil { - w.Close() - return err - } - log.Println("end copying", fi.Name()) - return w.Close() -} - -func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error { - var errorBuilder strings.Builder - for _, v := range files { - obj := bucket.Object(v) - if obj == nil { - log.Println("object is nil", v) - continue - } - if err := obj.Delete(ctx); err != nil { - errorBuilder.WriteString(fmt.Errorf("error deleting %s : %w", v, err).Error()) - errorBuilder.WriteString("; ") - } - } - - if errorBuilder.Len() > 0 { - return errors.New(errorBuilder.String()) - } - - return nil -} - -func bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) { - bucketIter := bucket.List(ctx) - if bucketIter == nil { - return nil, fmt.Errorf("bucket list cannot be nil") - } - var files []string - for { - if !bucketIter.Next() { - if bucketIter.Err() != nil { - return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err()) - } - break - } - if bucketIter.Object() == nil { - log.Println("bucketIter Object is 
nil") - continue - } - files = append(files, bucketIter.Object().Name()) - } - return files, nil -} - -type duplicate struct { - bucket string - file string - count int -} - -func (b *BackBalze) CleanUp(ctx context.Context, cancel context.CancelFunc, bucketName string) error { - - b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey) - if err != nil { - return fmt.Errorf("b2.NewClient %w", err) - } - - log.Println("b2Client ok") - - var dups []duplicate - if bucketName != "" { - dups, err = b.listDuplicatesFromBucket(ctx, cancel, b2Client, bucketName) - if err != nil { - return fmt.Errorf("b.listDuplicatesFromBucket: %w", err) - } - } else { - dups, err = b.listDuplicates(ctx, cancel, b2Client) - if err != nil { - return fmt.Errorf("b.listDuplicates: %w", err) - } - - } - - if len(dups) <= 0 { - return nil - } - - smpl := configmap.Simple{} - smpl.Set("account", b.bbID) - smpl.Set("key", b.bbKey) - smpl.Set("chunk_size", strconv.FormatInt(int64(9600), 10)) - log.Println("duplicates", len(dups)) - for _, d := range dups { - f, err := rcloneb2.NewFs(ctx, "B2", d.dir(), smpl) - if err != nil { - return fmt.Errorf("rclonefs.NewFs %w", err) - } - if err := operations.CleanUp(ctx, f); err != nil { - return fmt.Errorf("operations.CleanUp %w", err) - } - log.Println(d.dir(), "cleaned up") - } - - return nil -} diff --git a/internal/services/backblaze/backblaze.go b/internal/services/backblaze/backblaze.go new file mode 100644 index 0000000..808837a --- /dev/null +++ b/internal/services/backblaze/backblaze.go @@ -0,0 +1,50 @@ +package backblaze + +import ( + "context" + "fmt" + "runtime" + + "github.com/kurin/blazer/b2" + "github.com/sirupsen/logrus" +) + +type BackBlaze struct { + logger *logrus.Logger + bucketName string + bbID string + bbKey string + dir string + filePath string + maxWorkers int + b2Client *b2.Client +} + +// NewBackBlaze initializes a new BackBlaze struct with given BackBlaze ID and Key. 
+func NewBackBlaze(ctx context.Context, logger *logrus.Logger, bbID, bbKey string) (*BackBlaze, error) { + b2Client, err := b2.NewClient(ctx, bbID, bbKey) + if err != nil { + return nil, fmt.Errorf("b2.NewClient %w", err) + } + + return &BackBlaze{ + logger: logger, + b2Client: b2Client, + bbID: bbID, + bbKey: bbKey, + maxWorkers: runtime.NumCPU(), + }, nil +} + +func (b *BackBlaze) WithBucket(bucketName string) *BackBlaze { + b.bucketName = bucketName + return b +} +func (b *BackBlaze) WithDir(dir string) *BackBlaze { + b.dir = dir + return b +} +func (b *BackBlaze) WithFile(filePath string) *BackBlaze { + b.filePath = filePath + return b +} diff --git a/internal/services/backblaze/check.go b/internal/services/backblaze/check.go new file mode 100644 index 0000000..4a7920f --- /dev/null +++ b/internal/services/backblaze/check.go @@ -0,0 +1,152 @@ +package backblaze + +import ( + "context" + "errors" + "fmt" + "io/fs" + "path/filepath" + "sync" + + "github.com/kurin/blazer/b2" +) + +// localFiles lists the local files in the given backup directory and sends them to a channel. +// It closes the channel after all files are listed. +func (b *BackBlaze) localFiles(backupDir string, fileChan chan<- string) error { + defer close(fileChan) + // Walk the directory and send files to the channel + err := filepath.WalkDir(backupDir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + fileChan <- filepath.Base(path) + } + return nil + }) + if err != nil { + return fmt.Errorf("error walking the directory: %v", err) + } + + return nil +} + +// b2BucketFiles lists the files in the given B2 bucket and sends them to a channel. +// It closes the channel after all files are listed. 
+func (b *BackBlaze) b2BucketFiles(ctx context.Context, bucketName string, fileChan chan<- string) error { + bucket, err := b.b2Client.Bucket(ctx, bucketName) + defer close(fileChan) + if err != nil { + return fmt.Errorf("b2Client.Bucket %w", err) + } + + bucketIter := bucket.List(ctx, b2.ListHidden()) + if bucketIter == nil { + return errors.New("bucket list cannot be nil") + } + + for { + if !bucketIter.Next() { + if bucketIter.Err() != nil { + return fmt.Errorf("bucketIter err %w", bucketIter.Err()) + } + break + } + if bucketIter.Object() == nil { + return errors.New("bucketIter Object is nil") + } + b.logger.Debug("bucket file: ", bucketIter.Object().Name()) + fileChan <- bucketIter.Object().Name() + } + + return nil +} + +var ErrLocalNotInCloud error = errors.New("exists locally but not in the cloud") +var ErrCloudNotInLocal error = errors.New("exists on cloud but not locally") + +type B2LocalErr struct { + File string + Err error +} + +// CompareConcurrent concurrently fetches the list of local files and cloud files, +// then compares them to ensure all local files exist in the cloud. +// Errors are sent to a provided error channel. The function will panic if an error occurs while listing files. 
+func (b *BackBlaze) CompareConcurrent(ctx context.Context, backupDir, bucketName string, errors chan<- B2LocalErr) { + var wg sync.WaitGroup + localFiles := make(map[string]int) + cloudFiles := make(map[string]int) + localFileChan := make(chan string) + b2FileChan := make(chan string) + + // Local listing + wg.Add(1) + go func() { + defer wg.Done() + wg.Add(1) + go func() { + defer wg.Done() + for f := range localFileChan { + if _, ok := localFiles[f]; ok { + panic(fmt.Errorf("local file already exists in map: %s", f)) + } + b.logger.Debug("local file", f) + localFiles[f]++ + } + }() + + if err := b.localFiles(backupDir, localFileChan); err != nil { + panic(fmt.Errorf("b.LocalFilesWithB2: %w", err)) + } + }() + + // Cloud listing + wg.Add(1) + go func() { + defer wg.Done() + wg.Add(1) + go func() { + defer wg.Done() + for f := range b2FileChan { + if _, ok := cloudFiles[f]; ok { + panic(fmt.Errorf("cloud file already exists in map: %s", f)) + } + b.logger.Debug("B2 file", f) + cloudFiles[f]++ + } + }() + if err := b.b2BucketFiles(ctx, bucketName, b2FileChan); err != nil { + panic(fmt.Errorf("b.LocalFilesWithB2: %w", err)) + } + }() + + // Wait for both to complete + wg.Wait() + + // Now check local files that are not presesnt in cloud + wg.Add(2) + go func() { + defer wg.Done() + for localFile := range localFiles { + if _, exists := cloudFiles[localFile]; !exists { + errors <- B2LocalErr{File: localFile, Err: ErrLocalNotInCloud} + } + } + }() + + // Now check cloud files that are not in local + go func() { + defer wg.Done() + for cloudFile := range cloudFiles { + if _, exists := localFiles[cloudFile]; !exists { + errors <- B2LocalErr{File: cloudFile, Err: ErrCloudNotInLocal} + } + } + }() + + wg.Wait() + errors <- B2LocalErr{Err: nil} + close(errors) +} diff --git a/internal/services/backblaze/cleanup.go b/internal/services/backblaze/cleanup.go new file mode 100644 index 0000000..d07c5fb --- /dev/null +++ b/internal/services/backblaze/cleanup.go @@ -0,0 +1,50 @@ 
+package backblaze + +import ( + "context" + "fmt" + "strconv" + + "github.com/rclone/rclone/backend/b2" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/operations" +) + +func (b *BackBlaze) CleanUp(ctx context.Context, cancel context.CancelFunc, bucketName string) error { + var dups []duplicate + var err error + if bucketName != "" { + dups, err = b.listDuplicatesFromBucket(ctx, cancel, b.b2Client, bucketName) + if err != nil { + return fmt.Errorf("b.listDuplicatesFromBucket: %w", err) + } + } else { + dups, err = b.listDuplicates(ctx, cancel, b.b2Client) + if err != nil { + return fmt.Errorf("b.listDuplicates: %w", err) + } + + } + + if len(dups) <= 0 { + return nil + } + + smpl := configmap.Simple{} + smpl.Set("account", b.bbID) + smpl.Set("key", b.bbKey) + smpl.Set("chunk_size", strconv.FormatInt(int64(9600), 10)) + b.logger.Info("duplicates", len(dups)) + for _, d := range dups { + f, err := b2.NewFs(ctx, "B2", d.dir(), smpl) + if err != nil { + return fmt.Errorf("rclonefs.NewFs %w", err) + } + if err := operations.CleanUp(ctx, f); err != nil { + return fmt.Errorf("operations.CleanUp %w", err) + } + b.logger.Info(d.dir(), "cleaned up") + } + + return nil +} diff --git a/internal/services/duplicates.go b/internal/services/backblaze/duplicates.go similarity index 86% rename from internal/services/duplicates.go rename to internal/services/backblaze/duplicates.go index 66d9b39..4f66282 100644 --- a/internal/services/duplicates.go +++ b/internal/services/backblaze/duplicates.go @@ -1,4 +1,4 @@ -package services +package backblaze import ( "context" @@ -12,6 +12,12 @@ import ( "golang.org/x/sync/semaphore" ) +type duplicate struct { + bucket string + file string + count int +} + func (d duplicate) dir() string { if !strings.Contains(d.file, "/") { return d.bucket @@ -20,7 +26,7 @@ func (d duplicate) dir() string { return strings.Join(splitted[:(len(splitted)-1)], "/") } -func (b *BackBalze) ListDuplicateVersions(ctx context.Context, 
cancel context.CancelFunc) error { +func (b *BackBlaze) ListDuplicateVersions(ctx context.Context, cancel context.CancelFunc) error { b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey) if err != nil { return fmt.Errorf("b2.NewClient %w", err) @@ -42,7 +48,7 @@ func (b *BackBalze) ListDuplicateVersions(ctx context.Context, cancel context.Ca return nil } -func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) { +func (b *BackBlaze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) { buckets, err := b2Client.ListBuckets(ctx) if err != nil { return nil, fmt.Errorf("b2Client.Bucket %w", err) @@ -66,20 +72,20 @@ func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFun bucketIter := bc.List(ctx, b2.ListHidden()) if bucketIter == nil { - log.Println("bucket list cannot be nil") + b.logger.Error("bucket list cannot be nil") return } for { if !bucketIter.Next() { if bucketIter.Err() != nil { - log.Println("bucketIter err %w", bucketIter.Err()) + b.logger.Error("bucketIter err %w", bucketIter.Err()) return } break } if bucketIter.Object() == nil { - log.Println("bucketIter Object is nil") + b.logger.Error("bucketIter Object is nil") continue } files[bucketIter.Object().Name()]++ @@ -102,7 +108,7 @@ func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFun return dups, nil } -func (b *BackBalze) listDuplicatesFromBucket(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client, bucketName string) ([]duplicate, error) { +func (b *BackBlaze) listDuplicatesFromBucket(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client, bucketName string) ([]duplicate, error) { bucket, err := b2Client.Bucket(ctx, bucketName) if err != nil { return nil, fmt.Errorf("b2Client.Bucket %w", err) diff --git a/internal/services/backblaze/sync.go b/internal/services/backblaze/sync.go new file mode 100644 index 
0000000..80d4f52 --- /dev/null +++ b/internal/services/backblaze/sync.go @@ -0,0 +1,169 @@ +package backblaze + +import ( + "context" + "errors" + "fmt" + "io" + "io/fs" + "log" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/kurin/blazer/b2" +) + +const writers = 4 + +func (b *BackBlaze) Sync(ctx context.Context) error { + if b.bucketName == "" && (b.filePath == "" || b.dir == "") { + return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) + } + + if b.filePath != "" && b.dir != "" { + return errors.New("you must select just 1 option, dir or file") + } + + bc, err := b.b2Client.Bucket(ctx, b.bucketName) + if err != nil { + return fmt.Errorf("b2Client.Bucket %w", err) + } + + if bc == nil { + return fmt.Errorf("bucket doesn't exist %s", b.bucketName) + } + + log.Println("bucket found:", bc.Name()) + if b.filePath != "" { + log.Println("file:", b.filePath) + + if err := b.copyFile(ctx, bc, b.filePath); err != nil { + return fmt.Errorf("copyFile %w", err) + } + return nil + } + + if b.dir != "" { + oldFiles, err := b.bucketFiles(ctx, bc) + if err != nil { + return fmt.Errorf("bucketFiles %w", err) + } + b.logger.Debug(strings.Repeat("*", 40)) + b.logger.Debug("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t")) + b.logger.Debug(strings.Repeat("*", 40)) + + fileChan := make(chan string) + + var wg sync.WaitGroup + + for i := 0; i < b.maxWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for src := range fileChan { + if err := b.copyFile(ctx, bc, src); err != nil { + b.logger.Errorf("error copying file %s: %v\n", src, err) + continue + } + } + }() + } + + // Walk the directory and send files to the channel for uploading + err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + fileChan <- path + } + return nil + }) + if err != nil { + return fmt.Errorf("error walking the directory: %v", err) + } + + // Close the 
channel (no more files to send) + close(fileChan) + wg.Wait() + + // Cleanup old files after backup is completed + if err := b.cleanBucket(ctx, bc, oldFiles); err != nil { + return fmt.Errorf("cleanBucket %w", err) + } + } + + b.logger.Info("copied successfully") + return nil +} + +func (b *BackBlaze) copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { + f, err := os.Open(src) + if err != nil { + return err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return err + } + w := bucket.Object(fi.Name()).NewWriter(ctx) + w.ConcurrentUploads = writers + w.UseFileBuffer = true + b.logger.Info("start copying", fi.Name()) + if _, err := io.Copy(w, f); err != nil { + w.Close() + return err + } + if err := w.Close(); err != nil { + return err + } + + b.logger.Info("end copying", fi.Name()) + return nil +} + +func (b *BackBlaze) cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error { + var errorBuilder strings.Builder + for _, v := range files { + obj := bucket.Object(v) + if obj == nil { + b.logger.Error("bucket.Object is nil", v) + continue + } + if err := obj.Delete(ctx); err != nil { + errorBuilder.WriteString(fmt.Errorf("error deleting %s : %w", v, err).Error()) + errorBuilder.WriteString("; ") + } + } + + if errorBuilder.Len() > 0 { + return errors.New(errorBuilder.String()) + } + + return nil +} + +func (b *BackBlaze) bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) { + bucketIter := bucket.List(ctx) + if bucketIter == nil { + return nil, fmt.Errorf("bucket list cannot be nil") + } + var files []string + for { + if !bucketIter.Next() { + if bucketIter.Err() != nil { + return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err()) + } + break + } + if bucketIter.Object() == nil { + b.logger.Error("bucketIter Object is nil") + continue + } + files = append(files, bucketIter.Object().Name()) + } + return files, nil +} diff --git a/internal/services/email/email.go b/internal/services/email/email.go 
new file mode 100644 index 0000000..d88c962 --- /dev/null +++ b/internal/services/email/email.go @@ -0,0 +1,183 @@ +package email + +import ( + "bytes" + "crypto/tls" + "encoding/base64" + "fmt" + "io" + "net/smtp" + "strings" +) + +const ( + mime = "MIME-version: 1.0;\nContent-Type: text/html; charset=\"UTF-8\";\n\n" + subjectReport = "B2 vs Local Files" + templateSuccess = "success.html" + templateConfirm = "confirm.html" + templateForgotPassword = "forgot_password.html" + templateRegister = "register.html" + delimeter = "**=myohmy689407924327" +) + +type EmailService struct { + auth smtp.Auth + host string + port string + from string + tlsconfig *tls.Config +} + +type EmailWithAttachments struct { + To string + Bucket string + BackupDir string + CountLocal int + CountCloud int + Attachments []EmailAttachment +} + +type EmailAttachment struct { + File io.Reader + Title string +} + +func (e EmailAttachment) ReadContent() ([]byte, error) { + bts, err := io.ReadAll(e.File) + if err != nil { + return nil, fmt.Errorf("error loading attachment: %s", err) + } + return bts, nil +} + +type MailServiceConfig struct { + Auth smtp.Auth + Host string + Port string + From string // Sender email address +} + +func NewMailService(config MailServiceConfig) *EmailService { + return &EmailService{ + auth: config.Auth, + host: config.Host, + port: config.Port, + from: config.From, + tlsconfig: &tls.Config{ + InsecureSkipVerify: true, + ServerName: config.Host, + }, + } +} + +func (e *EmailService) SendOK(emailData EmailWithAttachments) error { + template := strings.Replace(htmlTemplate, "{{bucket}}", emailData.Bucket, -1) + template = strings.Replace(template, "{{local_backup_path}}", emailData.BackupDir, -1) + template = strings.Replace(template, "{{count_ErrCloudNotInLocal}}", fmt.Sprint(emailData.CountCloud), -1) + template = strings.Replace(template, "{{count_ErrLocalNotInCloud}}", fmt.Sprint(emailData.CountLocal), -1) + + msg, err := newMessage(e.from, emailData.To, 
subjectReport). + withAttachments(template, emailData.Attachments) + + if err != nil { + return fmt.Errorf("error while upload attachments %w", err) + } + + return e.send(emailData.To, msg) +} + +func (e *EmailService) send(to string, msg []byte) error { + c, err := smtp.Dial(e.host + ":" + e.port) + if err != nil { + return fmt.Errorf("DIAL: %s", err) + } + + if err = c.StartTLS(e.tlsconfig); err != nil { + return fmt.Errorf("c.StartTLS: %s", err) + } + + // Auth + if err = c.Auth(e.auth); err != nil { + return fmt.Errorf("c.Auth: %s", err) + } + + // To && From + if err = c.Mail(e.from); err != nil { + return fmt.Errorf("c.Mail: %s", err) + } + + if err = c.Rcpt(to); err != nil { + return fmt.Errorf("c.Rcpt: %s", err) + } + + // Data + w, err := c.Data() + if err != nil { + return fmt.Errorf("c.Data: %s", err) + } + + written, err := w.Write(msg) + if err != nil { + return fmt.Errorf("w.Write: %s", err) + } + + if written <= 0 { + return fmt.Errorf("%d bytes written", written) + } + + if err = w.Close(); err != nil { + return fmt.Errorf("w.Close: %s", err) + } + + if err = c.Quit(); err != nil { + return fmt.Errorf("w.Quit: %s", err) + } + return nil +} + +type message struct { + from string + to string + subject string +} + +func newMessage(from, to, subject string) message { + return message{from: from, to: to, subject: subject} +} + +func (m message) withAttachments(body string, attachments []EmailAttachment) ([]byte, error) { + headers := make(map[string]string) + headers["From"] = m.from + headers["To"] = m.to + headers["Subject"] = m.subject + headers["MIME-Version"] = "1.0" + + var message bytes.Buffer + + for k, v := range headers { + message.WriteString(k) + message.WriteString(": ") + message.WriteString(v) + message.WriteString("\r\n") + } + + message.WriteString("Content-Type: " + fmt.Sprintf("multipart/mixed; boundary=\"%s\"\r\n", delimeter)) + message.WriteString("--" + delimeter + "\r\n") + message.WriteString("Content-Type: text/html; 
charset=\"UTF-8\"\r\n\r\n") + message.WriteString(body + "\r\n\r\n") + + for _, attachment := range attachments { + attachmentRawFile, err := attachment.ReadContent() + if err != nil { + return nil, err + } + message.WriteString("--" + delimeter + "\r\n") + message.WriteString("Content-Disposition: attachment; filename=\"" + attachment.Title + "\"\r\n") + message.WriteString("Content-Type: application/octet-stream\r\n") + message.WriteString("Content-Transfer-Encoding: base64\r\n\r\n") + message.WriteString(base64.StdEncoding.EncodeToString(attachmentRawFile) + "\r\n") + } + + message.WriteString("--" + delimeter + "--") // End the message + return message.Bytes(), nil +} diff --git a/internal/services/email/email_test.go b/internal/services/email/email_test.go new file mode 100644 index 0000000..2e49f46 --- /dev/null +++ b/internal/services/email/email_test.go @@ -0,0 +1,54 @@ +package email + +import ( + "net/smtp" + "os" + "testing" + + "gitea.urkob.com/urko/backblaze-backup/kit" + "gitea.urkob.com/urko/backblaze-backup/kit/config" + "github.com/stretchr/testify/require" +) + +func Test_mailService_SendOK(t *testing.T) { + cfg := config.NewConfig(kit.RootDir() + "/.env.test") + + mailSrv := NewMailService(MailServiceConfig{ + Auth: smtp.PlainAuth("", cfg.MailUser, cfg.MailPassword, cfg.MailHost), + Host: cfg.MailHost, + Port: cfg.MailPort, + From: cfg.MailUser, + }, + ) + reader, err := os.Open("testdata/attachment1.txt") + require.NoError(t, err) + defer reader.Close() + + reader2, err := os.Open("testdata/attachment2.txt") + require.NoError(t, err) + defer reader2.Close() + + reader3, err := os.Open("testdata/attachment3.txt") + require.NoError(t, err) + defer reader3.Close() + + data := EmailWithAttachments{ + To: cfg.MailTo, + Attachments: []EmailAttachment{ + { + Title: "attachment1.txt", + File: reader, + }, + { + Title: "attachment2.txt", + File: reader2, + }, + { + Title: "attachment3.txt", + File: reader3, + }, + }, + } + err = mailSrv.SendOK(data) + 
require.NoError(t, err) +} diff --git a/internal/services/email/template.go b/internal/services/email/template.go new file mode 100644 index 0000000..6c90f1e --- /dev/null +++ b/internal/services/email/template.go @@ -0,0 +1,23 @@ +package email + +const htmlTemplate = ` + +
+Local Backup Path: {{local_backup_path}}
+Bucket Name: {{bucket}}
+Count of ErrCloudNotInLocal: {{count_ErrCloudNotInLocal}}
+Count of ErrLocalNotInCloud: {{count_ErrLocalNotInCloud}}
+This is an automated report, please do not reply to this email.
+