package backblaze

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"path"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/kurin/blazer/b2"
)

// File describes a single entry found either in the local backup directory
// or in a B2 bucket.
type File struct {
	// Path is the base name of the entry; the listers strip any directory
	// components before filling it in.
	Path string
	Size int // file size in bytes
	// ModTime is the modification time for local files and the upload
	// timestamp for B2 objects.
	ModTime time.Time
	// IsUploaded is left nil by localFiles; it is only populated for
	// entries that come from a B2 bucket listing.
	IsUploaded *bool
}

// localFiles walks backupDir recursively and sends one File per
// non-directory entry to fileChan. Only the base name of each entry is
// reported, so entries with the same name in different sub-directories are
// indistinguishable to the receiver.
//
// fileChan is always closed before localFiles returns, on success as well as
// on error. The returned error wraps whatever aborted the walk.
func (b *BackBlaze) localFiles(backupDir string, fileChan chan<- File) error {
	defer close(fileChan)

	// visit handles a single entry reported by filepath.WalkDir.
	visit := func(entryPath string, entry fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if entry.IsDir() {
			// Directories are not reported themselves; returning nil lets
			// the walk descend into them.
			return nil
		}

		info, infoErr := entry.Info()
		if infoErr != nil {
			return fmt.Errorf("d.Info: %w", infoErr)
		}

		fileChan <- File{
			Path:    filepath.Base(entryPath),
			Size:    int(info.Size()),
			ModTime: info.ModTime(),
		}
		return nil
	}

	if walkErr := filepath.WalkDir(backupDir, visit); walkErr != nil {
		return fmt.Errorf("error walking the directory: %w", walkErr)
	}
	return nil
}

// b2BucketFiles lists the files in the given B2 bucket and sends them to a channel.
// It closes the channel after all files are listed.
func (b *BackBlaze) b2BucketFiles(ctx context.Context, bucketName string, fileChan chan<- File) error { bucket, err := b.b2Client.Bucket(ctx, bucketName) defer close(fileChan) if err != nil { return fmt.Errorf("b2Client.Bucket %w", err) } bucketIter := bucket.List(ctx, b2.ListHidden()) if bucketIter == nil { return errors.New("bucket list cannot be nil") } for { if !bucketIter.Next() { if bucketIter.Err() != nil { return fmt.Errorf("bucketIter err %w", bucketIter.Err()) } break } if bucketIter.Object() == nil { return errors.New("bucketIter Object is nil") } fileName := path.Base(bucketIter.Object().Name()) attrs, err := bucketIter.Object().Attrs(ctx) if err != nil { return fmt.Errorf("bucketIter.Object().Attrs %s err %w", fileName, bucketIter.Err()) } isUploaded := attrs.Status != b2.Uploaded fileChan <- File{Path: path.Base(fileName), Size: int(attrs.Size), IsUploaded: &isUploaded, ModTime: attrs.UploadTimestamp} } return nil } var ErrLocalNotInCloud error = errors.New("exists locally but not in the cloud") var ErrCloudNotInLocal error = errors.New("exists on B2 but not locally") type B2Local struct { File File Err error } // CompareConcurrent concurrently fetches the list of local files and cloud files, // then compares them to ensure all local files exist in the cloud. // Errors are sent to a provided error channel. The function will panic if an error occurs while listing files. 
func (b *BackBlaze) CompareConcurrent( ctx context.Context, backupDir, bucketName string, localChan, b2Chan chan<- B2Local, doneChan chan<- int, ) { var wg sync.WaitGroup localFiles := make(map[string]File) cloudFiles := make(map[string]File) localFileChan := make(chan File) b2FileChan := make(chan File) // Local listing wg.Add(1) go func() { defer wg.Done() wg.Add(1) go func() { defer wg.Done() for f := range localFileChan { if _, ok := localFiles[f.Path]; ok { panic(fmt.Errorf("local file already exists in map: %s", f.Path)) } b.logger.Debugf("local file %+v\n", f) localFiles[f.Path] = f } }() if err := b.localFiles(backupDir, localFileChan); err != nil { panic(fmt.Errorf("b.LocalFilesWithB2: %w", err)) } }() // Cloud listing wg.Add(1) go func() { defer wg.Done() wg.Add(1) go func() { defer wg.Done() for f := range b2FileChan { if _, ok := cloudFiles[f.Path]; ok { panic(fmt.Errorf(`cloud file already exists in map: %s\n you should run 'backblazebackup cleanup --bucket "%s"'`, f.Path, b.bucketName)) } b.logger.Debugf("B2 file %+v\n", f) cloudFiles[f.Path] = f } }() if err := b.b2BucketFiles(ctx, bucketName, b2FileChan); err != nil { panic(fmt.Errorf("b.LocalFilesWithB2: %w", err)) } }() // Wait for both to complete wg.Wait() // Now check local files that are not present in cloud var count atomic.Int64 wg.Add(2) go func() { defer wg.Done() for path, localFile := range localFiles { if _, exists := cloudFiles[path]; !exists { localChan <- B2Local{File: localFile, Err: ErrLocalNotInCloud} continue } b.logger.Debugf("localFile %+v\n", localFile) localChan <- B2Local{File: localFile, Err: nil} count.Add(1) } }() // Now check cloud files that are not in local go func() { defer wg.Done() for path, cloudFile := range cloudFiles { if _, exists := localFiles[path]; !exists { b2Chan <- B2Local{File: cloudFile, Err: ErrCloudNotInLocal} continue } b2Chan <- B2Local{File: cloudFile, Err: nil} } }() wg.Wait() close(localChan) close(b2Chan) doneChan <- int(count.Load()) }