backblaze-backup/internal/services/backblaze/sync.go

175 lines
4.2 KiB
Go

package backblaze
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"github.com/kurin/blazer/b2"
)
// Sync uploads either a single file (options.FilePath) or every non-directory
// entry found under options.Dir to the configured bucket.
//
// A bucket name and exactly one of FilePath / Dir must be set; otherwise an
// error is returned before any remote call is made.
//
// In directory mode the names of the objects already in the bucket are listed
// before uploading and deleted afterwards. The deletion only happens when the
// directory walk and every upload succeeded, so a failed run never removes the
// previously stored objects.
//
// ctx is used only for the bucket lookup. Uploads, listing and cleanup run on
// a context derived from context.Background, so cancelling ctx does not
// interrupt them.
func (b *BackBlaze) Sync(ctx context.Context) error {
	// Require a bucket AND at least one source. The "both sources set" case is
	// rejected separately below with its own message.
	if b.options.Bucket == "" || (b.options.FilePath == "" && b.options.Dir == "") {
		return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.options.Bucket, b.options.FilePath, b.options.Dir)
	}
	if b.options.FilePath != "" && b.options.Dir != "" {
		return errors.New("you must select just 1 option, dir or file")
	}

	bc, err := b.b2Client.Bucket(ctx, b.options.Bucket)
	if err != nil {
		return fmt.Errorf("b2Client.Bucket %w", err)
	}
	if bc == nil {
		return fmt.Errorf("bucket doesn't exist %s", b.options.Bucket)
	}
	b.logger.Infoln("bucket found:", bc.Name())

	// Create a separate context for long-running operations.
	longRunningCtx, cancelLongRunningOps := context.WithCancel(context.Background())
	defer cancelLongRunningOps()

	// Single-file mode.
	if b.options.FilePath != "" {
		b.logger.Infoln("file:", b.options.FilePath)
		if err := b.copyFile(longRunningCtx, bc, b.options.FilePath); err != nil {
			return fmt.Errorf("copyFile %w", err)
		}
		return nil
	}

	// Directory mode (validation above guarantees Dir is set here).
	// Snapshot the bucket contents first so only pre-existing objects are
	// considered for cleanup.
	// NOTE(review): copyFile names objects after the file's base name and
	// cleanBucket deletes by name; if an uploaded name can equal one of
	// oldFiles, confirm which object the delete ends up targeting.
	oldFiles, err := b.bucketFiles(longRunningCtx, bc)
	if err != nil {
		return fmt.Errorf("bucketFiles %w", err)
	}
	b.logger.Debugln(strings.Repeat("*", 40))
	b.logger.Debugln("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
	b.logger.Debugln(strings.Repeat("*", 40))

	// With zero workers nothing would drain fileChan and the walk below would
	// block forever on its first send.
	workers := b.options.MaxWorkers
	if workers < 1 {
		workers = 1
	}

	fileChan := make(chan string)
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards failed
		failed int
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for src := range fileChan {
				// A failed upload does not stop the remaining files; it is
				// counted so the cleanup step can be skipped afterwards.
				if err := b.copyFile(longRunningCtx, bc, src); err != nil {
					b.logger.Errorf("error copying file %s: %v\n", src, err)
					mu.Lock()
					failed++
					mu.Unlock()
				}
			}
		}()
	}

	// Walk the directory and send files to the channel for uploading.
	walkErr := filepath.WalkDir(b.options.Dir, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !d.IsDir() {
			fileChan <- path
		}
		return nil
	})

	// Close the channel and wait for the workers even when the walk failed,
	// otherwise they would stay blocked on the channel forever.
	close(fileChan)
	wg.Wait()

	if walkErr != nil {
		return fmt.Errorf("error walking the directory: %w", walkErr)
	}
	if failed > 0 {
		return fmt.Errorf("%d file(s) failed to upload; skipping cleanup of old files", failed)
	}

	// Cleanup old files after backup is completed.
	if err := b.cleanBucket(longRunningCtx, bc, oldFiles); err != nil {
		return fmt.Errorf("cleanBucket %w", err)
	}

	b.logger.Infoln("copied successfully")
	return nil
}
// copyFile streams the local file at src into bucket. The object is named
// after the file's base name as reported by Stat, so the directory part of src
// is not reflected in the object name.
//
// It returns the first error met while opening, inspecting, copying or
// finalising the upload. The writer is closed on both the success and the
// failure path; a copy error takes precedence over a close error.
func (b *BackBlaze) copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
	in, openErr := os.Open(src)
	if openErr != nil {
		return openErr
	}
	defer in.Close()

	info, statErr := in.Stat()
	if statErr != nil {
		return statErr
	}
	name := info.Name()

	out := bucket.Object(name).NewWriter(ctx)
	out.ConcurrentUploads = b.options.ConcurrentUploads
	out.UseFileBuffer = true

	b.logger.Infoln("start copying ", name)

	// Close runs exactly once whether or not the copy succeeded.
	_, copyErr := io.Copy(out, in)
	closeErr := out.Close()
	if copyErr != nil {
		return copyErr
	}
	if closeErr != nil {
		return closeErr
	}

	b.logger.Infoln("end copying ", name)
	return nil
}
// cleanBucket deletes every object named in files from bucket.
//
// A failed deletion does not stop the loop: each failure is recorded and the
// remaining names are still processed. When at least one deletion failed, a
// single error is returned whose message lists every failure, each followed by
// "; ". It returns nil when all deletions succeeded or files is empty.
func (b *BackBlaze) cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {
	var failures []string

	for _, name := range files {
		target := bucket.Object(name)
		if target == nil {
			b.logger.Errorln("bucket.Object is nil", name)
			continue
		}

		delErr := target.Delete(ctx)
		if delErr == nil {
			continue
		}
		failures = append(failures, fmt.Errorf("error deleting %s : %w", name, delErr).Error())
	}

	if len(failures) == 0 {
		return nil
	}

	// Every entry, including the last one, is terminated by "; ".
	return errors.New(strings.Join(failures, "; ") + "; ")
}
// bucketFiles returns the names of the objects yielded by listing bucket.
//
// Entries for which the iterator yields a nil object are logged and skipped.
// If the iterator stops because of an error, that error is returned wrapped
// and no names are returned.
func (b *BackBlaze) bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) {
	it := bucket.List(ctx)
	if it == nil {
		return nil, fmt.Errorf("bucket list cannot be nil")
	}

	var names []string
	for it.Next() {
		obj := it.Object()
		if obj == nil {
			b.logger.Errorln("bucketIter Object is nil")
			continue
		}
		names = append(names, obj.Name())
	}

	// Next returning false means either the listing ended or it failed; Err
	// tells the two apart.
	if iterErr := it.Err(); iterErr != nil {
		return nil, fmt.Errorf("bucketIter err %w", iterErr)
	}
	return names, nil
}