From 2ca045913d6173bccdc10e7621883d3743064708 Mon Sep 17 00:00:00 2001 From: Urko Date: Thu, 13 Jul 2023 22:01:18 +0200 Subject: [PATCH] feat: upload 1 file per CPU core --- internal/services/backblaze.go | 90 +++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/internal/services/backblaze.go b/internal/services/backblaze.go index e372f86..ba47e04 100644 --- a/internal/services/backblaze.go +++ b/internal/services/backblaze.go @@ -31,6 +31,7 @@ type BackBalze struct { } func NewBackBlaze(bbID, bbKey string) *BackBalze { + log.Println("runtime.NumCPU()", runtime.NumCPU()) return &BackBalze{ bbID: bbID, bbKey: bbKey, @@ -49,7 +50,6 @@ func (b *BackBalze) WithFile(filePath string) *BackBalze { b.filePath = filePath return b } - func (b *BackBalze) Sync() error { if b.bucketName == "" && (b.filePath == "" || b.dir == "") { return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) @@ -84,6 +84,94 @@ func (b *BackBalze) Sync() error { return nil } + if b.dir != "" { + oldFiles, err := bucketFiles(ctx, bc) + if err != nil { + return fmt.Errorf("bucketFiles %w", err) + } + log.Println(strings.Repeat("*", 40)) + log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t")) + log.Println(strings.Repeat("*", 40)) + + fileChan := make(chan string) + + var wg sync.WaitGroup + + for i := 0; i < b.maxWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for src := range fileChan { + log.Println("start copying file", src) + if err := copyFile(ctx, bc, src); err != nil { + log.Printf("error copying file %s: %v\n", src, err) + } + } + }() + } + + // Walk the directory and send files to the channel for uploading + err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + fileChan <- path + } + return nil + }) + if err != nil { + return fmt.Errorf("error walking the directory: %v", err) + } + + // Close the channel (no more files to send) + close(fileChan) + wg.Wait() + + // Cleanup old files after backup is completed + if err := cleanBucket(ctx, bc, oldFiles); err != nil { + return fmt.Errorf("cleanBucket %w", err) + } + } + + log.Println("copied successfully") + return nil +} + +func (b *BackBalze) OldSync() error { + if b.bucketName == "" && (b.filePath == "" || b.dir == "") { + return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) + } + + if b.filePath != "" && b.dir != "" { + return errors.New("you must select just 1 option, dir or file") + } + + ctx := context.Background() + b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey) + if err != nil { + return fmt.Errorf("b2.NewClient %w", err) + } + log.Println("b2Client ok") + bc, err := b2Client.Bucket(ctx, b.bucketName) + if err != nil { + return fmt.Errorf("b2Client.Bucket %w", err) + } + + if bc == nil { + return fmt.Errorf("bucket doesn't exist %s", b.bucketName) + } + + log.Println("bucket found:", bc.Name()) + if b.filePath != "" { + log.Println("file:", b.filePath) + + if err := copyFile(ctx, bc, b.filePath); err != nil { + return fmt.Errorf("copyFile %w", err) + } + return nil + } + if b.dir != "" { oldFiles, err := bucketFiles(ctx, bc) if err != nil {