package blobovniczatree import ( "context" "fmt" "os" "path/filepath" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/hrw" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) // Iterate iterates over all objects in b. func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { var ( startedAt = time.Now() err error ) defer func() { b.metrics.Iterate(time.Since(startedAt), err == nil) }() ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Iterate", trace.WithAttributes( attribute.String("path", b.rootPath), attribute.Bool("ignore_errors", prm.IgnoreErrors), )) defer span.End() err = b.iterateBlobovniczas(ctx, prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { var subPrm blobovnicza.IteratePrm subPrm.SetHandler(func(elem blobovnicza.IterationElement) error { data, err := b.compression.Decompress(elem.ObjectData()) if err != nil { if prm.IgnoreErrors { b.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Stringer("address", elem.Address()), zap.Error(err), zap.String("storage_id", p), zap.String("root_path", b.rootPath)) return nil } return fmt.Errorf("decompress object data: %w", err) } if prm.Handler != nil { return prm.Handler(common.IterationElement{ Address: elem.Address(), ObjectData: data, StorageID: []byte(strings.TrimSuffix(p, dbExtension)), }) } return nil }) subPrm.DecodeAddresses() _, err := blz.Iterate(ctx, subPrm) return err }) return common.IterateRes{}, err } // iterator over all Blobovniczas in unsorted order. Break on f's error return. func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { return b.iterateExistingDBPaths(ctx, func(p string) (bool, error) { shBlz := b.getBlobovnicza(ctx, p) blz, err := shBlz.Open(ctx) if err != nil { if ignoreErrors { b.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err), zap.String("storage_id", p), zap.String("root_path", b.rootPath)) return false, nil } return false, fmt.Errorf("open blobovnicza %s: %w", p, err) } defer shBlz.Close(ctx) err = f(p, blz) return err != nil, err }) } // iterateSortedLeaves iterates over the paths of Blobovniczas sorted by weight. // // Uses depth, width and leaf width for iteration. func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error { _, err := b.iterateSorted( ctx, addr, make([]string, 0, b.blzShallowDepth), b.blzShallowDepth, func(p []string) (bool, error) { return f(filepath.Join(p...)) }, ) return err } // iterator over directories with Blobovniczas sorted by weight. func (b *Blobovniczas) iterateDeepest(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error { depth := b.blzShallowDepth if depth > 0 { depth-- } _, err := b.iterateSorted( ctx, &addr, make([]string, 0, depth), depth, func(p []string) (bool, error) { return f(filepath.Join(p...)) }, ) return err } // iterator over particular level of directories. func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) { isLeafLevel := uint64(len(curPath)) == b.blzShallowDepth levelWidth := b.blzShallowWidth if isLeafLevel { hasDBs, maxIdx, err := getBlobovniczaMaxIndex(filepath.Join(append([]string{b.rootPath}, curPath...)...)) if err != nil { return false, err } levelWidth = 0 if hasDBs { levelWidth = maxIdx + 1 } } indices := indexSlice(levelWidth) if !isLeafLevel { hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...))) } exec := uint64(len(curPath)) == execDepth for i := range indices { select { case <-ctx.Done(): return false, ctx.Err() default: } lastPart := u64ToHexString(indices[i]) if isLeafLevel { lastPart = u64ToHexStringExt(indices[i]) } if i == 0 { curPath = append(curPath, lastPart) } else { curPath[len(curPath)-1] = lastPart } if exec { if stop, err := f(curPath); err != nil { return false, err } else if stop { return true, nil } } else if stop, err := b.iterateSorted(ctx, addr, curPath, execDepth, f); err != nil { return false, err } else if stop { return true, nil } } return false, nil } // iterateExistingDBPaths iterates over the paths of Blobovniczas without any order. // // Uses existed blobovnicza files for iteration. func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error { b.dbFilesGuard.RLock() defer b.dbFilesGuard.RUnlock() _, err := b.iterateExistingPathsDFS(ctx, "", f, func(path string) bool { return !strings.HasSuffix(path, rebuildSuffix) }) return err } func (b *Blobovniczas) iterateExistingPathsDFS(ctx context.Context, path string, f func(string) (bool, error), fileFilter func(path string) bool) (bool, error) { sysPath := filepath.Join(b.rootPath, path) entries, err := os.ReadDir(sysPath) if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode return false, nil } if err != nil { return false, err } for _, entry := range entries { select { case <-ctx.Done(): return false, ctx.Err() default: } if entry.IsDir() { stop, err := b.iterateExistingPathsDFS(ctx, filepath.Join(path, entry.Name()), f, fileFilter) if err != nil { return false, err } if stop { return true, nil } } else { if !fileFilter(entry.Name()) { continue } stop, err := f(filepath.Join(path, entry.Name())) if err != nil { return false, err } if stop { return true, nil } } } return false, nil } // iterateIncompletedRebuildDBPaths iterates over the paths of Blobovniczas with incompleted rebuild files without any order. func (b *Blobovniczas) iterateIncompletedRebuildDBPaths(ctx context.Context, f func(string) (bool, error)) error { b.dbFilesGuard.RLock() defer b.dbFilesGuard.RUnlock() _, err := b.iterateExistingPathsDFS(ctx, "", f, func(path string) bool { return strings.HasSuffix(path, rebuildSuffix) }) return err } func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error { b.dbFilesGuard.RLock() defer b.dbFilesGuard.RUnlock() _, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f) return err } func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) { sysPath := filepath.Join(b.rootPath, path) entries, err := os.ReadDir(sysPath) if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode return false, nil } if err != nil { return false, err } var dbIdxs []uint64 var dirIdxs []uint64 for _, entry := range entries { if strings.HasSuffix(entry.Name(), rebuildSuffix) { continue } idx := u64FromHexString(entry.Name()) if entry.IsDir() { dirIdxs = append(dirIdxs, idx) } else { dbIdxs = append(dbIdxs, idx) } } if len(dbIdxs) > 0 { for _, dbIdx := range dbIdxs { dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx)) stop, err := f(dbPath) if err != nil { return false, err } if stop { return true, nil } } } if len(dirIdxs) > 0 { hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path)) for _, dirIdx := range dirIdxs { dirPath := filepath.Join(path, u64ToHexString(dirIdx)) stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f) if err != nil { return false, err } if stop { return true, nil } } } return false, nil } // makes slice of uint64 values from 0 to number-1. func indexSlice(number uint64) []uint64 { s := make([]uint64, number) for i := range s { s[i] = uint64(i) } return s }