refactor: parameterize max workers

This commit is contained in:
Urko. 2023-10-01 14:35:50 +02:00
parent 083762e1c5
commit cd88fd2668
6 changed files with 55 additions and 46 deletions

View File

@ -35,6 +35,16 @@ var Sync = &cobra.Command{
panic(fmt.Errorf("bucket %w", err)) panic(fmt.Errorf("bucket %w", err))
} }
workers, err := cmd.Flags().GetInt("workers")
if err != nil {
panic(fmt.Errorf("workers %w", err))
}
concurrentUploads, err := cmd.Flags().GetInt("concurrent-uploads")
if err != nil {
panic(fmt.Errorf("concurrent-uploads %w", err))
}
envFile := "" envFile := ""
if os.Getenv("BACKBLAZE_ENV") == "dev" { if os.Getenv("BACKBLAZE_ENV") == "dev" {
envFile = kit.RootDir() + "/.env" envFile = kit.RootDir() + "/.env"
@ -49,7 +59,14 @@ var Sync = &cobra.Command{
panic(fmt.Errorf("NewBackBlaze %w", err)) panic(fmt.Errorf("NewBackBlaze %w", err))
} }
bbService = bbService.WithBucket(bucketName).WithDir(dir).WithFile(filePath) bbService = bbService.WithOptions(backblaze.BackBlazeOptions{
Bucket: bucketName,
Dir: dir,
FilePath: filePath,
MaxWorkers: workers,
ConcurrentUploads: concurrentUploads,
})
if err := bbService.Sync(ctx); err != nil { if err := bbService.Sync(ctx); err != nil {
panic(fmt.Errorf("bbService.Sync %w", err)) panic(fmt.Errorf("bbService.Sync %w", err))
} }

View File

@ -3,7 +3,6 @@ package backblaze
import ( import (
"context" "context"
"fmt" "fmt"
"runtime"
"github.com/kurin/blazer/b2" "github.com/kurin/blazer/b2"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -11,13 +10,10 @@ import (
type BackBlaze struct { type BackBlaze struct {
logger *logrus.Logger logger *logrus.Logger
bucketName string b2Client *b2.Client
bbID string bbID string
bbKey string bbKey string
dir string options BackBlazeOptions
filePath string
maxWorkers int
b2Client *b2.Client
} }
// NewBackBlaze initializes a new BackBlaze struct with given BackBlaze ID and Key. // NewBackBlaze initializes a new BackBlaze struct with given BackBlaze ID and Key.
@ -32,19 +28,18 @@ func NewBackBlaze(ctx context.Context, logger *logrus.Logger, bbID, bbKey string
b2Client: b2Client, b2Client: b2Client,
bbID: bbID, bbID: bbID,
bbKey: bbKey, bbKey: bbKey,
maxWorkers: runtime.NumCPU(),
}, nil }, nil
} }
func (b *BackBlaze) WithBucket(bucketName string) *BackBlaze { type BackBlazeOptions struct {
b.bucketName = bucketName Bucket string
return b Dir string
} FilePath string
func (b *BackBlaze) WithDir(dir string) *BackBlaze { MaxWorkers int
b.dir = dir ConcurrentUploads int
return b }
}
func (b *BackBlaze) WithFile(filePath string) *BackBlaze { func (b *BackBlaze) WithOptions(options BackBlazeOptions) *BackBlaze {
b.filePath = filePath b.options = options
return b return b
} }

View File

@ -136,7 +136,7 @@ func (b *BackBlaze) CompareConcurrent(
defer wg.Done() defer wg.Done()
for f := range b2FileChan { for f := range b2FileChan {
if _, ok := cloudFiles[f.Path]; ok { if _, ok := cloudFiles[f.Path]; ok {
panic(fmt.Errorf(`cloud file already exists in map: %s\n you should run 'backblazebackup cleanup --bucket "%s"'`, f.Path, b.bucketName)) panic(fmt.Errorf(`cloud file already exists in map: %s\n you should run 'backblazebackup cleanup --bucket "%s"'`, f.Path, b.options.Bucket))
} }
b.logger.Debugf("B2 file %+v\n", f) b.logger.Debugf("B2 file %+v\n", f)
cloudFiles[f.Path] = f cloudFiles[f.Path] = f

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log"
"strings" "strings"
"sync" "sync"
@ -31,7 +30,6 @@ func (b *BackBlaze) ListDuplicateVersions(ctx context.Context, cancel context.Ca
if err != nil { if err != nil {
return fmt.Errorf("b2.NewClient %w", err) return fmt.Errorf("b2.NewClient %w", err)
} }
log.Println("b2Client ok")
dups, err := b.listDuplicates(ctx, cancel, b2Client) dups, err := b.listDuplicates(ctx, cancel, b2Client)
if err != nil { if err != nil {
@ -57,8 +55,7 @@ func (b *BackBlaze) listDuplicates(ctx context.Context, cancel context.CancelFun
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
dups := make([]duplicate, 0) dups := make([]duplicate, 0)
log.Println("len(buckets)", len(buckets)) sm := semaphore.NewWeighted(int64(b.options.MaxWorkers))
sm := semaphore.NewWeighted(int64(b.maxWorkers))
wg.Add(len(buckets)) wg.Add(len(buckets))
for _, bc := range buckets { for _, bc := range buckets {
if err := sm.Acquire(ctx, 1); err != nil { if err := sm.Acquire(ctx, 1); err != nil {

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/fs" "io/fs"
"log"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -15,37 +14,35 @@ import (
"github.com/kurin/blazer/b2" "github.com/kurin/blazer/b2"
) )
const writers = 4
func (b *BackBlaze) Sync(ctx context.Context) error { func (b *BackBlaze) Sync(ctx context.Context) error {
if b.bucketName == "" && (b.filePath == "" || b.dir == "") { if b.options.Bucket == "" && (b.options.FilePath == "" || b.options.Dir == "") {
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.options.Bucket, b.options.FilePath, b.options.Dir)
} }
if b.filePath != "" && b.dir != "" { if b.options.FilePath != "" && b.options.Dir != "" {
return errors.New("you must select just 1 option, dir or file") return errors.New("you must select just 1 option, dir or file")
} }
bc, err := b.b2Client.Bucket(ctx, b.bucketName) bc, err := b.b2Client.Bucket(ctx, b.options.Bucket)
if err != nil { if err != nil {
return fmt.Errorf("b2Client.Bucket %w", err) return fmt.Errorf("b2Client.Bucket %w", err)
} }
if bc == nil { if bc == nil {
return fmt.Errorf("bucket doesn't exist %s", b.bucketName) return fmt.Errorf("bucket doesn't exist %s", b.options.Bucket)
} }
log.Println("bucket found:", bc.Name()) b.logger.Infoln("bucket found:", bc.Name())
if b.filePath != "" { if b.options.FilePath != "" {
log.Println("file:", b.filePath) b.logger.Infoln("file:", b.options.FilePath)
if err := b.copyFile(ctx, bc, b.filePath); err != nil { if err := b.copyFile(ctx, bc, b.options.FilePath); err != nil {
return fmt.Errorf("copyFile %w", err) return fmt.Errorf("copyFile %w", err)
} }
return nil return nil
} }
if b.dir != "" { if b.options.Dir != "" {
oldFiles, err := b.bucketFiles(ctx, bc) oldFiles, err := b.bucketFiles(ctx, bc)
if err != nil { if err != nil {
return fmt.Errorf("bucketFiles %w", err) return fmt.Errorf("bucketFiles %w", err)
@ -58,7 +55,7 @@ func (b *BackBlaze) Sync(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < b.maxWorkers; i++ { for i := 0; i < b.options.MaxWorkers; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
@ -72,7 +69,7 @@ func (b *BackBlaze) Sync(ctx context.Context) error {
} }
// Walk the directory and send files to the channel for uploading // Walk the directory and send files to the channel for uploading
err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { err = filepath.WalkDir(b.options.Dir, func(path string, d fs.DirEntry, err error) error {
if err != nil { if err != nil {
return err return err
} }
@ -110,7 +107,7 @@ func (b *BackBlaze) copyFile(ctx context.Context, bucket *b2.Bucket, src string)
return err return err
} }
w := bucket.Object(fi.Name()).NewWriter(ctx) w := bucket.Object(fi.Name()).NewWriter(ctx)
w.ConcurrentUploads = writers w.ConcurrentUploads = b.options.ConcurrentUploads
w.UseFileBuffer = true w.UseFileBuffer = true
b.logger.Infoln("start copying ", fi.Name()) b.logger.Infoln("start copying ", fi.Name())
if _, err := io.Copy(w, f); err != nil { if _, err := io.Copy(w, f); err != nil {

View File

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"runtime"
"gitea.urkob.com/urko/backblaze-backup/cmd" "gitea.urkob.com/urko/backblaze-backup/cmd"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -19,6 +20,8 @@ func init() {
cmd.Sync.PersistentFlags().String("file", "", "absolute path of the file you want to upload to backblaze") cmd.Sync.PersistentFlags().String("file", "", "absolute path of the file you want to upload to backblaze")
cmd.Sync.PersistentFlags().String("dir", "", "absolute path of the directory you want to upload to backblaze") cmd.Sync.PersistentFlags().String("dir", "", "absolute path of the directory you want to upload to backblaze")
cmd.Sync.PersistentFlags().String("bucket", "", "backblaze bucket name") cmd.Sync.PersistentFlags().String("bucket", "", "backblaze bucket name")
cmd.Sync.PersistentFlags().Int("workers", runtime.NumCPU(), "The number of worker goroutines that are spawned to handle file processing in parallel. Each worker handles the upload of one file at a time, allowing for efficient use of system resources when dealing with a large number of files. The default value is the number of CPU cores available on the system, enabling the application to automatically scale its parallel processing capabilities based on the available hardware.")
cmd.Sync.PersistentFlags().Int("concurrent-uploads", 4, "The number of chunk uploads that can be performed simultaneously for a single file. When a file is uploaded, it might be split into multiple chunks to enable more efficient and reliable data transfer. The concurrent-uploads flag controls how many of these chunks can be uploaded in parallel during a single file upload. This is particularly useful for large files, where parallel chunk uploads can significantly speed up the overall upload time. The default value is 4.")
cmd.Check.PersistentFlags().String("dir", "", "Specifies the absolute path of the directory containing the backup files to be compared against the Backblaze B2 bucket. This flag is mutually exclusive with the 'file' flag.") cmd.Check.PersistentFlags().String("dir", "", "Specifies the absolute path of the directory containing the backup files to be compared against the Backblaze B2 bucket. This flag is mutually exclusive with the 'file' flag.")
cmd.Check.PersistentFlags().String("bucket", "", "Name of the Backblaze B2 bucket against which the local files or directory will be compared.") cmd.Check.PersistentFlags().String("bucket", "", "Name of the Backblaze B2 bucket against which the local files or directory will be compared.")