feat: handle log of start and end

This commit is contained in:
Urko 2023-08-01 13:41:20 +02:00
parent bcd461ad06
commit ec7b19808f
1 changed files with 31 additions and 10 deletions

View File

@ -13,6 +13,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
rcloneb2 "github.com/rclone/rclone/backend/b2" rcloneb2 "github.com/rclone/rclone/backend/b2"
@ -26,6 +27,12 @@ const writers = 20
const maxConcurrentWeight = 10 const maxConcurrentWeight = 10
const largeFileSize = 500 * 1024 * 1024 // 500 MB const largeFileSize = 500 * 1024 * 1024 // 500 MB
type UploadMessage struct {
key string
startAt time.Time
endAt time.Time
}
type BackBalze struct { type BackBalze struct {
bucketName string bucketName string
dir string dir string
@ -56,6 +63,7 @@ func (b *BackBalze) WithFile(filePath string) *BackBalze {
return b return b
} }
func (b *BackBalze) Sync(ctx context.Context) error { func (b *BackBalze) Sync(ctx context.Context) error {
msgsChan := make(chan UploadMessage)
if b.bucketName == "" && (b.filePath == "" || b.dir == "") { if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
} }
@ -82,7 +90,7 @@ func (b *BackBalze) Sync(ctx context.Context) error {
if b.filePath != "" { if b.filePath != "" {
log.Println("file:", b.filePath) log.Println("file:", b.filePath)
if err := copyFile(ctx, bc, b.filePath); err != nil { if _, err := copyFile(ctx, bc, b.filePath); err != nil {
return fmt.Errorf("copyFile %w", err) return fmt.Errorf("copyFile %w", err)
} }
return nil return nil
@ -101,15 +109,22 @@ func (b *BackBalze) Sync(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
go func() {
for m := range msgsChan {
log.Printf("\n\t%s:\n\tstart %s \n\tend %s\n", m.key, m.startAt.Format(time.RFC3339Nano), m.endAt.Format(time.RFC3339Nano))
}
}()
for i := 0; i < b.maxWorkers; i++ { for i := 0; i < b.maxWorkers; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
for src := range fileChan { for src := range fileChan {
log.Println("start copying file", src) msg, err := copyFile(ctx, bc, src)
if err := copyFile(ctx, bc, src); err != nil { if err != nil {
log.Printf("error copying file %s: %v\n", src, err) log.Printf("error copying file %s: %v\n", src, err)
} }
msgsChan <- msg
} }
}() }()
} }
@ -131,6 +146,7 @@ func (b *BackBalze) Sync(ctx context.Context) error {
// Close the channel (no more files to send) // Close the channel (no more files to send)
close(fileChan) close(fileChan)
wg.Wait() wg.Wait()
close(msgsChan)
// Cleanup old files after backup is completed // Cleanup old files after backup is completed
if err := cleanBucket(ctx, bc, oldFiles); err != nil { if err := cleanBucket(ctx, bc, oldFiles); err != nil {
@ -170,7 +186,7 @@ func (b *BackBalze) OldSync() error {
if b.filePath != "" { if b.filePath != "" {
log.Println("file:", b.filePath) log.Println("file:", b.filePath)
if err := copyFile(ctx, bc, b.filePath); err != nil { if _, err := copyFile(ctx, bc, b.filePath); err != nil {
return fmt.Errorf("copyFile %w", err) return fmt.Errorf("copyFile %w", err)
} }
return nil return nil
@ -206,7 +222,7 @@ func (b *BackBalze) OldSync() error {
} }
if err := uploadSem.Acquire(ctx, weight); err == nil { if err := uploadSem.Acquire(ctx, weight); err == nil {
log.Println("start copying file", src) log.Println("start copying file", src)
if err := copyFile(ctx, bc, src); err != nil { if _, err := copyFile(ctx, bc, src); err != nil {
log.Printf("error copying file %s: %v\n", src, err) log.Printf("error copying file %s: %v\n", src, err)
} }
uploadSem.Release(weight) uploadSem.Release(weight)
@ -245,23 +261,28 @@ func (b *BackBalze) OldSync() error {
return nil return nil
} }
func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { func copyFile(ctx context.Context, bucket *b2.Bucket, src string) (UploadMessage, error) {
f, err := os.Open(src) f, err := os.Open(src)
if err != nil { if err != nil {
return err return UploadMessage{}, err
} }
defer f.Close() defer f.Close()
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
return err return UploadMessage{}, err
} }
w := bucket.Object(fi.Name()).NewWriter(ctx) w := bucket.Object(fi.Name()).NewWriter(ctx)
w.ConcurrentUploads = writers w.ConcurrentUploads = writers
msg := UploadMessage{
key: fi.Name(),
startAt: time.Now(),
}
if _, err := io.Copy(w, f); err != nil { if _, err := io.Copy(w, f); err != nil {
w.Close() w.Close()
return err return msg, err
} }
return w.Close() msg.endAt = time.Now()
return msg, w.Close()
} }
func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error { func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {