feat: backblaze service
This commit is contained in:
parent
318510b3f4
commit
b89196ac0e
|
@ -0,0 +1,187 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/kurin/blazer/b2"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
const writers = 10
|
||||
const maxConcurrentWeight = 4
|
||||
const largeFileSize = 500 * 1024 * 1024 // 500 MB
|
||||
|
||||
type BackBalze struct {
|
||||
bucketName string
|
||||
dir string
|
||||
filePath string
|
||||
maxWorkers int
|
||||
bbID string
|
||||
bbKey string
|
||||
}
|
||||
|
||||
func NewBackBlaze(bbID, bbKey string) *BackBalze {
|
||||
return &BackBalze{
|
||||
bbID: bbID,
|
||||
bbKey: bbKey,
|
||||
maxWorkers: runtime.NumCPU(),
|
||||
}
|
||||
}
|
||||
func (b *BackBalze) WithBucket(bucketName string) *BackBalze {
|
||||
b.bucketName = bucketName
|
||||
return b
|
||||
}
|
||||
func (b *BackBalze) WithDir(dir string) *BackBalze {
|
||||
b.dir = dir
|
||||
return b
|
||||
}
|
||||
func (b *BackBalze) WithFile(filePath string) *BackBalze {
|
||||
b.filePath = filePath
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *BackBalze) Sync() error {
|
||||
if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
|
||||
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
|
||||
}
|
||||
|
||||
if b.filePath != "" && b.dir != "" {
|
||||
return errors.New("you must select just 1 option, dir or file")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("b2.NewClient %w", err)
|
||||
}
|
||||
log.Println("b2Client ok")
|
||||
bc, err := b2Client.Bucket(ctx, b.bucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("b2Client.Bucket %w", err)
|
||||
}
|
||||
|
||||
if bc == nil {
|
||||
return fmt.Errorf("bucket doesn't exist %s", b.bucketName)
|
||||
}
|
||||
|
||||
log.Println("bucket found:", bc.Name())
|
||||
if b.filePath != "" {
|
||||
log.Println("file:", b.filePath)
|
||||
|
||||
if err := copyFile(ctx, bc, b.filePath); err != nil {
|
||||
return fmt.Errorf("copyFile %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if b.dir != "" {
|
||||
if err := cleanBucket(ctx, bc); err != nil {
|
||||
return fmt.Errorf("clearBucket %w", err)
|
||||
}
|
||||
|
||||
fileChan := make(chan string)
|
||||
uploadSem := semaphore.NewWeighted(maxConcurrentWeight)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < b.maxWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for src := range fileChan {
|
||||
info, err := os.Stat(src)
|
||||
if err != nil {
|
||||
log.Printf("error getting file info %s: %v\n", src, err)
|
||||
continue
|
||||
}
|
||||
weight := int64(1)
|
||||
if info.Size() > largeFileSize {
|
||||
weight = 2
|
||||
}
|
||||
if err := uploadSem.Acquire(ctx, weight); err == nil {
|
||||
if err := copyFile(ctx, bc, src); err != nil {
|
||||
log.Printf("error copying file %s: %v\n", src, err)
|
||||
}
|
||||
uploadSem.Release(weight)
|
||||
} else {
|
||||
log.Printf("error acquiring semaphore: %v\n", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Walk the directory and send files to the channel for uploading
|
||||
err := filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !d.IsDir() {
|
||||
fileChan <- path
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error walking the directory: %v", err)
|
||||
}
|
||||
|
||||
// Close the channel (no more files to send)
|
||||
close(fileChan)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
log.Println("copied successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
|
||||
log.Println("copying file", src)
|
||||
f, err := os.Open(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w := bucket.Object(fi.Name()).NewWriter(ctx)
|
||||
w.ConcurrentUploads = writers
|
||||
if _, err := io.Copy(w, f); err != nil {
|
||||
w.Close()
|
||||
return err
|
||||
}
|
||||
return w.Close()
|
||||
}
|
||||
|
||||
func cleanBucket(ctx context.Context, bucket *b2.Bucket) error {
|
||||
bucketIter := bucket.List(ctx)
|
||||
if bucketIter == nil {
|
||||
return fmt.Errorf("bucket list cannot be nil")
|
||||
}
|
||||
|
||||
for {
|
||||
if !bucketIter.Next() {
|
||||
if bucketIter.Err() != nil {
|
||||
return fmt.Errorf("bucketIter err %w", bucketIter.Err())
|
||||
}
|
||||
break
|
||||
}
|
||||
if bucketIter.Object() == nil {
|
||||
log.Println("bucketIter Object is nil")
|
||||
continue
|
||||
}
|
||||
if err := bucketIter.Object().Delete(ctx); err != nil {
|
||||
return fmt.Errorf("bucketIter.Object().Delete() %s | err %w", bucketIter.Object().Name(), err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue