From 9c3f59b836036660fac0d3cdeba0f2f976e20c8b Mon Sep 17 00:00:00 2001 From: Urko Date: Tue, 8 Aug 2023 18:13:18 +0200 Subject: [PATCH] some fixes on goroutines --- internal/services/backblaze.go | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/internal/services/backblaze.go b/internal/services/backblaze.go index 9547812..7d3191b 100644 --- a/internal/services/backblaze.go +++ b/internal/services/backblaze.go @@ -13,7 +13,6 @@ import ( "strconv" "strings" "sync" - "time" rcloneb2 "github.com/rclone/rclone/backend/b2" @@ -23,16 +22,10 @@ import ( "golang.org/x/sync/semaphore" ) -const writers = 20 -const maxConcurrentWeight = 10 +const writers = 4 +const maxConcurrentWeight = 5 const largeFileSize = 500 * 1024 * 1024 // 500 MB -type UploadMessage struct { - key string - startAt time.Time - endAt time.Time -} - type BackBalze struct { bucketName string dir string @@ -47,7 +40,7 @@ func NewBackBlaze(bbID, bbKey string) *BackBalze { return &BackBalze{ bbID: bbID, bbKey: bbKey, - maxWorkers: runtime.NumCPU() * 3, + maxWorkers: runtime.NumCPU(), } } func (b *BackBalze) WithBucket(bucketName string) *BackBalze { @@ -63,7 +56,6 @@ func (b *BackBalze) WithFile(filePath string) *BackBalze { return b } func (b *BackBalze) Sync(ctx context.Context) error { - msgsChan := make(chan UploadMessage) if b.bucketName == "" && (b.filePath == "" || b.dir == "") { return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) } @@ -109,12 +101,6 @@ func (b *BackBalze) Sync(ctx context.Context) error { var wg sync.WaitGroup - go func() { - for m := range msgsChan { - log.Printf("\n\t%s:\n\tstart %s \n\tend %s\n", m.key, m.startAt.Format(time.RFC3339Nano), m.endAt.Format(time.RFC3339Nano)) - } - }() - for i := 0; i < b.maxWorkers; i++ { wg.Add(1) go func() { @@ -146,7 +132,6 @@ func (b *BackBalze) Sync(ctx context.Context) error { // Close the channel (no more files to send) close(fileChan) wg.Wait() - close(msgsChan) // Cleanup old files after backup is completed if err := cleanBucket(ctx, bc, oldFiles); err != nil { @@ -273,7 +258,6 @@ func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error { } w := bucket.Object(fi.Name()).NewWriter(ctx) w.ConcurrentUploads = writers - // w.ChunkSize = 1e9 / 2 w.UseFileBuffer = true log.Println("start copying", fi.Name()) if _, err := io.Copy(w, f); err != nil {