some fixes on goroutines

This commit is contained in:
Urko 2023-08-08 18:13:18 +02:00
parent 2a10ceca38
commit 9c3f59b836
1 changed files with 3 additions and 19 deletions

View File

@ -13,7 +13,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
rcloneb2 "github.com/rclone/rclone/backend/b2" rcloneb2 "github.com/rclone/rclone/backend/b2"
@ -23,16 +22,10 @@ import (
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
) )
const writers = 20 const writers = 4
const maxConcurrentWeight = 10 const maxConcurrentWeight = 5
const largeFileSize = 500 * 1024 * 1024 // 500 MB const largeFileSize = 500 * 1024 * 1024 // 500 MB
type UploadMessage struct {
key string
startAt time.Time
endAt time.Time
}
type BackBalze struct { type BackBalze struct {
bucketName string bucketName string
dir string dir string
@ -47,7 +40,7 @@ func NewBackBlaze(bbID, bbKey string) *BackBalze {
return &BackBalze{ return &BackBalze{
bbID: bbID, bbID: bbID,
bbKey: bbKey, bbKey: bbKey,
maxWorkers: runtime.NumCPU() * 3, maxWorkers: runtime.NumCPU(),
} }
} }
func (b *BackBalze) WithBucket(bucketName string) *BackBalze { func (b *BackBalze) WithBucket(bucketName string) *BackBalze {
@ -63,7 +56,6 @@ func (b *BackBalze) WithFile(filePath string) *BackBalze {
return b return b
} }
func (b *BackBalze) Sync(ctx context.Context) error { func (b *BackBalze) Sync(ctx context.Context) error {
msgsChan := make(chan UploadMessage)
if b.bucketName == "" && (b.filePath == "" || b.dir == "") { if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir) return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
} }
@ -109,12 +101,6 @@ func (b *BackBalze) Sync(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
go func() {
for m := range msgsChan {
log.Printf("\n\t%s:\n\tstart %s \n\tend %s\n", m.key, m.startAt.Format(time.RFC3339Nano), m.endAt.Format(time.RFC3339Nano))
}
}()
for i := 0; i < b.maxWorkers; i++ { for i := 0; i < b.maxWorkers; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -146,7 +132,6 @@ func (b *BackBalze) Sync(ctx context.Context) error {
// Close the channel (no more files to send) // Close the channel (no more files to send)
close(fileChan) close(fileChan)
wg.Wait() wg.Wait()
close(msgsChan)
// Cleanup old files after backup is completed // Cleanup old files after backup is completed
if err := cleanBucket(ctx, bc, oldFiles); err != nil { if err := cleanBucket(ctx, bc, oldFiles); err != nil {
@ -273,7 +258,6 @@ func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
} }
w := bucket.Object(fi.Name()).NewWriter(ctx) w := bucket.Object(fi.Name()).NewWriter(ctx)
w.ConcurrentUploads = writers w.ConcurrentUploads = writers
// w.ChunkSize = 1e9 / 2
w.UseFileBuffer = true w.UseFileBuffer = true
log.Println("start copying", fi.Name()) log.Println("start copying", fi.Name())
if _, err := io.Copy(w, f); err != nil { if _, err := io.Copy(w, f); err != nil {