// Package services contains helpers for copying local files into Backblaze B2
// buckets and for detecting and cleaning up file names that are stored more
// than once.
package services

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"

	"github.com/kurin/blazer/b2"
	rcloneb2 "github.com/rclone/rclone/backend/b2"
	"github.com/rclone/rclone/fs/config/configmap"
	"github.com/rclone/rclone/fs/operations"
	"golang.org/x/sync/semaphore"
)

const (
	// writers is assigned to the ConcurrentUploads field of every B2 writer.
	writers = 20
	// maxConcurrentWeight is the total semaphore weight OldSync allows in
	// flight at the same time.
	maxConcurrentWeight = 10
	// largeFileSize is the size above which OldSync charges a file a weight
	// of 2 instead of 1.
	largeFileSize = 500 * 1024 * 1024 // 500 MB
)

// BackBalze holds the credentials and the target selection (bucket plus
// either a single file or a directory) for a B2 backup run.
type BackBalze struct {
	bucketName string
	dir        string
	filePath   string
	maxWorkers int
	bbID       string
	bbKey      string
}

// NewBackBlaze returns a BackBalze configured with the given credentials and
// a worker count of three times the number of CPUs.
func NewBackBlaze(bbID, bbKey string) *BackBalze {
	log.Println("runtime.NumCPU()", runtime.NumCPU())
	return &BackBalze{
		bbID:       bbID,
		bbKey:      bbKey,
		maxWorkers: runtime.NumCPU() * 3,
	}
}

// WithBucket sets the target bucket name and returns b for chaining.
func (b *BackBalze) WithBucket(bucketName string) *BackBalze {
	b.bucketName = bucketName
	return b
}

// WithDir sets the directory to upload and returns b for chaining.
func (b *BackBalze) WithDir(dir string) *BackBalze {
	b.dir = dir
	return b
}

// WithFile sets the single file to upload and returns b for chaining.
func (b *BackBalze) WithFile(filePath string) *BackBalze {
	b.filePath = filePath
	return b
}

// Sync uploads the configured file or directory to the configured bucket.
//
// In directory mode every non-directory entry found by walking the directory
// is uploaded by a pool of workers. Only when all uploads succeeded are the
// objects that were in the bucket before the run deleted. An error is
// returned when the configuration is incomplete, the bucket cannot be opened,
// the walk fails, any upload fails, or the cleanup fails.
func (b *BackBalze) Sync(ctx context.Context) error {
	return b.sync(ctx, nil)
}

// OldSync behaves like Sync but runs on context.Background() and additionally
// throttles uploads with a weighted semaphore: files larger than
// largeFileSize cost 2 units, all others 1, out of maxConcurrentWeight.
func (b *BackBalze) OldSync() error {
	return b.sync(context.Background(), semaphore.NewWeighted(maxConcurrentWeight))
}

// validate checks that a bucket and exactly one of file or dir are set.
func (b *BackBalze) validate() error {
	// The bucket is always required, and at least one target must be given.
	if b.bucketName == "" || (b.filePath == "" && b.dir == "") {
		return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
	}
	if b.filePath != "" && b.dir != "" {
		return errors.New("you must select just 1 option, dir or file")
	}
	return nil
}

// workerCount returns the configured worker count, but never less than one so
// that a BackBalze built without NewBackBlaze cannot block forever.
func (b *BackBalze) workerCount() int {
	if b.maxWorkers < 1 {
		return 1
	}
	return b.maxWorkers
}

// sync is the shared implementation of Sync and OldSync. A nil uploadSem
// disables the weighted throttling.
func (b *BackBalze) sync(ctx context.Context, uploadSem *semaphore.Weighted) error {
	if err := b.validate(); err != nil {
		return err
	}

	b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
	if err != nil {
		return fmt.Errorf("b2.NewClient %w", err)
	}
	log.Println("b2Client ok")

	bc, err := b2Client.Bucket(ctx, b.bucketName)
	if err != nil {
		return fmt.Errorf("b2Client.Bucket %w", err)
	}
	if bc == nil {
		return fmt.Errorf("bucket doesn't exist %s", b.bucketName)
	}
	log.Println("bucket found:", bc.Name())

	if b.filePath != "" {
		log.Println("file:", b.filePath)
		if err := copyFile(ctx, bc, b.filePath); err != nil {
			return fmt.Errorf("copyFile %w", err)
		}
		return nil
	}

	// validate guarantees that dir is set when filePath is not.
	if err := b.syncDir(ctx, bc, uploadSem); err != nil {
		return err
	}
	log.Println("copied successfully")
	return nil
}

// syncDir uploads every non-directory entry below b.dir and afterwards removes
// the objects that were listed in the bucket before the upload started.
func (b *BackBalze) syncDir(ctx context.Context, bc *b2.Bucket, uploadSem *semaphore.Weighted) error {
	oldFiles, err := bucketFiles(ctx, bc)
	if err != nil {
		return fmt.Errorf("bucketFiles %w", err)
	}
	log.Println(strings.Repeat("*", 40))
	log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
	log.Println(strings.Repeat("*", 40))

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards uploaded and failures
		uploaded = make(map[string]struct{})
		failures []string
	)
	fileChan := make(chan string)

	for i := 0; i < b.workerCount(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for src := range fileChan {
				name, err := uploadThrottled(ctx, bc, src, uploadSem)
				if err != nil {
					log.Printf("error copying file %s: %v\n", src, err)
				}
				mu.Lock()
				if err != nil {
					failures = append(failures, fmt.Sprintf("%s: %v", src, err))
				} else {
					uploaded[name] = struct{}{}
				}
				mu.Unlock()
			}
		}()
	}

	// Walk the directory and hand every file to the workers. The send also
	// watches ctx so a cancelled run does not keep walking.
	walkErr := filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			return nil
		}
		select {
		case fileChan <- path:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})

	// Always close the channel and wait, even when the walk failed, so the
	// workers terminate instead of leaking.
	close(fileChan)
	wg.Wait()

	if walkErr != nil {
		return fmt.Errorf("error walking the directory: %w", walkErr)
	}
	if len(failures) > 0 {
		// Keep the old objects: deleting them after a failed upload could
		// remove the only remaining copy.
		return fmt.Errorf("copying files, old files were kept: %s", strings.Join(failures, "; "))
	}

	// Do not delete names that this run just uploaded.
	// NOTE(review): deleting an object addressed only by name presumably
	// removes its newest version, i.e. the fresh upload — confirm against
	// blazer's Object.Delete.
	stale := make([]string, 0, len(oldFiles))
	for _, name := range oldFiles {
		if _, ok := uploaded[name]; ok {
			log.Println("keeping re-uploaded file", name)
			continue
		}
		stale = append(stale, name)
	}

	// Cleanup old files after backup is completed.
	if err := cleanBucket(ctx, bc, stale); err != nil {
		return fmt.Errorf("cleanBucket %w", err)
	}
	return nil
}

// uploadThrottled uploads src and returns the object name it was stored
// under. When sem is non-nil the upload first acquires 1 unit, or 2 units for
// files larger than largeFileSize, and releases them when it finishes.
func uploadThrottled(ctx context.Context, bc *b2.Bucket, src string, sem *semaphore.Weighted) (string, error) {
	if sem != nil {
		info, err := os.Stat(src)
		if err != nil {
			return "", fmt.Errorf("error getting file info %s: %w", src, err)
		}
		weight := int64(1)
		if info.Size() > largeFileSize {
			weight = 2
		}
		if err := sem.Acquire(ctx, weight); err != nil {
			return "", fmt.Errorf("error acquiring semaphore: %w", err)
		}
		defer sem.Release(weight)
	}
	log.Println("start copying file", src)
	return uploadFile(ctx, bc, src)
}

// copyFile uploads the local file src into bucket.
func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
	_, err := uploadFile(ctx, bucket, src)
	return err
}

// uploadFile streams the local file src into bucket and returns the object
// name used.
//
// The object name is the file's base name only, so files with the same base
// name in different subdirectories are written to the same object name.
func uploadFile(ctx context.Context, bucket *b2.Bucket, src string) (string, error) {
	f, err := os.Open(src)
	if err != nil {
		return "", err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return "", err
	}

	name := fi.Name()
	w := bucket.Object(name).NewWriter(ctx)
	w.ConcurrentUploads = writers
	if _, err := io.Copy(w, f); err != nil {
		// The copy error is the one worth reporting; the writer is closed
		// only to release it.
		w.Close()
		return "", err
	}
	if err := w.Close(); err != nil {
		return "", err
	}
	return name, nil
}

// cleanBucket deletes every named object from bucket. It keeps going after a
// failed delete and returns one error that concatenates all failures, or nil
// when every delete succeeded.
func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {
	var errorBuilder strings.Builder
	for _, v := range files {
		obj := bucket.Object(v)
		if obj == nil {
			log.Println("object is nil", v)
			continue
		}
		if err := obj.Delete(ctx); err != nil {
			fmt.Fprintf(&errorBuilder, "error deleting %s : %v; ", v, err)
		}
	}
	if errorBuilder.Len() > 0 {
		return errors.New(errorBuilder.String())
	}
	return nil
}

// bucketFiles returns the names of the objects yielded by listing bucket with
// default options.
func bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) {
	bucketIter := bucket.List(ctx)
	if bucketIter == nil {
		return nil, fmt.Errorf("bucket list cannot be nil")
	}

	var files []string
	for bucketIter.Next() {
		obj := bucketIter.Object()
		if obj == nil {
			log.Println("bucketIter Object is nil")
			continue
		}
		files = append(files, obj.Name())
	}
	if err := bucketIter.Err(); err != nil {
		return nil, fmt.Errorf("bucketIter err %w", err)
	}
	return files, nil
}

// duplicate describes a file name that was listed more than once in a bucket.
type duplicate struct {
	bucket string
	file   string
	count  int
}

// dir returns the location that contains the duplicate: the bucket itself for
// a file without a "/" in its name, otherwise the bucket followed by the
// file's directory part.
//
// The result is used as the root passed to rclone's B2 backend.
// NOTE(review): that root is expected to start with the bucket name — confirm
// against the rclone b2 backend documentation.
func (d duplicate) dir() string {
	idx := strings.LastIndex(d.file, "/")
	if idx <= 0 {
		return d.bucket
	}
	return d.bucket + "/" + d.file[:idx]
}

// CleanUp looks for duplicated file names in every bucket of the account and
// runs rclone's CleanUp operation once for each distinct location that
// contains a duplicate. It returns nil when no duplicates are found.
func (b *BackBalze) CleanUp(ctx context.Context, cancel context.CancelFunc) error {
	b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
	if err != nil {
		return fmt.Errorf("b2.NewClient %w", err)
	}
	log.Println("b2Client ok")

	dups, err := b.listDuplicates(ctx, cancel, b2Client)
	if err != nil {
		return fmt.Errorf("b.listDuplicates: %w", err)
	}
	if len(dups) == 0 {
		return nil
	}

	// Several duplicates can share a location; clean each location only once.
	cleaned := make(map[string]struct{}, len(dups))
	for _, d := range dups {
		root := d.dir()
		if _, done := cleaned[root]; done {
			continue
		}
		cleaned[root] = struct{}{}

		smpl := configmap.Simple{}
		smpl.Set("account", b.bbID)
		smpl.Set("key", b.bbKey)
		smpl.Set("chunk_size", strconv.FormatInt(int64(9600), 10))

		f, err := rcloneb2.NewFs(ctx, "B2", root, smpl)
		if err != nil {
			return fmt.Errorf("rclonefs.NewFs %w", err)
		}
		if err := operations.CleanUp(ctx, f); err != nil {
			return fmt.Errorf("operations.CleanUp %w", err)
		}
	}
	return nil
}

// ListDuplicateVersions returns an error describing every duplicated file
// name found in the account's buckets, or nil when there are none.
func (b *BackBalze) ListDuplicateVersions(ctx context.Context, cancel context.CancelFunc) error {
	b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
	if err != nil {
		return fmt.Errorf("b2.NewClient %w", err)
	}
	log.Println("b2Client ok")

	dups, err := b.listDuplicates(ctx, cancel, b2Client)
	if err != nil {
		return fmt.Errorf("b.listDuplicates: %w", err)
	}
	if len(dups) == 0 {
		return nil
	}

	var builder strings.Builder
	for _, dup := range dups {
		fmt.Fprintf(&builder, "%+v\n", dup)
	}
	return fmt.Errorf("found duplicates: %s", builder.String())
}

// listDuplicates scans all buckets of the account concurrently, at most
// maxWorkers at a time, and returns every file name that was listed more than
// once. A bucket whose listing fails is logged and skipped. The cancel
// parameter is currently unused.
func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) {
	buckets, err := b2Client.ListBuckets(ctx)
	if err != nil {
		return nil, fmt.Errorf("b2Client.ListBuckets %w", err)
	}
	log.Println("len(buckets)", len(buckets))

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards dups
		dups []duplicate
	)
	sm := semaphore.NewWeighted(int64(b.workerCount()))

	for _, bc := range buckets {
		if err := sm.Acquire(ctx, 1); err != nil {
			// Let the scans that already started finish before returning.
			wg.Wait()
			return nil, fmt.Errorf("sm.Acquire %w", err)
		}
		wg.Add(1)
		go func(bc *b2.Bucket) {
			defer wg.Done()
			defer sm.Release(1)

			found := duplicatesInBucket(ctx, bc)
			if len(found) == 0 {
				return
			}
			mu.Lock()
			dups = append(dups, found...)
			mu.Unlock()
		}(bc)
	}
	wg.Wait()
	return dups, nil
}

// duplicatesInBucket lists bc with the b2.ListHidden option, counts how often
// each object name is returned, and reports the names seen more than once.
// Listing errors are logged and yield no result for the bucket.
func duplicatesInBucket(ctx context.Context, bc *b2.Bucket) []duplicate {
	bucketIter := bc.List(ctx, b2.ListHidden())
	if bucketIter == nil {
		log.Println("bucket list cannot be nil")
		return nil
	}

	counts := make(map[string]int)
	for bucketIter.Next() {
		obj := bucketIter.Object()
		if obj == nil {
			log.Println("bucketIter Object is nil")
			continue
		}
		counts[obj.Name()]++
	}
	if err := bucketIter.Err(); err != nil {
		log.Printf("bucketIter err %v", err)
		return nil
	}

	var found []duplicate
	for file, count := range counts {
		if count > 1 {
			found = append(found, duplicate{
				bucket: bc.Name(),
				file:   file,
				count:  count,
			})
		}
	}
	return found
}