refactor: whole application

- add check method
- add log
This commit is contained in:
Urko 2023-08-27 21:30:19 +02:00
parent 5bbeb8c392
commit 61984470da
21 changed files with 930 additions and 415 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
.env
.env.*
.vscode
coverage
.notes

134
cmd/check.go Normal file
View File

@ -0,0 +1,134 @@
package cmd
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"net/smtp"
"os"
"time"
"gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze"
"gitea.urkob.com/urko/backblaze-backup/internal/services/email"
"gitea.urkob.com/urko/backblaze-backup/kit"
"gitea.urkob.com/urko/backblaze-backup/kit/config"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var Check = &cobra.Command{
Use: "check",
Short: "Compares local backup files with those in a Backblaze B2 bucket and sends a summary email.",
Long: `This command compares the list of files in a local backup directory against the files in a specified Backblaze B2 bucket.
The operation is performed concurrently and is time-bound, set by default to 5 minutes.
If discrepancies are found, i.e., some files exist only locally or only on the cloud, these are logged and sent via email as attachments.
The email contains two text attachments:
- 'Local-Files-Not-In-B2.txt': Lists files that are in the local backup directory but not in the B2 bucket.
- 'B2-Files-Not-In-Local.txt': Lists files that are in the B2 bucket but not in the local backup directory.
The command requires two flags:
- '--dir': The path of the local backup directory
- '--bucket': The name of the Backblaze B2 bucket
An environment variable 'BACKBLAZE_ENV' can be set to 'dev' to load environment variables from a .env file in the root directory.`,
Run: func(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) // or any appropriate time
defer cancel()
log.SetFlags(log.Ldate | log.Lmicroseconds)
backupDir, err := cmd.Flags().GetString("dir")
if err != nil {
panic(fmt.Errorf("dir %w", err))
}
bucketName, err := cmd.Flags().GetString("bucket")
if err != nil {
panic(fmt.Errorf("bucket %w", err))
}
envFile := ""
if os.Getenv("BACKBLAZE_ENV") == "dev" {
envFile = kit.RootDir() + "/.env"
}
cfg := config.NewConfig(envFile)
mailSrv := email.NewMailService(email.MailServiceConfig{
Auth: smtp.PlainAuth("", cfg.MailUser, cfg.MailPassword, cfg.MailHost),
Host: cfg.MailHost,
Port: cfg.MailPort,
From: cfg.MailUser,
},
)
logger := logrus.New()
logger.SetLevel(logrus.Level(cfg.LogLevel))
b, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey)
if err != nil {
panic(fmt.Errorf("NewBackBlaze %w", err))
}
logger.Info("start check")
defer logger.Info("end check")
errs := make(chan backblaze.B2LocalErr)
go b.CompareConcurrent(ctx, backupDir, bucketName, errs)
// Create two buffers for two kinds of errors
cloudNotInLocalBuffer := new(bytes.Buffer)
cloudNotInLocalBuffer.WriteString(fmt.Sprintf("List of B2 files within %s bucket not found in local path %s \n", bucketName, backupDir))
countNotInLocalBuffer := 0
localNotInCloudBuffer := new(bytes.Buffer)
localNotInCloudBuffer.WriteString(fmt.Sprintf("List of local files in %s not found in B2 bucket %s \n", backupDir, bucketName))
countNotInCloudBuffer := 0
loop:
for {
select {
case <-ctx.Done():
// Handle the timeout or cancellation
// Release any resources here
if ctx.Err() == context.DeadlineExceeded {
logger.Error("Operation timed out")
} else if ctx.Err() == context.Canceled {
logger.Error("Operation canceled")
}
break loop
case err := <-errs:
if errors.Is(err.Err, backblaze.ErrCloudNotInLocal) {
logger.Debug(err.File + ": B2 file not found in local")
cloudNotInLocalBuffer.WriteString(err.File + "\n")
countNotInLocalBuffer++
}
if errors.Is(err.Err, backblaze.ErrLocalNotInCloud) {
logger.Debug(err.File + ": local file not found in B2")
localNotInCloudBuffer.WriteString(err.File + "\n")
countNotInCloudBuffer++
}
if err.Err == nil {
break loop
}
}
}
var attachments []email.EmailAttachment
if countNotInCloudBuffer > 0 {
attachments = append(attachments, email.EmailAttachment{File: cloudNotInLocalBuffer, Title: "B2-Files-Not-In-Local.txt"})
}
if countNotInLocalBuffer > 0 {
attachments = append(attachments, email.EmailAttachment{File: localNotInCloudBuffer, Title: "Local-Files-Not-In-B2.txt"})
}
if err := mailSrv.SendOK(email.EmailWithAttachments{
To: cfg.MailTo,
Bucket: bucketName,
BackupDir: backupDir,
CountLocal: countNotInLocalBuffer,
CountCloud: countNotInCloudBuffer,
Attachments: attachments,
}); err != nil {
panic(fmt.Errorf("error while send email: %w", err))
}
},
}

View File

@ -2,11 +2,14 @@ package cmd
import (
"context"
"fmt"
"log"
"os"
"gitea.urkob.com/urko/backblaze-backup/internal/services"
"gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze"
"gitea.urkob.com/urko/backblaze-backup/kit"
"gitea.urkob.com/urko/backblaze-backup/kit/config"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -20,18 +23,23 @@ var Cleanup = &cobra.Command{
bucketName, err := cmd.Flags().GetString("bucket")
if err != nil {
log.Fatalln("bucket %w", err)
panic(fmt.Errorf("bucket %w", err))
}
envFile := ""
if os.Getenv("BACKBLAZE_ENV") == "dev" {
envFile = ".env"
envFile = kit.RootDir() + "/.env"
}
cfg := config.NewConfig(envFile)
logger := logrus.New()
logger.SetLevel(logrus.Level(cfg.LogLevel))
bbService := services.NewBackBlaze(cfg.BbId, cfg.BbKey)
bbService, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey)
if err != nil {
panic(fmt.Errorf("NewBackBlaze %w", err))
}
if err := bbService.CleanUp(ctx, cancel, bucketName); err != nil {
log.Fatalln("bbService.CleanUp()", err)
panic(fmt.Errorf("bbService.CleanUp %w", err))
}
},
}

View File

@ -2,13 +2,16 @@ package cmd
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"gitea.urkob.com/urko/backblaze-backup/internal/services"
"gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze"
"gitea.urkob.com/urko/backblaze-backup/kit"
"gitea.urkob.com/urko/backblaze-backup/kit/config"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -22,13 +25,18 @@ var Versions = &cobra.Command{
envFile := ""
if os.Getenv("BACKBLAZE_ENV") == "dev" {
envFile = ".env"
envFile = kit.RootDir() + "/.env"
}
cfg := config.NewConfig(envFile)
logger := logrus.New()
logger.SetLevel(logrus.Level(cfg.LogLevel))
bbService := services.NewBackBlaze(cfg.BbId, cfg.BbKey)
bbService, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey)
if err != nil {
panic(fmt.Errorf("NewBackBlaze %w", err))
}
if err := bbService.ListDuplicateVersions(ctx, cancel); err != nil {
log.Fatalln("bbService.ListDuplicateVersions()", err)
panic(fmt.Errorf("bbService.ListDuplicateVersions %w", err))
}
},
}

View File

@ -2,11 +2,14 @@ package cmd
import (
"context"
"fmt"
"log"
"os"
"gitea.urkob.com/urko/backblaze-backup/internal/services"
"gitea.urkob.com/urko/backblaze-backup/internal/services/backblaze"
"gitea.urkob.com/urko/backblaze-backup/kit"
"gitea.urkob.com/urko/backblaze-backup/kit/config"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -21,26 +24,34 @@ var Sync = &cobra.Command{
log.SetFlags(log.Ldate | log.Lmicroseconds)
filePath, err := cmd.Flags().GetString("file")
if err != nil {
log.Fatalln("file %w", err)
panic(fmt.Errorf("file %w", err))
}
dir, err := cmd.Flags().GetString("dir")
if err != nil {
log.Fatalln("dir %w", err)
panic(fmt.Errorf("dir %w", err))
}
bucketName, err := cmd.Flags().GetString("bucket")
if err != nil {
log.Fatalln("bucket %w", err)
panic(fmt.Errorf("bucket %w", err))
}
envFile := ""
if os.Getenv("BACKBLAZE_ENV") == "dev" {
envFile = ".env"
envFile = kit.RootDir() + "/.env"
}
cfg := config.NewConfig(envFile)
bbService := services.NewBackBlaze(cfg.BbId, cfg.BbKey).WithBucket(bucketName).WithDir(dir).WithFile(filePath)
cfg := config.NewConfig(envFile)
logger := logrus.New()
logger.SetLevel(logrus.Level(cfg.LogLevel))
bbService, err := backblaze.NewBackBlaze(ctx, logger, cfg.BbId, cfg.BbKey)
if err != nil {
panic(fmt.Errorf("NewBackBlaze %w", err))
}
bbService = bbService.WithBucket(bucketName).WithDir(dir).WithFile(filePath)
if err := bbService.Sync(ctx); err != nil {
log.Fatalln("bbService.Sync()", err)
panic(fmt.Errorf("bbService.Sync %w", err))
}
},
}

16
go.mod
View File

@ -9,18 +9,22 @@ require (
github.com/kurin/blazer v0.5.3
github.com/rclone/rclone v1.63.1
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.3
golang.org/x/sync v0.3.0
)
require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/Max-Sum/base32768 v0.0.0-20230304063302-18e6ce5945fd // indirect
github.com/abbot/go-http-auth v0.4.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-chi/chi/v5 v5.0.8 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jzelinskie/whirlpool v0.0.0-20201016144138-0675e54bb004 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
@ -28,6 +32,7 @@ require (
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
@ -41,11 +46,14 @@ require (
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/api v0.126.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

38
go.sum
View File

@ -12,6 +12,7 @@ cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bP
cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk=
cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
@ -19,8 +20,9 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/compute v1.19.0 h1:+9zda3WGgW1ZSTlVppLCYFIr48Pa35q1uG2N1itbCEQ=
cloud.google.com/go/compute v1.19.3 h1:DcTwsFgGev/wV5+q8o2fzgcHOaac+DKGC91ZlvpsQds=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
@ -161,11 +163,13 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.8.0 h1:UBtEZqx1bjXtOQ5BVTkuYghXrr3N4V123VKJK67vJZc=
github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cUUI8Ki4=
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
@ -208,8 +212,10 @@ github.com/koofr/go-httpclient v0.0.0-20230225102643-5d51a2e9dea6 h1:uF5FHZ/L5gv
github.com/koofr/go-koofrclient v0.0.0-20221207135200-cbd7fc9ad6a6 h1:FHVoZMOVRA+6/y4yRlbiR3WvsrOcKBd/f64H7YiWR2U=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kurin/blazer v0.5.3 h1:SAgYv0TKU0kN/ETfO5ExjNAPyMt2FocO2s/UlCHfjAk=
github.com/kurin/blazer v0.5.3/go.mod h1:4FCXMUWo9DllR2Do4TtBd377ezyAJ51vB5uTBjt0pGU=
@ -339,8 +345,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -402,8 +408,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -411,7 +417,8 @@ golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw=
golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8=
golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -470,8 +477,8 @@ golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -479,8 +486,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@ -546,7 +553,8 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
google.golang.org/api v0.115.0 h1:6FFkVvStt4YqXSx3azKyzj7fXerGnVlLJ/eud01nBDE=
google.golang.org/api v0.126.0 h1:q4GJq+cAdMAC7XP7njvQ4tvohGLiSlytuL4BQxbIZ+o=
google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -583,7 +591,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08=
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@ -596,7 +605,7 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@ -615,6 +624,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -1,364 +0,0 @@
package services
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"log"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
rcloneb2 "github.com/rclone/rclone/backend/b2"
"github.com/kurin/blazer/b2"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/operations"
"golang.org/x/sync/semaphore"
)
const writers = 4
const maxConcurrentWeight = 5
const largeFileSize = 500 * 1024 * 1024 // 500 MB
type BackBalze struct {
bucketName string
dir string
filePath string
maxWorkers int
bbID string
bbKey string
}
func NewBackBlaze(bbID, bbKey string) *BackBalze {
log.Println("runtime.NumCPU()", runtime.NumCPU())
return &BackBalze{
bbID: bbID,
bbKey: bbKey,
maxWorkers: runtime.NumCPU(),
}
}
func (b *BackBalze) WithBucket(bucketName string) *BackBalze {
b.bucketName = bucketName
return b
}
func (b *BackBalze) WithDir(dir string) *BackBalze {
b.dir = dir
return b
}
func (b *BackBalze) WithFile(filePath string) *BackBalze {
b.filePath = filePath
return b
}
func (b *BackBalze) Sync(ctx context.Context) error {
if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
}
if b.filePath != "" && b.dir != "" {
return errors.New("you must select just 1 option, dir or file")
}
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
if err != nil {
return fmt.Errorf("b2.NewClient %w", err)
}
log.Println("b2Client ok")
bc, err := b2Client.Bucket(ctx, b.bucketName)
if err != nil {
return fmt.Errorf("b2Client.Bucket %w", err)
}
if bc == nil {
return fmt.Errorf("bucket doesn't exist %s", b.bucketName)
}
log.Println("bucket found:", bc.Name())
if b.filePath != "" {
log.Println("file:", b.filePath)
if err := copyFile(ctx, bc, b.filePath); err != nil {
return fmt.Errorf("copyFile %w", err)
}
return nil
}
if b.dir != "" {
oldFiles, err := bucketFiles(ctx, bc)
if err != nil {
return fmt.Errorf("bucketFiles %w", err)
}
log.Println(strings.Repeat("*", 40))
log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
log.Println(strings.Repeat("*", 40))
fileChan := make(chan string)
var wg sync.WaitGroup
for i := 0; i < b.maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for src := range fileChan {
err := copyFile(ctx, bc, src)
if err != nil {
log.Printf("error copying file %s: %v\n", src, err)
continue
}
}
}()
}
// Walk the directory and send files to the channel for uploading
err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
fileChan <- path
}
return nil
})
if err != nil {
return fmt.Errorf("error walking the directory: %v", err)
}
// Close the channel (no more files to send)
close(fileChan)
wg.Wait()
// Cleanup old files after backup is completed
if err := cleanBucket(ctx, bc, oldFiles); err != nil {
return fmt.Errorf("cleanBucket %w", err)
}
}
log.Println("copied successfully")
return nil
}
func (b *BackBalze) OldSync() error {
if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
}
if b.filePath != "" && b.dir != "" {
return errors.New("you must select just 1 option, dir or file")
}
ctx := context.Background()
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
if err != nil {
return fmt.Errorf("b2.NewClient %w", err)
}
log.Println("b2Client ok")
bc, err := b2Client.Bucket(ctx, b.bucketName)
if err != nil {
return fmt.Errorf("b2Client.Bucket %w", err)
}
if bc == nil {
return fmt.Errorf("bucket doesn't exist %s", b.bucketName)
}
log.Println("bucket found:", bc.Name())
if b.filePath != "" {
log.Println("file:", b.filePath)
if err := copyFile(ctx, bc, b.filePath); err != nil {
return fmt.Errorf("copyFile %w", err)
}
return nil
}
if b.dir != "" {
oldFiles, err := bucketFiles(ctx, bc)
if err != nil {
return fmt.Errorf("bucketFiles %w", err)
}
log.Println(strings.Repeat("*", 40))
log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
log.Println(strings.Repeat("*", 40))
fileChan := make(chan string)
uploadSem := semaphore.NewWeighted(maxConcurrentWeight)
var wg sync.WaitGroup
for i := 0; i < b.maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for src := range fileChan {
info, err := os.Stat(src)
if err != nil {
log.Printf("error getting file info %s: %v\n", src, err)
continue
}
weight := int64(1)
if info.Size() > largeFileSize {
weight = 2
}
if err := uploadSem.Acquire(ctx, weight); err == nil {
log.Println("start copying file", src)
if err := copyFile(ctx, bc, src); err != nil {
log.Printf("error copying file %s: %v\n", src, err)
}
uploadSem.Release(weight)
} else {
log.Printf("error acquiring semaphore: %v\n", err)
}
}
}()
}
// Walk the directory and send files to the channel for uploading
err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
fileChan <- path
}
return nil
})
if err != nil {
return fmt.Errorf("error walking the directory: %v", err)
}
// Close the channel (no more files to send)
close(fileChan)
wg.Wait()
// Cleanup old files after backup is completed
if err := cleanBucket(ctx, bc, oldFiles); err != nil {
return fmt.Errorf("cleanBucket %w", err)
}
}
log.Println("copied successfully")
return nil
}
func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
f, err := os.Open(src)
if err != nil {
return err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return err
}
w := bucket.Object(fi.Name()).NewWriter(ctx)
w.ConcurrentUploads = writers
w.UseFileBuffer = true
log.Println("start copying", fi.Name())
if _, err := io.Copy(w, f); err != nil {
w.Close()
return err
}
log.Println("end copying", fi.Name())
return w.Close()
}
func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {
var errorBuilder strings.Builder
for _, v := range files {
obj := bucket.Object(v)
if obj == nil {
log.Println("object is nil", v)
continue
}
if err := obj.Delete(ctx); err != nil {
errorBuilder.WriteString(fmt.Errorf("error deleting %s : %w", v, err).Error())
errorBuilder.WriteString("; ")
}
}
if errorBuilder.Len() > 0 {
return errors.New(errorBuilder.String())
}
return nil
}
func bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) {
bucketIter := bucket.List(ctx)
if bucketIter == nil {
return nil, fmt.Errorf("bucket list cannot be nil")
}
var files []string
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err())
}
break
}
if bucketIter.Object() == nil {
log.Println("bucketIter Object is nil")
continue
}
files = append(files, bucketIter.Object().Name())
}
return files, nil
}
type duplicate struct {
bucket string
file string
count int
}
func (b *BackBalze) CleanUp(ctx context.Context, cancel context.CancelFunc, bucketName string) error {
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
if err != nil {
return fmt.Errorf("b2.NewClient %w", err)
}
log.Println("b2Client ok")
var dups []duplicate
if bucketName != "" {
dups, err = b.listDuplicatesFromBucket(ctx, cancel, b2Client, bucketName)
if err != nil {
return fmt.Errorf("b.listDuplicatesFromBucket: %w", err)
}
} else {
dups, err = b.listDuplicates(ctx, cancel, b2Client)
if err != nil {
return fmt.Errorf("b.listDuplicates: %w", err)
}
}
if len(dups) <= 0 {
return nil
}
smpl := configmap.Simple{}
smpl.Set("account", b.bbID)
smpl.Set("key", b.bbKey)
smpl.Set("chunk_size", strconv.FormatInt(int64(9600), 10))
log.Println("duplicates", len(dups))
for _, d := range dups {
f, err := rcloneb2.NewFs(ctx, "B2", d.dir(), smpl)
if err != nil {
return fmt.Errorf("rclonefs.NewFs %w", err)
}
if err := operations.CleanUp(ctx, f); err != nil {
return fmt.Errorf("operations.CleanUp %w", err)
}
log.Println(d.dir(), "cleaned up")
}
return nil
}

View File

@ -0,0 +1,50 @@
package backblaze
import (
"context"
"fmt"
"runtime"
"github.com/kurin/blazer/b2"
"github.com/sirupsen/logrus"
)
type BackBlaze struct {
logger *logrus.Logger
bucketName string
bbID string
bbKey string
dir string
filePath string
maxWorkers int
b2Client *b2.Client
}
// NewBackBlaze initializes a new BackBlaze struct with given BackBlaze ID and Key.
func NewBackBlaze(ctx context.Context, logger *logrus.Logger, bbID, bbKey string) (*BackBlaze, error) {
b2Client, err := b2.NewClient(ctx, bbID, bbKey)
if err != nil {
return nil, fmt.Errorf("b2.NewClient %w", err)
}
return &BackBlaze{
logger: logger,
b2Client: b2Client,
bbID: bbID,
bbKey: bbKey,
maxWorkers: runtime.NumCPU(),
}, nil
}
func (b *BackBlaze) WithBucket(bucketName string) *BackBlaze {
b.bucketName = bucketName
return b
}
func (b *BackBlaze) WithDir(dir string) *BackBlaze {
b.dir = dir
return b
}
func (b *BackBlaze) WithFile(filePath string) *BackBlaze {
b.filePath = filePath
return b
}

View File

@ -0,0 +1,152 @@
package backblaze
import (
"context"
"errors"
"fmt"
"io/fs"
"path/filepath"
"sync"
"github.com/kurin/blazer/b2"
)
// localFiles lists the local files in the given backup directory and sends them to a channel.
// It closes the channel after all files are listed.
func (b *BackBlaze) localFiles(backupDir string, fileChan chan<- string) error {
defer close(fileChan)
// Walk the directory and send files to the channel
err := filepath.WalkDir(backupDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
fileChan <- filepath.Base(path)
}
return nil
})
if err != nil {
return fmt.Errorf("error walking the directory: %v", err)
}
return nil
}
// b2BucketFiles lists the files in the given B2 bucket and sends them to a channel.
// It closes the channel after all files are listed.
func (b *BackBlaze) b2BucketFiles(ctx context.Context, bucketName string, fileChan chan<- string) error {
bucket, err := b.b2Client.Bucket(ctx, bucketName)
defer close(fileChan)
if err != nil {
return fmt.Errorf("b2Client.Bucket %w", err)
}
bucketIter := bucket.List(ctx, b2.ListHidden())
if bucketIter == nil {
return errors.New("bucket list cannot be nil")
}
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
return fmt.Errorf("bucketIter err %w", bucketIter.Err())
}
break
}
if bucketIter.Object() == nil {
return errors.New("bucketIter Object is nil")
}
b.logger.Debug("bucket file: ", bucketIter.Object().Name())
fileChan <- bucketIter.Object().Name()
}
return nil
}
var ErrLocalNotInCloud error = errors.New("exists locally but not in the cloud")
var ErrCloudNotInLocal error = errors.New("exists on cloud but not locally")
type B2LocalErr struct {
File string
Err error
}
// CompareConcurrent concurrently fetches the list of local files and cloud files,
// then compares them to ensure all local files exist in the cloud.
// Errors are sent to a provided error channel. The function will panic if an error occurs while listing files.
func (b *BackBlaze) CompareConcurrent(ctx context.Context, backupDir, bucketName string, errors chan<- B2LocalErr) {
var wg sync.WaitGroup
localFiles := make(map[string]int)
cloudFiles := make(map[string]int)
localFileChan := make(chan string)
b2FileChan := make(chan string)
// Local listing
wg.Add(1)
go func() {
defer wg.Done()
wg.Add(1)
go func() {
defer wg.Done()
for f := range localFileChan {
if _, ok := localFiles[f]; ok {
panic(fmt.Errorf("local file already exists in map: %s", f))
}
b.logger.Debug("local file", f)
localFiles[f]++
}
}()
if err := b.localFiles(backupDir, localFileChan); err != nil {
panic(fmt.Errorf("b.LocalFilesWithB2: %w", err))
}
}()
// Cloud listing
wg.Add(1)
go func() {
defer wg.Done()
wg.Add(1)
go func() {
defer wg.Done()
for f := range b2FileChan {
if _, ok := cloudFiles[f]; ok {
panic(fmt.Errorf("cloud file already exists in map: %s", f))
}
b.logger.Debug("B2 file", f)
cloudFiles[f]++
}
}()
if err := b.b2BucketFiles(ctx, bucketName, b2FileChan); err != nil {
panic(fmt.Errorf("b.LocalFilesWithB2: %w", err))
}
}()
// Wait for both to complete
wg.Wait()
// Now check local files that are not presesnt in cloud
wg.Add(2)
go func() {
defer wg.Done()
for localFile := range localFiles {
if _, exists := cloudFiles[localFile]; !exists {
errors <- B2LocalErr{File: localFile, Err: ErrLocalNotInCloud}
}
}
}()
// Now check cloud files that are not in local
go func() {
defer wg.Done()
for cloudFile := range cloudFiles {
if _, exists := localFiles[cloudFile]; !exists {
errors <- B2LocalErr{File: cloudFile, Err: ErrCloudNotInLocal}
}
}
}()
wg.Wait()
errors <- B2LocalErr{Err: nil}
close(errors)
}

View File

@ -0,0 +1,50 @@
package backblaze
import (
"context"
"fmt"
"strconv"
"github.com/rclone/rclone/backend/b2"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/operations"
)
func (b *BackBlaze) CleanUp(ctx context.Context, cancel context.CancelFunc, bucketName string) error {
var dups []duplicate
var err error
if bucketName != "" {
dups, err = b.listDuplicatesFromBucket(ctx, cancel, b.b2Client, bucketName)
if err != nil {
return fmt.Errorf("b.listDuplicatesFromBucket: %w", err)
}
} else {
dups, err = b.listDuplicates(ctx, cancel, b.b2Client)
if err != nil {
return fmt.Errorf("b.listDuplicates: %w", err)
}
}
if len(dups) <= 0 {
return nil
}
smpl := configmap.Simple{}
smpl.Set("account", b.bbID)
smpl.Set("key", b.bbKey)
smpl.Set("chunk_size", strconv.FormatInt(int64(9600), 10))
b.logger.Info("duplicates", len(dups))
for _, d := range dups {
f, err := b2.NewFs(ctx, "B2", d.dir(), smpl)
if err != nil {
return fmt.Errorf("rclonefs.NewFs %w", err)
}
if err := operations.CleanUp(ctx, f); err != nil {
return fmt.Errorf("operations.CleanUp %w", err)
}
b.logger.Info(d.dir(), "cleaned up")
}
return nil
}

View File

@ -1,4 +1,4 @@
package services
package backblaze
import (
"context"
@ -12,6 +12,12 @@ import (
"golang.org/x/sync/semaphore"
)
type duplicate struct {
bucket string
file string
count int
}
func (d duplicate) dir() string {
if !strings.Contains(d.file, "/") {
return d.bucket
@ -20,7 +26,7 @@ func (d duplicate) dir() string {
return strings.Join(splitted[:(len(splitted)-1)], "/")
}
func (b *BackBalze) ListDuplicateVersions(ctx context.Context, cancel context.CancelFunc) error {
func (b *BackBlaze) ListDuplicateVersions(ctx context.Context, cancel context.CancelFunc) error {
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
if err != nil {
return fmt.Errorf("b2.NewClient %w", err)
@ -42,7 +48,7 @@ func (b *BackBalze) ListDuplicateVersions(ctx context.Context, cancel context.Ca
return nil
}
func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) {
func (b *BackBlaze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) {
buckets, err := b2Client.ListBuckets(ctx)
if err != nil {
return nil, fmt.Errorf("b2Client.Bucket %w", err)
@ -66,20 +72,20 @@ func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFun
bucketIter := bc.List(ctx, b2.ListHidden())
if bucketIter == nil {
log.Println("bucket list cannot be nil")
b.logger.Error("bucket list cannot be nil")
return
}
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
log.Println("bucketIter err %w", bucketIter.Err())
b.logger.Error("bucketIter err %w", bucketIter.Err())
return
}
break
}
if bucketIter.Object() == nil {
log.Println("bucketIter Object is nil")
b.logger.Error("bucketIter Object is nil")
continue
}
files[bucketIter.Object().Name()]++
@ -102,7 +108,7 @@ func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFun
return dups, nil
}
func (b *BackBalze) listDuplicatesFromBucket(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client, bucketName string) ([]duplicate, error) {
func (b *BackBlaze) listDuplicatesFromBucket(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client, bucketName string) ([]duplicate, error) {
bucket, err := b2Client.Bucket(ctx, bucketName)
if err != nil {
return nil, fmt.Errorf("b2Client.Bucket %w", err)

View File

@ -0,0 +1,169 @@
package backblaze
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"log"
"os"
"path/filepath"
"strings"
"sync"
"github.com/kurin/blazer/b2"
)
const writers = 4
func (b *BackBlaze) Sync(ctx context.Context) error {
if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
}
if b.filePath != "" && b.dir != "" {
return errors.New("you must select just 1 option, dir or file")
}
bc, err := b.b2Client.Bucket(ctx, b.bucketName)
if err != nil {
return fmt.Errorf("b2Client.Bucket %w", err)
}
if bc == nil {
return fmt.Errorf("bucket doesn't exist %s", b.bucketName)
}
log.Println("bucket found:", bc.Name())
if b.filePath != "" {
log.Println("file:", b.filePath)
if err := b.copyFile(ctx, bc, b.filePath); err != nil {
return fmt.Errorf("copyFile %w", err)
}
return nil
}
if b.dir != "" {
oldFiles, err := b.bucketFiles(ctx, bc)
if err != nil {
return fmt.Errorf("bucketFiles %w", err)
}
b.logger.Debug(strings.Repeat("*", 40))
b.logger.Debug("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
b.logger.Debug(strings.Repeat("*", 40))
fileChan := make(chan string)
var wg sync.WaitGroup
for i := 0; i < b.maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for src := range fileChan {
if err := b.copyFile(ctx, bc, src); err != nil {
b.logger.Errorf("error copying file %s: %v\n", src, err)
continue
}
}
}()
}
// Walk the directory and send files to the channel for uploading
err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
fileChan <- path
}
return nil
})
if err != nil {
return fmt.Errorf("error walking the directory: %v", err)
}
// Close the channel (no more files to send)
close(fileChan)
wg.Wait()
// Cleanup old files after backup is completed
if err := b.cleanBucket(ctx, bc, oldFiles); err != nil {
return fmt.Errorf("cleanBucket %w", err)
}
}
b.logger.Info("copied successfully")
return nil
}
func (b *BackBlaze) copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
f, err := os.Open(src)
if err != nil {
return err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return err
}
w := bucket.Object(fi.Name()).NewWriter(ctx)
w.ConcurrentUploads = writers
w.UseFileBuffer = true
b.logger.Info("start copying", fi.Name())
if _, err := io.Copy(w, f); err != nil {
w.Close()
return err
}
if err := w.Close(); err != nil {
return err
}
b.logger.Info("end copying", fi.Name())
return nil
}
func (b *BackBlaze) cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {
var errorBuilder strings.Builder
for _, v := range files {
obj := bucket.Object(v)
if obj == nil {
b.logger.Error("bucket.Object is nil", v)
continue
}
if err := obj.Delete(ctx); err != nil {
errorBuilder.WriteString(fmt.Errorf("error deleting %s : %w", v, err).Error())
errorBuilder.WriteString("; ")
}
}
if errorBuilder.Len() > 0 {
return errors.New(errorBuilder.String())
}
return nil
}
func (b *BackBlaze) bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) {
bucketIter := bucket.List(ctx)
if bucketIter == nil {
return nil, fmt.Errorf("bucket list cannot be nil")
}
var files []string
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err())
}
break
}
if bucketIter.Object() == nil {
b.logger.Error("bucketIter Object is nil")
continue
}
files = append(files, bucketIter.Object().Name())
}
return files, nil
}

View File

@ -0,0 +1,183 @@
package email
import (
"bytes"
"crypto/tls"
"encoding/base64"
"fmt"
"io"
"net/smtp"
"strings"
)
const (
mime = "MIME-version: 1.0;\nContent-Type: text/html; charset=\"UTF-8\";\n\n"
subjectReport = "B2 vs Local Files"
templateSuccess = "success.html"
templateConfirm = "confirm.html"
templateForgotPassword = "forgot_password.html"
templateRegister = "register.html"
delimeter = "**=myohmy689407924327"
)
type EmailService struct {
auth smtp.Auth
host string
port string
from string
tlsconfig *tls.Config
}
type EmailWithAttachments struct {
To string
Bucket string
BackupDir string
CountLocal int
CountCloud int
Attachments []EmailAttachment
}
type EmailAttachment struct {
File io.Reader
Title string
}
func (e EmailAttachment) ReadContent() ([]byte, error) {
bts, err := io.ReadAll(e.File)
if err != nil {
return nil, fmt.Errorf("error loading attachment: %s", err)
}
return bts, nil
}
type MailServiceConfig struct {
Auth smtp.Auth
Host string
Port string
From string // Sender email address
}
func NewMailService(config MailServiceConfig) *EmailService {
return &EmailService{
auth: config.Auth,
host: config.Host,
port: config.Port,
from: config.From,
tlsconfig: &tls.Config{
InsecureSkipVerify: true,
ServerName: config.Host,
},
}
}
func (e *EmailService) SendOK(emailData EmailWithAttachments) error {
template := strings.Replace(htmlTemplate, "{{bucket}}", emailData.Bucket, -1)
template = strings.Replace(template, "{{local_backup_path}}", emailData.BackupDir, -1)
template = strings.Replace(template, "{{count_ErrCloudNotInLocal}}", fmt.Sprint(emailData.CountCloud), -1)
template = strings.Replace(template, "{{count_ErrLocalNotInCloud}}", fmt.Sprint(emailData.CountLocal), -1)
msg, err := newMessage(e.from, emailData.To, subjectReport).
withAttachments(template, emailData.Attachments)
if err != nil {
return fmt.</