backblaze-backup/internal/services/backblaze/check.go

185 lines
4.7 KiB
Go
Raw Permalink Normal View History

package backblaze
import (
"context"
"errors"
"fmt"
"io/fs"
2023-08-29 11:40:07 +02:00
"path"
"path/filepath"
"sync"
2023-09-13 21:18:57 +02:00
"sync/atomic"
"time"
"github.com/kurin/blazer/b2"
)
// File describes one backup file, found either on the local disk or in a B2 bucket.
type File struct {
	// Path is the base name of the file; both listers in this file strip any
	// directory components before filling it in.
	Path string

	// Size is the file size in bytes.
	Size int

	// ModTime is the modification time reported by the file system for local
	// files, and the upload timestamp for files listed from B2.
	ModTime time.Time

	// IsUploaded is nil for local files. For files listed from B2 it points to
	// the result of comparing the object's status against b2.Uploaded.
	// NOTE(review): the B2 lister currently stores `Status != b2.Uploaded`,
	// which reads as inverted relative to this field's name; confirm against
	// the consumers before relying on it.
	IsUploaded *bool
}
// localFiles walks backupDir recursively and sends one File per non-directory
// entry on fileChan. Only the base name of each entry is kept in File.Path.
//
// fileChan is closed when the function returns, whether or not the walk
// succeeded. Sends block until the receiver is ready, so the caller must be
// draining fileChan concurrently.
//
// The first error reported by the walk (or by reading an entry's file info)
// aborts the walk and is returned wrapped.
func (b *BackBlaze) localFiles(backupDir string, fileChan chan<- File) error {
	defer close(fileChan)

	// visit handles a single entry of the walk.
	visit := func(entryPath string, entry fs.DirEntry, walkErr error) error {
		switch {
		case walkErr != nil:
			// Propagate errors raised by WalkDir itself; entry may be nil here.
			return walkErr
		case entry.IsDir():
			// Directories are traversed but never reported.
			return nil
		}

		info, infoErr := entry.Info()
		if infoErr != nil {
			return fmt.Errorf("d.Info: %w", infoErr)
		}

		fileChan <- File{
			Path:    filepath.Base(entryPath),
			Size:    int(info.Size()),
			ModTime: info.ModTime(),
		}
		return nil
	}

	if walkErr := filepath.WalkDir(backupDir, visit); walkErr != nil {
		return fmt.Errorf("error walking the directory: %w", walkErr)
	}
	return nil
}
// b2BucketFiles lists the files in the given B2 bucket and sends them to a channel.
// It closes the channel after all files are listed.
func (b *BackBlaze) b2BucketFiles(ctx context.Context, bucketName string, fileChan chan<- File) error {
bucket, err := b.b2Client.Bucket(ctx, bucketName)
defer close(fileChan)
if err != nil {
return fmt.Errorf("b2Client.Bucket %w", err)
}
bucketIter := bucket.List(ctx, b2.ListHidden())
if bucketIter == nil {
return errors.New("bucket list cannot be nil")
}
for {
if !bucketIter.Next() {
if bucketIter.Err() != nil {
return fmt.Errorf("bucketIter err %w", bucketIter.Err())
}
break
}
if bucketIter.Object() == nil {
return errors.New("bucketIter Object is nil")
}
2023-08-29 11:40:07 +02:00
fileName := path.Base(bucketIter.Object().Name())
attrs, err := bucketIter.Object().Attrs(ctx)
if err != nil {
return fmt.Errorf("bucketIter.Object().Attrs %s err %w", fileName, bucketIter.Err())
}
isUploaded := attrs.Status != b2.Uploaded
fileChan <- File{Path: path.Base(fileName), Size: int(attrs.Size), IsUploaded: &isUploaded, ModTime: attrs.UploadTimestamp}
}
return nil
}
// Sentinel errors carried in B2Local.Err to describe a mismatch between the
// local listing and the B2 listing.
var (
	// ErrLocalNotInCloud marks a file that was found locally but has no
	// counterpart with the same name in the B2 listing.
	ErrLocalNotInCloud = errors.New("exists locally but not in the cloud")

	// ErrCloudNotInLocal marks a file that was found in the B2 listing but
	// has no counterpart with the same name locally.
	ErrCloudNotInLocal = errors.New("exists on B2 but not locally")
)
2023-08-28 11:01:57 +02:00
// B2Local is one result of comparing the local listing with the B2 listing.
type B2Local struct {
	// File is the local or cloud file this result is about.
	File File

	// Err is nil when the file exists on both sides; otherwise it is
	// ErrLocalNotInCloud or ErrCloudNotInLocal.
	Err error
}
// CompareConcurrent concurrently fetches the list of local files and cloud files,
// then compares them to ensure all local files exist in the cloud.
// Errors are sent to a provided error channel. The function will panic if an error occurs while listing files.
2023-09-13 21:18:57 +02:00
func (b *BackBlaze) CompareConcurrent(
ctx context.Context,
backupDir, bucketName string,
localChan, b2Chan chan<- B2Local,
doneChan chan<- int,
) {
var wg sync.WaitGroup
localFiles := make(map[string]File)
cloudFiles := make(map[string]File)
localFileChan := make(chan File)
b2FileChan := make(chan File)
// Local listing
wg.Add(1)
go func() {
defer wg.Done()
wg.Add(1)
go func() {
defer wg.Done()
for f := range localFileChan {
if _, ok := localFiles[f.Path]; ok {
panic(fmt.Errorf("local file already exists in map: %s", f.Path))
}
b.logger.Debugf("local file %+v\n", f)
localFiles[f.Path] = f
}
}()
if err := b.localFiles(backupDir, localFileChan); err != nil {
panic(fmt.Errorf("b.LocalFilesWithB2: %w", err))
}
}()
// Cloud listing
wg.Add(1)
go func() {
defer wg.Done()
wg.Add(1)
go func() {
defer wg.Done()
for f := range b2FileChan {
if _, ok := cloudFiles[f.Path]; ok {
2023-10-01 14:35:50 +02:00
panic(fmt.Errorf(`cloud file already exists in map: %s\n you should run 'backblazebackup cleanup --bucket "%s"'`, f.Path, b.options.Bucket))
}
b.logger.Debugf("B2 file %+v\n", f)
cloudFiles[f.Path] = f
}
}()
if err := b.b2BucketFiles(ctx, bucketName, b2FileChan); err != nil {
panic(fmt.Errorf("b.LocalFilesWithB2: %w", err))
}
}()
// Wait for both to complete
wg.Wait()
2023-08-28 11:01:57 +02:00
// Now check local files that are not present in cloud
2023-09-13 21:18:57 +02:00
var count atomic.Int64
wg.Add(2)
go func() {
defer wg.Done()
for path, localFile := range localFiles {
if _, exists := cloudFiles[path]; !exists {
localChan <- B2Local{File: localFile, Err: ErrLocalNotInCloud}
2023-08-28 11:01:57 +02:00
continue
}
b.logger.Debugf("localFile %+v\n", localFile)
localChan <- B2Local{File: localFile, Err: nil}
2023-09-13 21:18:57 +02:00
count.Add(1)
}
}()
// Now check cloud files that are not in local
go func() {
defer wg.Done()
for path, cloudFile := range cloudFiles {
if _, exists := localFiles[path]; !exists {
b2Chan <- B2Local{File: cloudFile, Err: ErrCloudNotInLocal}
continue
}
b2Chan <- B2Local{File: cloudFile, Err: nil}
}
}()
wg.Wait()
close(localChan)
close(b2Chan)
2023-09-13 21:18:57 +02:00
doneChan <- int(count.Load())
}