From b89196ac0e3017885db7413e92d234ce7a26f775 Mon Sep 17 00:00:00 2001 From: Urko Date: Mon, 10 Jul 2023 11:59:37 +0200 Subject: [PATCH] feat: backblaze service --- internal/services/backblaze.go | 187 +++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 internal/services/backblaze.go diff --git a/internal/services/backblaze.go b/internal/services/backblaze.go new file mode 100644 index 0000000..680ea8b --- /dev/null +++ b/internal/services/backblaze.go @@ -0,0 +1,187 @@ +package services + +import ( + "context" + "errors" + "fmt" + "io" + "io/fs" + "log" + "os" + "path/filepath" + "runtime" + "sync" + + "github.com/kurin/blazer/b2" + "golang.org/x/sync/semaphore" +) + +const writers = 10 +const maxConcurrentWeight = 4 +const largeFileSize = 500 * 1024 * 1024 // 500 MB + +type BackBalze struct { + bucketName string + dir string + filePath string + maxWorkers int + bbID string + bbKey string +} + +func NewBackBlaze(bbID, bbKey string) *BackBalze { + return &BackBalze{ + bbID: bbID, + bbKey: bbKey, + maxWorkers: runtime.NumCPU(), + } +} +func (b *BackBalze) WithBucket(bucketName string) *BackBalze { + b.bucketName = bucketName + return b +} +func (b *BackBalze) WithDir(dir string) *BackBalze { + b.dir = dir + return b +} +func (b *BackBalze) WithFile(filePath string) *BackBalze { + b.filePath = filePath + return b +} + +func (b *BackBalze) Sync() error { + if b.bucketName == "" && (b.filePath == "" || b.dir == "") { + return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) + } + + if b.filePath != "" && b.dir != "" { + return errors.New("you must select just 1 option, dir or file") + } + + ctx := context.Background() + b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey) + if err != nil { + return fmt.Errorf("b2.NewClient %w", err) + } + log.Println("b2Client ok") + bc, err := b2Client.Bucket(ctx, b.bucketName) + if err != nil { + return fmt.Errorf("b2Client.Bucket %w", err) + } + + if bc == nil { + return fmt.Errorf("bucket doesn't exist %s", b.bucketName) + } + + log.Println("bucket found:", bc.Name()) + if b.filePath != "" { + log.Println("file:", b.filePath) + + if err := copyFile(ctx, bc, b.filePath); err != nil { + return fmt.Errorf("copyFile %w", err) + } + return nil + } + + if b.dir != "" { + if err := cleanBucket(ctx, bc); err != nil { + return fmt.Errorf("clearBucket %w", err) + } + + fileChan := make(chan string) + uploadSem := semaphore.NewWeighted(maxConcurrentWeight) + + var wg sync.WaitGroup + + for i := 0; i < b.maxWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for src := range fileChan { + info, err := os.Stat(src) + if err != nil { + log.Printf("error getting file info %s: %v\n", src, err) + continue + } + weight := int64(1) + if info.Size() > largeFileSize { + weight = 2 + } + if err := uploadSem.Acquire(ctx, weight); err == nil { + if err := copyFile(ctx, bc, src); err != nil { + log.Printf("error copying file %s: %v\n", src, err) + } + uploadSem.Release(weight) + } else { + log.Printf("error acquiring semaphore: %v\n", err) + } + } + }() + } + + // Walk the directory and send files to the channel for uploading + err := filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + fileChan <- path + } + return nil + }) + if err != nil { + return fmt.Errorf("error walking the directory: %v", err) + } + + // Close the channel (no more files to send) + close(fileChan) + wg.Wait() + } + + log.Println("copied successfully") + return nil +} + +func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { + log.Println("copying file", src) + f, err := os.Open(src) + if err != nil { + return err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return err + } + w := bucket.Object(fi.Name()).NewWriter(ctx) + w.ConcurrentUploads = writers + if _, err := io.Copy(w, f); err != nil { + w.Close() + return err + } + return w.Close() +} + +func cleanBucket(ctx context.Context, bucket *b2.Bucket) error { + bucketIter := bucket.List(ctx) + if bucketIter == nil { + return fmt.Errorf("bucket list cannot be nil") + } + + for { + if !bucketIter.Next() { + if bucketIter.Err() != nil { + return fmt.Errorf("bucketIter err %w", bucketIter.Err()) + } + break + } + if bucketIter.Object() == nil { + log.Println("bucketIter Object is nil") + continue + } + if err := bucketIter.Object().Delete(ctx); err != nil { + return fmt.Errorf("bucketIter.Object().Delete() %s | err %w", bucketIter.Object().Name(), err) + } + } + return nil +}