feat: delete files after upload is completed

This commit is contained in:
Urko 2023-07-10 16:24:24 +02:00
parent 993199aca3
commit 9176c33eee
2 changed files with 43 additions and 13 deletions

View File

@ -15,6 +15,7 @@ var rootCmd = &cobra.Command{
Short: "Backblaze backup tool", Short: "Backblaze backup tool",
Long: `A tool to backup files and directories to Backblaze.`, Long: `A tool to backup files and directories to Backblaze.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
log.SetFlags(log.Lmicroseconds)
filePath, err := cmd.Flags().GetString("file") filePath, err := cmd.Flags().GetString("file")
if err != nil { if err != nil {
log.Fatalln("file %w", err) log.Fatalln("file %w", err)

View File

@ -10,6 +10,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings"
"sync" "sync"
"github.com/kurin/blazer/b2" "github.com/kurin/blazer/b2"
@ -84,9 +85,13 @@ func (b *BackBalze) Sync() error {
} }
if b.dir != "" { if b.dir != "" {
if err := cleanBucket(ctx, bc); err != nil { oldFiles, err := bucketFiles(ctx, bc)
return fmt.Errorf("clearBucket %w", err) if err != nil {
return fmt.Errorf("bucketFiles %w", err)
} }
log.Println(strings.Repeat("*", 40))
log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
log.Println(strings.Repeat("*", 40))
fileChan := make(chan string) fileChan := make(chan string)
uploadSem := semaphore.NewWeighted(maxConcurrentWeight) uploadSem := semaphore.NewWeighted(maxConcurrentWeight)
@ -108,6 +113,7 @@ func (b *BackBalze) Sync() error {
weight = 2 weight = 2
} }
if err := uploadSem.Acquire(ctx, weight); err == nil { if err := uploadSem.Acquire(ctx, weight); err == nil {
log.Println("start copying file", src)
if err := copyFile(ctx, bc, src); err != nil { if err := copyFile(ctx, bc, src); err != nil {
log.Printf("error copying file %s: %v\n", src, err) log.Printf("error copying file %s: %v\n", src, err)
} }
@ -120,7 +126,7 @@ func (b *BackBalze) Sync() error {
} }
// Walk the directory and send files to the channel for uploading // Walk the directory and send files to the channel for uploading
err := filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error { err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
if err != nil { if err != nil {
return err return err
} }
@ -136,6 +142,11 @@ func (b *BackBalze) Sync() error {
// Close the channel (no more files to send) // Close the channel (no more files to send)
close(fileChan) close(fileChan)
wg.Wait() wg.Wait()
// Cleanup old files after backup is completed
if err := cleanBucket(ctx, bc, oldFiles); err != nil {
return fmt.Errorf("cleanBucket %w", err)
}
} }
log.Println("copied successfully") log.Println("copied successfully")
@ -143,7 +154,6 @@ func (b *BackBalze) Sync() error {
} }
func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
log.Println("copying file", src)
f, err := os.Open(src) f, err := os.Open(src)
if err != nil { if err != nil {
return err return err
@ -162,16 +172,37 @@ func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
return w.Close() return w.Close()
} }
func cleanBucket(ctx context.Context, bucket *b2.Bucket) error { func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {
bucketIter := bucket.List(ctx) var errorBuilder strings.Builder
if bucketIter == nil { for _, v := range files {
return fmt.Errorf("bucket list cannot be nil") obj := bucket.Object(v)
if obj == nil {
log.Println("object is nil", v)
continue
}
if err := obj.Delete(ctx); err != nil {
errorBuilder.WriteString(fmt.Errorf("error deleting %s : %w", v, err).Error())
errorBuilder.WriteString("; ")
}
} }
if errorBuilder.Len() > 0 {
return errors.New(errorBuilder.String())
}
return nil
}
func bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) {
bucketIter := bucket.List(ctx)
if bucketIter == nil {
return nil, fmt.Errorf("bucket list cannot be nil")
}
var files []string
for { for {
if !bucketIter.Next() { if !bucketIter.Next() {
if bucketIter.Err() != nil { if bucketIter.Err() != nil {
return fmt.Errorf("bucketIter err %w", bucketIter.Err()) return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err())
} }
break break
} }
@ -179,9 +210,7 @@ func cleanBucket(ctx context.Context, bucket *b2.Bucket) error {
log.Println("bucketIter Object is nil") log.Println("bucketIter Object is nil")
continue continue
} }
if err := bucketIter.Object().Delete(ctx); err != nil { files = append(files, bucketIter.Object().Name())
return fmt.Errorf("bucketIter.Object().Delete() %s | err %w", bucketIter.Object().Name(), err)
}
} }
return nil return files, nil
} }