From cd88fd26685c5b90e772be2383b64435ebd301af Mon Sep 17 00:00:00 2001 From: "Urko." Date: Sun, 1 Oct 2023 14:35:50 +0200 Subject: [PATCH] refactor: parameterize max workers --- cmd/sync.go | 19 +++++++++- internal/services/backblaze/backblaze.go | 43 ++++++++++------------- internal/services/backblaze/check.go | 2 +- internal/services/backblaze/duplicates.go | 5 +-- internal/services/backblaze/sync.go | 29 +++++++-------- main.go | 3 ++ 6 files changed, 55 insertions(+), 46 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 4f5bb07..2272ca1 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -35,6 +35,16 @@ var Sync = &cobra.Command{ panic(fmt.Errorf("bucket %w", err)) } + workers, err := cmd.Flags().GetInt("workers") + if err != nil { + panic(fmt.Errorf("workers %w", err)) + } + + concurrentUploads, err := cmd.Flags().GetInt("concurrent-uploads") + if err != nil { + panic(fmt.Errorf("concurrent-uploads %w", err)) + } + envFile := "" if os.Getenv("BACKBLAZE_ENV") == "dev" { envFile = kit.RootDir() + "/.env" @@ -49,7 +59,14 @@ var Sync = &cobra.Command{ panic(fmt.Errorf("NewBackBlaze %w", err)) } - bbService = bbService.WithBucket(bucketName).WithDir(dir).WithFile(filePath) + bbService = bbService.WithOptions(backblaze.BackBlazeOptions{ + Bucket: bucketName, + Dir: dir, + FilePath: filePath, + MaxWorkers: workers, + ConcurrentUploads: concurrentUploads, + }) + if err := bbService.Sync(ctx); err != nil { panic(fmt.Errorf("bbService.Sync %w", err)) } diff --git a/internal/services/backblaze/backblaze.go b/internal/services/backblaze/backblaze.go index 808837a..9354a98 100644 --- a/internal/services/backblaze/backblaze.go +++ b/internal/services/backblaze/backblaze.go @@ -3,21 +3,17 @@ package backblaze import ( "context" "fmt" - "runtime" "github.com/kurin/blazer/b2" "github.com/sirupsen/logrus" ) type BackBlaze struct { - logger *logrus.Logger - bucketName string - bbID string - bbKey string - dir string - filePath string - maxWorkers int - b2Client *b2.Client + logger 
*logrus.Logger + b2Client *b2.Client + bbID string + bbKey string + options BackBlazeOptions } // NewBackBlaze initializes a new BackBlaze struct with given BackBlaze ID and Key. @@ -28,23 +24,22 @@ func NewBackBlaze(ctx context.Context, logger *logrus.Logger, bbID, bbKey string } return &BackBlaze{ - logger: logger, - b2Client: b2Client, - bbID: bbID, - bbKey: bbKey, - maxWorkers: runtime.NumCPU(), + logger: logger, + b2Client: b2Client, + bbID: bbID, + bbKey: bbKey, }, nil } -func (b *BackBlaze) WithBucket(bucketName string) *BackBlaze { - b.bucketName = bucketName - return b -} -func (b *BackBlaze) WithDir(dir string) *BackBlaze { - b.dir = dir - return b -} -func (b *BackBlaze) WithFile(filePath string) *BackBlaze { - b.filePath = filePath +type BackBlazeOptions struct { + Bucket string + Dir string + FilePath string + MaxWorkers int + ConcurrentUploads int +} + +func (b *BackBlaze) WithOptions(options BackBlazeOptions) *BackBlaze { + b.options = options return b } diff --git a/internal/services/backblaze/check.go b/internal/services/backblaze/check.go index 61ed3df..6106818 100644 --- a/internal/services/backblaze/check.go +++ b/internal/services/backblaze/check.go @@ -136,7 +136,7 @@ func (b *BackBlaze) CompareConcurrent( defer wg.Done() for f := range b2FileChan { if _, ok := cloudFiles[f.Path]; ok { - panic(fmt.Errorf(`cloud file already exists in map: %s\n you should run 'backblazebackup cleanup --bucket "%s"'`, f.Path, b.bucketName)) + panic(fmt.Errorf(`cloud file already exists in map: %s\n you should run 'backblazebackup cleanup --bucket "%s"'`, f.Path, b.options.Bucket)) } b.logger.Debugf("B2 file %+v\n", f) cloudFiles[f.Path] = f diff --git a/internal/services/backblaze/duplicates.go b/internal/services/backblaze/duplicates.go index 8769994..188cd05 100644 --- a/internal/services/backblaze/duplicates.go +++ b/internal/services/backblaze/duplicates.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log" "strings" "sync" @@ -31,7 +30,6 @@ func (b 
*BackBlaze) ListDuplicateVersions(ctx context.Context, cancel context.Ca if err != nil { return fmt.Errorf("b2.NewClient %w", err) } - log.Println("b2Client ok") dups, err := b.listDuplicates(ctx, cancel, b2Client) if err != nil { @@ -57,8 +55,7 @@ func (b *BackBlaze) listDuplicates(ctx context.Context, cancel context.CancelFun wg := sync.WaitGroup{} dups := make([]duplicate, 0) - log.Println("len(buckets)", len(buckets)) - sm := semaphore.NewWeighted(int64(b.maxWorkers)) + sm := semaphore.NewWeighted(int64(b.options.MaxWorkers)) wg.Add(len(buckets)) for _, bc := range buckets { if err := sm.Acquire(ctx, 1); err != nil { diff --git a/internal/services/backblaze/sync.go b/internal/services/backblaze/sync.go index 9245439..9cc735c 100644 --- a/internal/services/backblaze/sync.go +++ b/internal/services/backblaze/sync.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "io/fs" - "log" "os" "path/filepath" "strings" @@ -15,37 +14,35 @@ import ( "github.com/kurin/blazer/b2" ) -const writers = 4 - func (b *BackBlaze) Sync(ctx context.Context) error { - if b.bucketName == "" && (b.filePath == "" || b.dir == "") { - return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) + if b.options.Bucket == "" && (b.options.FilePath == "" || b.options.Dir == "") { + return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.options.Bucket, b.options.FilePath, b.options.Dir) } - if b.filePath != "" && b.dir != "" { + if b.options.FilePath != "" && b.options.Dir != "" { return errors.New("you must select just 1 option, dir or file") } - bc, err := b.b2Client.Bucket(ctx, b.bucketName) + bc, err := b.b2Client.Bucket(ctx, b.options.Bucket) if err != nil { return fmt.Errorf("b2Client.Bucket %w", err) } if bc == nil { - return fmt.Errorf("bucket doesn't exist %s", b.bucketName) + return fmt.Errorf("bucket doesn't exist %s", b.options.Bucket) } - log.Println("bucket found:", bc.Name()) - if b.filePath != "" { - log.Println("file:", b.filePath) + 
b.logger.Infoln("bucket found:", bc.Name()) + if b.options.FilePath != "" { + b.logger.Infoln("file:", b.options.FilePath) - if err := b.copyFile(ctx, bc, b.filePath); err != nil { + if err := b.copyFile(ctx, bc, b.options.FilePath); err != nil { return fmt.Errorf("copyFile %w", err) } return nil } - if b.dir != "" { + if b.options.Dir != "" { oldFiles, err := b.bucketFiles(ctx, bc) if err != nil { return fmt.Errorf("bucketFiles %w", err) @@ -58,7 +55,7 @@ func (b *BackBlaze) Sync(ctx context.Context) error { var wg sync.WaitGroup - for i := 0; i < b.maxWorkers; i++ { + for i := 0; i < b.options.MaxWorkers; i++ { wg.Add(1) go func() { defer wg.Done() @@ -72,7 +69,7 @@ func (b *BackBlaze) Sync(ctx context.Context) error { } // Walk the directory and send files to the channel for uploading - err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { + err = filepath.WalkDir(b.options.Dir, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } @@ -110,7 +107,7 @@ func (b *BackBlaze) copyFile(ctx context.Context, bucket *b2.Bucket, src string) return err } w := bucket.Object(fi.Name()).NewWriter(ctx) - w.ConcurrentUploads = writers + w.ConcurrentUploads = b.options.ConcurrentUploads w.UseFileBuffer = true b.logger.Infoln("start copying ", fi.Name()) if _, err := io.Copy(w, f); err != nil { diff --git a/main.go b/main.go index e12f5fe..6320cea 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "runtime" "gitea.urkob.com/urko/backblaze-backup/cmd" "github.com/spf13/cobra" @@ -19,6 +20,8 @@ func init() { cmd.Sync.PersistentFlags().String("file", "", "absolute path of the file you want to upload to backblaze") cmd.Sync.PersistentFlags().String("dir", "", "absolute path of the directory you want to upload to backblaze") cmd.Sync.PersistentFlags().String("bucket", "", "backblaze bucket name") + cmd.Sync.PersistentFlags().Int("workers", runtime.NumCPU(), "The number of worker goroutines that 
are spawned to handle file processing in parallel. Each worker handles the upload of one file at a time, allowing for efficient use of system resources when dealing with a large number of files. The default value is the number of CPU cores available on the system, enabling the application to automatically scale its parallel processing capabilities based on the available hardware.") + cmd.Sync.PersistentFlags().Int("concurrent-uploads", 4, "The number of chunk uploads that can be performed simultaneously for a single file. When a file is uploaded, it might be split into multiple chunks to enable more efficient and reliable data transfer. The concurrent-uploads flag controls how many of these chunks can be uploaded in parallel during a single file upload. This is particularly useful for large files, where parallel chunk uploads can significantly speed up the overall upload time. The default value is 4.") cmd.Check.PersistentFlags().String("dir", "", "Specifies the absolute path of the directory containing the backup files to be compared against the Backblaze B2 bucket. This flag is mutually exclusive with the 'file' flag.") cmd.Check.PersistentFlags().String("bucket", "", "Name of the Backblaze B2 bucket against which the local files or directory will be compared.")