458 lines
10 KiB
Go
458 lines
10 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
rcloneb2 "github.com/rclone/rclone/backend/b2"
|
|
|
|
"github.com/kurin/blazer/b2"
|
|
"github.com/rclone/rclone/fs/config/configmap"
|
|
"github.com/rclone/rclone/fs/operations"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
// Upload tuning values used by the sync routines in this file.
const (
	// writers is assigned to a B2 writer's ConcurrentUploads in copyFile.
	writers = 20

	// maxConcurrentWeight is the capacity of the weighted semaphore that
	// OldSync uses to throttle uploads.
	maxConcurrentWeight = 10

	// largeFileSize is the size above which OldSync charges an upload a
	// semaphore weight of 2 instead of 1.
	largeFileSize = 500 * 1024 * 1024 // 500 MB
)
|
|
|
|
// UploadMessage pairs a key with a start and an end timestamp.
type UploadMessage struct {
	key            string
	startAt, endAt time.Time
}
|
|
|
|
// BackBalze holds the Backblaze B2 credentials together with the bucket and
// local source configured through NewBackBlaze and the With* methods.
type BackBalze struct {
	// bucketName is the bucket looked up by Sync and OldSync.
	bucketName string
	// dir and filePath are the local sources to upload; Sync and OldSync
	// reject a configuration where both are set.
	dir, filePath string
	// maxWorkers bounds concurrency: it is the number of upload goroutines
	// started for a directory sync and the semaphore size in listDuplicates.
	maxWorkers int
	// bbID and bbKey are the credentials handed to b2.NewClient.
	bbID, bbKey string
}
|
|
|
|
func NewBackBlaze(bbID, bbKey string) *BackBalze {
|
|
log.Println("runtime.NumCPU()", runtime.NumCPU())
|
|
return &BackBalze{
|
|
bbID: bbID,
|
|
bbKey: bbKey,
|
|
maxWorkers: runtime.NumCPU() * 3,
|
|
}
|
|
}
|
|
func (b *BackBalze) WithBucket(bucketName string) *BackBalze {
|
|
b.bucketName = bucketName
|
|
return b
|
|
}
|
|
func (b *BackBalze) WithDir(dir string) *BackBalze {
|
|
b.dir = dir
|
|
return b
|
|
}
|
|
func (b *BackBalze) WithFile(filePath string) *BackBalze {
|
|
b.filePath = filePath
|
|
return b
|
|
}
|
|
func (b *BackBalze) Sync(ctx context.Context) error {
|
|
msgsChan := make(chan UploadMessage)
|
|
if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
|
|
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
|
|
}
|
|
|
|
if b.filePath != "" && b.dir != "" {
|
|
return errors.New("you must select just 1 option, dir or file")
|
|
}
|
|
|
|
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
|
|
if err != nil {
|
|
return fmt.Errorf("b2.NewClient %w", err)
|
|
}
|
|
log.Println("b2Client ok")
|
|
bc, err := b2Client.Bucket(ctx, b.bucketName)
|
|
if err != nil {
|
|
return fmt.Errorf("b2Client.Bucket %w", err)
|
|
}
|
|
|
|
if bc == nil {
|
|
return fmt.Errorf("bucket doesn't exist %s", b.bucketName)
|
|
}
|
|
|
|
log.Println("bucket found:", bc.Name())
|
|
if b.filePath != "" {
|
|
log.Println("file:", b.filePath)
|
|
|
|
if err := copyFile(ctx, bc, b.filePath); err != nil {
|
|
return fmt.Errorf("copyFile %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if b.dir != "" {
|
|
oldFiles, err := bucketFiles(ctx, bc)
|
|
if err != nil {
|
|
return fmt.Errorf("bucketFiles %w", err)
|
|
}
|
|
log.Println(strings.Repeat("*", 40))
|
|
log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
|
|
log.Println(strings.Repeat("*", 40))
|
|
|
|
fileChan := make(chan string)
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
go func() {
|
|
for m := range msgsChan {
|
|
log.Printf("\n\t%s:\n\tstart %s \n\tend %s\n", m.key, m.startAt.Format(time.RFC3339Nano), m.endAt.Format(time.RFC3339Nano))
|
|
}
|
|
}()
|
|
|
|
for i := 0; i < b.maxWorkers; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for src := range fileChan {
|
|
err := copyFile(ctx, bc, src)
|
|
if err != nil {
|
|
log.Printf("error copying file %s: %v\n", src, err)
|
|
continue
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Walk the directory and send files to the channel for uploading
|
|
err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !d.IsDir() {
|
|
fileChan <- path
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error walking the directory: %v", err)
|
|
}
|
|
|
|
// Close the channel (no more files to send)
|
|
close(fileChan)
|
|
wg.Wait()
|
|
close(msgsChan)
|
|
|
|
// Cleanup old files after backup is completed
|
|
if err := cleanBucket(ctx, bc, oldFiles); err != nil {
|
|
return fmt.Errorf("cleanBucket %w", err)
|
|
}
|
|
}
|
|
|
|
log.Println("copied successfully")
|
|
return nil
|
|
}
|
|
|
|
func (b *BackBalze) OldSync() error {
|
|
if b.bucketName == "" && (b.filePath == "" || b.dir == "") {
|
|
return fmt.Errorf("bucket name is %v | filePath is %v | dir is %v", b.bucketName, b.filePath, b.dir)
|
|
}
|
|
|
|
if b.filePath != "" && b.dir != "" {
|
|
return errors.New("you must select just 1 option, dir or file")
|
|
}
|
|
|
|
ctx := context.Background()
|
|
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
|
|
if err != nil {
|
|
return fmt.Errorf("b2.NewClient %w", err)
|
|
}
|
|
log.Println("b2Client ok")
|
|
bc, err := b2Client.Bucket(ctx, b.bucketName)
|
|
if err != nil {
|
|
return fmt.Errorf("b2Client.Bucket %w", err)
|
|
}
|
|
|
|
if bc == nil {
|
|
return fmt.Errorf("bucket doesn't exist %s", b.bucketName)
|
|
}
|
|
|
|
log.Println("bucket found:", bc.Name())
|
|
if b.filePath != "" {
|
|
log.Println("file:", b.filePath)
|
|
|
|
if err := copyFile(ctx, bc, b.filePath); err != nil {
|
|
return fmt.Errorf("copyFile %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if b.dir != "" {
|
|
oldFiles, err := bucketFiles(ctx, bc)
|
|
if err != nil {
|
|
return fmt.Errorf("bucketFiles %w", err)
|
|
}
|
|
log.Println(strings.Repeat("*", 40))
|
|
log.Println("oldFiles to clean:\n\t\t" + strings.Join(oldFiles, "\n\t\t"))
|
|
log.Println(strings.Repeat("*", 40))
|
|
|
|
fileChan := make(chan string)
|
|
uploadSem := semaphore.NewWeighted(maxConcurrentWeight)
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for i := 0; i < b.maxWorkers; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for src := range fileChan {
|
|
info, err := os.Stat(src)
|
|
if err != nil {
|
|
log.Printf("error getting file info %s: %v\n", src, err)
|
|
continue
|
|
}
|
|
weight := int64(1)
|
|
if info.Size() > largeFileSize {
|
|
weight = 2
|
|
}
|
|
if err := uploadSem.Acquire(ctx, weight); err == nil {
|
|
log.Println("start copying file", src)
|
|
if err := copyFile(ctx, bc, src); err != nil {
|
|
log.Printf("error copying file %s: %v\n", src, err)
|
|
}
|
|
uploadSem.Release(weight)
|
|
} else {
|
|
log.Printf("error acquiring semaphore: %v\n", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Walk the directory and send files to the channel for uploading
|
|
err = filepath.WalkDir(b.dir, func(path string, d fs.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !d.IsDir() {
|
|
fileChan <- path
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error walking the directory: %v", err)
|
|
}
|
|
|
|
// Close the channel (no more files to send)
|
|
close(fileChan)
|
|
wg.Wait()
|
|
|
|
// Cleanup old files after backup is completed
|
|
if err := cleanBucket(ctx, bc, oldFiles); err != nil {
|
|
return fmt.Errorf("cleanBucket %w", err)
|
|
}
|
|
}
|
|
|
|
log.Println("copied successfully")
|
|
return nil
|
|
}
|
|
|
|
func copyFile(ctx context.Context, bucket *b2.Bucket, src string) error {
|
|
f, err := os.Open(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
fi, err := f.Stat()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
w := bucket.Object(fi.Name()).NewWriter(ctx)
|
|
w.ConcurrentUploads = writers
|
|
// w.ChunkSize = 1e9 / 2
|
|
w.UseFileBuffer = true
|
|
log.Println("start copying", fi.Name())
|
|
if _, err := io.Copy(w, f); err != nil {
|
|
w.Close()
|
|
return err
|
|
}
|
|
log.Println("end copying", fi.Name())
|
|
return w.Close()
|
|
}
|
|
|
|
func cleanBucket(ctx context.Context, bucket *b2.Bucket, files []string) error {
|
|
var errorBuilder strings.Builder
|
|
for _, v := range files {
|
|
obj := bucket.Object(v)
|
|
if obj == nil {
|
|
log.Println("object is nil", v)
|
|
continue
|
|
}
|
|
if err := obj.Delete(ctx); err != nil {
|
|
errorBuilder.WriteString(fmt.Errorf("error deleting %s : %w", v, err).Error())
|
|
errorBuilder.WriteString("; ")
|
|
}
|
|
}
|
|
|
|
if errorBuilder.Len() > 0 {
|
|
return errors.New(errorBuilder.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func bucketFiles(ctx context.Context, bucket *b2.Bucket) ([]string, error) {
|
|
bucketIter := bucket.List(ctx)
|
|
if bucketIter == nil {
|
|
return nil, fmt.Errorf("bucket list cannot be nil")
|
|
}
|
|
var files []string
|
|
for {
|
|
if !bucketIter.Next() {
|
|
if bucketIter.Err() != nil {
|
|
return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err())
|
|
}
|
|
break
|
|
}
|
|
if bucketIter.Object() == nil {
|
|
log.Println("bucketIter Object is nil")
|
|
continue
|
|
}
|
|
files = append(files, bucketIter.Object().Name())
|
|
}
|
|
return files, nil
|
|
}
|
|
|
|
// duplicate describes an object name that was listed more than once in a
// bucket.
type duplicate struct {
	bucket string // name of the bucket holding the object
	file   string // object name as listed, possibly containing "/" separators
	count  int    // number of times the name was listed
}

// dir returns the location that contains the duplicated file, always rooted
// at the bucket: "<bucket>" for a top-level file and "<bucket>/<parent path>"
// for a nested one.
//
// CleanUp passes this value to rclone as the Fs root, where the first path
// segment is the bucket name, so the bucket must never be dropped.
func (d duplicate) dir() string {
	idx := strings.LastIndex(d.file, "/")
	if idx <= 0 {
		// No parent path (or only a leading slash): the file sits at the
		// bucket root.
		return d.bucket
	}
	return d.bucket + "/" + d.file[:idx]
}
|
|
|
|
func (b *BackBalze) CleanUp(ctx context.Context, cancel context.CancelFunc) error {
|
|
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
|
|
if err != nil {
|
|
return fmt.Errorf("b2.NewClient %w", err)
|
|
}
|
|
log.Println("b2Client ok")
|
|
|
|
dups, err := b.listDuplicates(ctx, cancel, b2Client)
|
|
if err != nil {
|
|
return fmt.Errorf("b.listDuplicates: %w", err)
|
|
}
|
|
|
|
if len(dups) <= 0 {
|
|
return nil
|
|
}
|
|
|
|
for _, d := range dups {
|
|
smpl := configmap.Simple{}
|
|
smpl.Set("account", b.bbID)
|
|
smpl.Set("key", b.bbKey)
|
|
smpl.Set("chunk_size", strconv.FormatInt(int64(9600), 10))
|
|
f, err := rcloneb2.NewFs(ctx, "B2", d.dir(), smpl)
|
|
if err != nil {
|
|
return fmt.Errorf("rclonefs.NewFs %w", err)
|
|
}
|
|
if err := operations.CleanUp(ctx, f); err != nil {
|
|
return fmt.Errorf("operations.CleanUp %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *BackBalze) ListDuplicateVersions(ctx context.Context, cancel context.CancelFunc) error {
|
|
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
|
|
if err != nil {
|
|
return fmt.Errorf("b2.NewClient %w", err)
|
|
}
|
|
log.Println("b2Client ok")
|
|
|
|
dups, err := b.listDuplicates(ctx, cancel, b2Client)
|
|
if err != nil {
|
|
return fmt.Errorf("b.listDuplicates: %w", err)
|
|
}
|
|
|
|
if len(dups) > 0 {
|
|
var builder strings.Builder
|
|
for _, dup := range dups {
|
|
builder.WriteString(fmt.Sprintf("%+v\n", dup))
|
|
}
|
|
return fmt.Errorf("found duplicates: %s", builder.String())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) {
|
|
buckets, err := b2Client.ListBuckets(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("b2Client.Bucket %w", err)
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
dups := make([]duplicate, 0)
|
|
|
|
log.Println("len(buckets)", len(buckets))
|
|
sm := semaphore.NewWeighted(int64(b.maxWorkers))
|
|
wg.Add(len(buckets))
|
|
for _, bc := range buckets {
|
|
if err := sm.Acquire(ctx, 1); err != nil {
|
|
return nil, fmt.Errorf("sm.Acquire %w", err)
|
|
}
|
|
|
|
go func(bc *b2.Bucket) {
|
|
defer sm.Release(1)
|
|
defer wg.Done()
|
|
files := make(map[string]int, 0)
|
|
|
|
bucketIter := bc.List(ctx, b2.ListHidden())
|
|
if bucketIter == nil {
|
|
log.Println("bucket list cannot be nil")
|
|
return
|
|
}
|
|
|
|
for {
|
|
if !bucketIter.Next() {
|
|
if bucketIter.Err() != nil {
|
|
log.Println("bucketIter err %w", bucketIter.Err())
|
|
return
|
|
}
|
|
break
|
|
}
|
|
if bucketIter.Object() == nil {
|
|
log.Println("bucketIter Object is nil")
|
|
continue
|
|
}
|
|
files[bucketIter.Object().Name()]++
|
|
}
|
|
|
|
// Search duplicates
|
|
for file, count := range files {
|
|
if count > 1 {
|
|
dups = append(dups, duplicate{
|
|
bucket: bc.Name(),
|
|
file: file,
|
|
count: count,
|
|
})
|
|
}
|
|
}
|
|
}(bc)
|
|
}
|
|
wg.Wait()
|
|
|
|
return dups, nil
|
|
}
|