package backblaze import ( "context" "errors" "fmt" "io/fs" "path" "path/filepath" "sync" "github.com/kurin/blazer/b2" ) // localFiles lists the local files in the given backup directory and sends them to a channel. // It closes the channel after all files are listed. func (b *BackBlaze) localFiles(backupDir string, fileChan chan<- string) error { defer close(fileChan) // Walk the directory and send files to the channel err := filepath.WalkDir(backupDir, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } if !d.IsDir() { fileChan <- filepath.Base(path) } return nil }) if err != nil { return fmt.Errorf("error walking the directory: %v", err) } return nil } // b2BucketFiles lists the files in the given B2 bucket and sends them to a channel. // It closes the channel after all files are listed. func (b *BackBlaze) b2BucketFiles(ctx context.Context, bucketName string, fileChan chan<- string) error { bucket, err := b.b2Client.Bucket(ctx, bucketName) defer close(fileChan) if err != nil { return fmt.Errorf("b2Client.Bucket %w", err) } bucketIter := bucket.List(ctx, b2.ListHidden()) if bucketIter == nil { return errors.New("bucket list cannot be nil") } for { if !bucketIter.Next() { if bucketIter.Err() != nil { return fmt.Errorf("bucketIter err %w", bucketIter.Err()) } break } if bucketIter.Object() == nil { return errors.New("bucketIter Object is nil") } // Retrieve just filename b.logger.Debugln("bucket file: ", path.Base(bucketIter.Object().Name())) fileChan <- path.Base(bucketIter.Object().Name()) } return nil } var ErrLocalNotInCloud error = errors.New("exists locally but not in the cloud") var ErrCloudNotInLocal error = errors.New("exists on cloud but not locally") type B2Local struct { File string Err error LocalCount int } // CompareConcurrent concurrently fetches the list of local files and cloud files, // then compares them to ensure all local files exist in the cloud. // Errors are sent to a provided error channel. The function will panic if an error occurs while listing files. func (b *BackBlaze) CompareConcurrent(ctx context.Context, backupDir, bucketName string, msgChan chan<- B2Local) { var wg sync.WaitGroup localFiles := make(map[string]int) cloudFiles := make(map[string]int) localFileChan := make(chan string) b2FileChan := make(chan string) // Local listing wg.Add(1) go func() { defer wg.Done() wg.Add(1) go func() { defer wg.Done() for f := range localFileChan { if _, ok := localFiles[f]; ok { panic(fmt.Errorf("local file already exists in map: %s", f)) } b.logger.Debugln("local file ", f) localFiles[f]++ } }() if err := b.localFiles(backupDir, localFileChan); err != nil { panic(fmt.Errorf("b.LocalFilesWithB2: %w", err)) } }() // Cloud listing wg.Add(1) go func() { defer wg.Done() wg.Add(1) go func() { defer wg.Done() for f := range b2FileChan { if _, ok := cloudFiles[f]; ok { panic(fmt.Errorf("cloud file already exists in map: %s", f)) } b.logger.Debugln("B2 file ", f) cloudFiles[f]++ } }() if err := b.b2BucketFiles(ctx, bucketName, b2FileChan); err != nil { panic(fmt.Errorf("b.LocalFilesWithB2: %w", err)) } }() // Wait for both to complete wg.Wait() // Now check local files that are not present in cloud wg.Add(2) go func() { defer wg.Done() for localFile := range localFiles { if _, exists := cloudFiles[localFile]; !exists { msgChan <- B2Local{File: localFile, Err: ErrLocalNotInCloud} continue } msgChan <- B2Local{File: localFile, Err: nil} } }() // Now check cloud files that are not in local go func() { defer wg.Done() for cloudFile := range cloudFiles { b.logger.Debugln("cloudFile ", cloudFile) if _, exists := localFiles[cloudFile]; !exists { msgChan <- B2Local{File: cloudFile, Err: ErrCloudNotInLocal} } } }() wg.Wait() msgChan <- B2Local{Err: nil, LocalCount: len(localFiles)} close(msgChan) }