diff --git a/internal/services/backblaze.go b/internal/services/backblaze.go index 278a8d2..4a0d8cf 100644 --- a/internal/services/backblaze.go +++ b/internal/services/backblaze.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" "sync" + "time" rcloneb2 "github.com/rclone/rclone/backend/b2" @@ -26,6 +27,12 @@ const writers = 20 const maxConcurrentWeight = 10 const largeFileSize = 500 * 1024 * 1024 // 500 MB +type UploadMessage struct { + key string + startAt time.Time + endAt time.Time +} + type BackBalze struct { bucketName string dir string @@ -56,6 +63,7 @@ func (b *BackBalze) WithFile(filePath string) *BackBalze { return b } func (b *BackBalze) Sync(ctx context.Context) error { + msgsChan := make(chan UploadMessage) if b.bucketName == "" && (b.filePath == "" || b.dir == "") { return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) } @@ -82,7 +90,7 @@ func (b *BackBalze) Sync(ctx context.Context) error { if b.filePath != "" { log.Println("file:", b.filePath) - if err := copyFile(ctx, bc, b.filePath); err != nil { + if _, err := copyFile(ctx, bc, b.filePath); err != nil { return fmt.Errorf("copyFile %w", err) } return nil @@ -101,15 +109,22 @@ func (b *BackBalze) Sync(ctx context.Context) error { var wg sync.WaitGroup + go func() { + for m := range msgsChan { + log.Printf("\n\t%s:\n\tstart %s \n\tend %s\n", m.key, m.startAt.Format(time.RFC3339Nano), m.endAt.Format(time.RFC3339Nano)) + } + }() + for i := 0; i < b.maxWorkers; i++ { wg.Add(1) go func() { defer wg.Done() for src := range fileChan { - log.Println("start copying file", src) - if err := copyFile(ctx, bc, src); err != nil { + msg, err := copyFile(ctx, bc, src) + if err != nil { log.Printf("error copying file %s: %v\n", src, err) } + msgsChan <- msg } }() } @@ -131,6 +146,7 @@ func (b *BackBalze) Sync(ctx context.Context) error { // Close the channel (no more files to send) close(fileChan) wg.Wait() + close(msgsChan) // Cleanup old files after backup is completed if err := cleanBucket(ctx, bc, oldFiles); err != nil { @@ -170,7 +186,7 @@ func (b *BackBalze) OldSync() error { if b.filePath != "" { log.Println("file:", b.filePath) - if err := copyFile(ctx, bc, b.filePath); err != nil { + if _, err := copyFile(ctx, bc, b.filePath); err != nil { return fmt.Errorf("copyFile %w", err) } return nil @@ -206,7 +222,7 @@ func (b *BackBalze) OldSync() error { } if err := uploadSem.Acquire(ctx, weight); err == nil { log.Println("start copying file", src) - if err := copyFile(ctx, bc, src); err != nil { + if _, err := copyFile(ctx, bc, src); err != nil { log.Printf("error copying file %s: %v\n", src, err) } uploadSem.Release(weight) @@ -245,23 +261,28 @@ func (b *BackBalze) OldSync() error { return nil } -func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { +func copyFile(ctx context.Context, bucket *b2.Bucket, src string) (UploadMessage, error) { f, err := os.Open(src) if err != nil { - return err + return UploadMessage{}, err } defer f.Close() fi, err := f.Stat() if err != nil { - return err + return UploadMessage{}, err } w := bucket.Object(fi.Name()).NewWriter(ctx) w.ConcurrentUploads = writers + msg := UploadMessage{ + key: fi.Name(), + startAt: time.Now(), + } if _, err := io.Copy(w, f); err != nil { w.Close() - return err + return msg, err } - return w.Close() + msg.endAt = time.Now() + return msg, w.Close() } func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {