feat: allow cleanup by bucket

This commit is contained in:
Urko 2023-08-12 09:52:04 +02:00
parent 9c3f59b836
commit 5bbeb8c392
4 changed files with 173 additions and 98 deletions

View File

@ -18,6 +18,11 @@ var Cleanup = &cobra.Command{
ctx, cancel := context.WithCancel(signalContext(cmd.Context()))
defer cancel()
bucketName, err := cmd.Flags().GetString("bucket")
if err != nil {
log.Fatalln("bucket %w", err)
}
envFile := ""
if os.Getenv("BACKBLAZE_ENV") == "dev" {
envFile = ".env"
@ -25,7 +30,7 @@ var Cleanup = &cobra.Command{
cfg := config.NewConfig(envFile)
bbService := services.NewBackBlaze(cfg.BbId, cfg.BbKey)
if err := bbService.CleanUp(ctx, cancel); err != nil {
if err := bbService.CleanUp(ctx, cancel, bucketName); err != nil {
log.Fatalln("bbService.CleanUp()", err)
}
},

View File

@ -317,35 +317,39 @@ type duplicate struct {
count int
}
func (d duplicate) dir() string {
if !strings.Contains(d.file, "/") {
return d.bucket
}
splitted := strings.Split(d.file, "/")
return strings.Join(splitted[:(len(splitted)-1)], "/")
}
func (b *BackBalze) CleanUp(ctx context.Context, cancel context.CancelFunc, bucketName string) error {
func (b *BackBalze) CleanUp(ctx context.Context, cancel context.CancelFunc) error {
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
if err != nil {
return fmt.Errorf("b2.NewClient %w", err)
}
log.Println("b2Client ok")
dups, err := b.listDuplicates(ctx, cancel, b2Client)
var dups []duplicate
if bucketName != "" {
dups, err = b.listDuplicatesFromBucket(ctx, cancel, b2Client, bucketName)
if err != nil {
return fmt.Errorf("b.listDuplicatesFromBucket: %w", err)
}
} else {
dups, err = b.listDuplicates(ctx, cancel, b2Client)
if err != nil {
return fmt.Errorf("b.listDuplicates: %w", err)
}
}
if len(dups) <= 0 {
return nil
}
for _, d := range dups {
smpl := configmap.Simple{}
smpl.Set("account", b.bbID)
smpl.Set("key", b.bbKey)
smpl.Set("chunk_size", strconv.FormatInt(int64(9600), 10))
log.Println("duplicates", len(dups))
for _, d := range dups {
f, err := rcloneb2.NewFs(ctx, "B2", d.dir(), smpl)
if err != nil {
return fmt.Errorf("rclonefs.NewFs %w", err)
@ -353,89 +357,8 @@ func (b *BackBalze) CleanUp(ctx context.Context, cancel context.CancelFunc) erro
if err := operations.CleanUp(ctx, f); err != nil {
return fmt.Errorf("operations.CleanUp %w", err)
}
log.Println(d.dir(), "cleaned up")
}
return nil
}
func (b *BackBalze) ListDuplicateVersions(ctx context.Context, cancel context.CancelFunc) error {
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
if err != nil {
return fmt.Errorf("b2.NewClient %w", err)
}
log.Println("b2Client ok")
dups, err := b.listDuplicates(ctx, cancel, b2Client)
if err != nil {
return fmt.Errorf("b.listDuplicates: %w", err)
}
if len(dups) > 0 {
var builder strings.Builder
for _, dup := range dups {
builder.WriteString(fmt.Sprintf("%+v\n", dup))
}
return fmt.Errorf("found duplicates: %s", builder.String())
}
return nil
}
func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) {
buckets, err := b2Client.ListBuckets(ctx)
if err != nil {
return nil, fmt.Errorf("b2Client.Bucket %w", err)
}
wg := sync.WaitGroup{}
dups := make([]duplicate, 0)
log.Println("len(buckets)", len(buckets))
sm := semaphore.NewWeighted(int64(b.maxWorkers))
wg.Add(len(buckets))
for _, bc := range buckets {
if err := sm.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("sm.Acquire %w", err)
}
go func(bc *b2.Bucket) {
defer sm.Release(1)
defer wg.Done()
files := make(map[string]int, 0)
bucketIter := bc.List(ctx, b2.ListHidden())
if bucketIter == nil {
log.Println("bucket list cannot be nil")
return
}
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
log.Println("bucketIter err %w", bucketIter.Err())
return
}
break
}
if bucketIter.Object() == nil {
log.Println("bucketIter Object is nil")
continue
}
files[bucketIter.Object().Name()]++
}
// Search duplicates
for file, count := range files {
if count > 1 {
dups = append(dups, duplicate{
bucket: bc.Name(),
file: file,
count: count,
})
}
}
}(bc)
}
wg.Wait()
return dups, nil
}

View File

@ -0,0 +1,145 @@
package services
import (
"context"
"errors"
"fmt"
"log"
"strings"
"sync"
"github.com/kurin/blazer/b2"
"golang.org/x/sync/semaphore"
)
func (d duplicate) dir() string {
if !strings.Contains(d.file, "/") {
return d.bucket
}
splitted := strings.Split(d.file, "/")
return strings.Join(splitted[:(len(splitted)-1)], "/")
}
func (b *BackBalze) ListDuplicateVersions(ctx context.Context, cancel context.CancelFunc) error {
b2Client, err := b2.NewClient(ctx, b.bbID, b.bbKey)
if err != nil {
return fmt.Errorf("b2.NewClient %w", err)
}
log.Println("b2Client ok")
dups, err := b.listDuplicates(ctx, cancel, b2Client)
if err != nil {
return fmt.Errorf("b.listDuplicates: %w", err)
}
if len(dups) > 0 {
var builder strings.Builder
for _, dup := range dups {
builder.WriteString(fmt.Sprintf("%+v\n", dup))
}
return fmt.Errorf("found duplicates: %s", builder.String())
}
return nil
}
func (b *BackBalze) listDuplicates(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client) ([]duplicate, error) {
buckets, err := b2Client.ListBuckets(ctx)
if err != nil {
return nil, fmt.Errorf("b2Client.Bucket %w", err)
}
wg := sync.WaitGroup{}
dups := make([]duplicate, 0)
log.Println("len(buckets)", len(buckets))
sm := semaphore.NewWeighted(int64(b.maxWorkers))
wg.Add(len(buckets))
for _, bc := range buckets {
if err := sm.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("sm.Acquire %w", err)
}
go func(bc *b2.Bucket) {
defer sm.Release(1)
defer wg.Done()
files := make(map[string]int, 0)
bucketIter := bc.List(ctx, b2.ListHidden())
if bucketIter == nil {
log.Println("bucket list cannot be nil")
return
}
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
log.Println("bucketIter err %w", bucketIter.Err())
return
}
break
}
if bucketIter.Object() == nil {
log.Println("bucketIter Object is nil")
continue
}
files[bucketIter.Object().Name()]++
}
// Search duplicates
for file, count := range files {
if count > 1 {
dups = append(dups, duplicate{
bucket: bc.Name(),
file: file,
count: count,
})
}
}
}(bc)
}
wg.Wait()
return dups, nil
}
func (b *BackBalze) listDuplicatesFromBucket(ctx context.Context, cancel context.CancelFunc, b2Client *b2.Client, bucketName string) ([]duplicate, error) {
bucket, err := b2Client.Bucket(ctx, bucketName)
if err != nil {
return nil, fmt.Errorf("b2Client.Bucket %w", err)
}
dups := make([]duplicate, 0)
files := make(map[string]int, 0)
bucketIter := bucket.List(ctx, b2.ListHidden())
if bucketIter == nil {
return nil, errors.New("bucket list cannot be nil")
}
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
return nil, fmt.Errorf("bucketIter err %w", bucketIter.Err())
}
break
}
if bucketIter.Object() == nil {
return nil, errors.New("bucketIter Object is nil")
}
files[bucketIter.Object().Name()]++
}
// Search duplicates
for file, count := range files {
if count > 1 {
dups = append(dups, duplicate{
bucket: bucket.Name(),
file: file,
count: count,
})
}
}
return dups, nil
}

View File

@ -19,6 +19,8 @@ func init() {
cmd.Sync.PersistentFlags().String("file", "", "absolute path of the file you want to upload to backblaze")
cmd.Sync.PersistentFlags().String("dir", "", "absolute path of the directory you want to upload to backblaze")
cmd.Sync.PersistentFlags().String("bucket", "", "backblaze bucket name")
cmd.Cleanup.PersistentFlags().String("bucket", "", "backblaze bucket name")
}
func main() {