package backblaze import ( "context" "errors" "fmt" "io" "io/fs" "os" "path/filepath" "strings" "sync" "github.com/kurin/blazer/b2" ) func (b *BackBlaze) Sync(ctx context.Context) error { if b.options.Bucket == "" && (b.options.FilePath == "" || b.options.Dir == "") { return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.options.Bucket, b.options.FilePath, b.options.Dir) } if b.options.FilePath != "" && b.options.Dir != "" { return errors.New("you must select just 1 option, dir or file") } bc, err := b.b2Client.Bucket(ctx, b.options.Bucket) if err != nil { return fmt.Errorf("b2Client.Bucket %w", err) } if bc == nil { return fmt.Errorf("bucket doesn't exist %s", b.options.Bucket) } b.logger.Infoln("bucket found:", bc.Name()) if b.options.FilePath != "" { // Create a separate context for long-running operations longRunningCtx, cancelLongRunningOps := context.WithCancel(context.Background()) defer cancelLongRunningOps() b.logger.Infoln("file:", b.options.FilePath) if err := b.copyFile(longRunningCtx, bc, b.options.FilePath); err != nil { return fmt.Errorf("copyFile %w", err) } return nil } if b.options.Dir != "" { // Create a separate context for long-running operations longRunningCtx, cancelLongRunningOps := context.WithCancel(context.Background()) defer cancelLongRunningOps() oldFiles, err := b.bucketFiles(longRunningCtx, bc) if err != nil { return fmt.Errorf("bucketFiles %w", err) } b.logger.Debugln(strings.Repeat("*", 40)) b.logger.Debugln("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t")) b.logger.Debugln(strings.Repeat("*", 40)) fileChan := make(chan string) var wg sync.WaitGroup for i := 0; i < b.options.MaxWorkers; i++ { wg.Add(1) go func() { defer wg.Done() for src := range fileChan { if err := b.copyFile(longRunningCtx, bc, src); err != nil { b.logger.Errorf("error copying file %s: %v\n", src, err) continue } } }() } // Walk the directory and send files to the channel for uploading err = filepath.WalkDir(b.options.Dir, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } if !d.IsDir() { fileChan <- path } return nil }) if err != nil { return fmt.Errorf("error walking the directory: %v", err) } // Close the channel (no more files to send) close(fileChan) wg.Wait() // Cleanup old files after backup is completed if err := b.cleanBucket(longRunningCtx, bc, oldFiles); err != nil { return fmt.Errorf("cleanBucket %w", err) } } b.logger.Infoln("copied successfully") return nil } func (b *BackBlaze) copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { f, err := os.Open(src) if err != nil { return err } defer f.Close() fi, err := f.Stat() if err != nil { return err } w := bucket.Object(fi.Name()).NewWriter(ctx) w.ConcurrentUploads = b.options.ConcurrentUploads w.UseFileBuffer = true b.logger.Infoln("start copying ", fi.Name()) if _, err := io.Copy(w, f); err != nil { w.Close() return err } if err := w.Close(); err != nil { return err } b.logger.Infoln("end copying ", fi.Name()) return nil } func (b *BackBlaze) cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error { var errorBuilder strings.Builder for _, v := range files { obj := bucket.Object(v) if obj == nil { b.logger.Errorln("bucket.Object is nil", v) continue } if err := obj.Delete(ctx); err != nil { errorBuilder.WriteString(fmt.Errorf("error deleting %s : %w", v, err).Error()) errorBuilder.WriteString("; ") } } if errorBuilder.Len() > 0 { return errors.New(errorBuilder.String()) } return nil } func (b *BackBlaze) bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) { bucketIter := bucket.List(ctx) if bucketIter == nil { return nil, fmt.Errorf("bucket list cannot be nil") } var files []string for { if !bucketIter.Next() { if bucketIter.Err() != nil { return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err()) } break } if bucketIter.Object() == nil { b.logger.Errorln("bucketIter Object is nil") continue } files = append(files, bucketIter.Object().Name()) } return files, nil }