frostfs-node/pkg/local_object_storage/blobstor/blobtree/iterate.go
Dmitrii Stepanov 4daef70774 [#645] blobtree: Store path as storageID
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-11-29 19:34:22 +03:00

109 lines
2.2 KiB
Go

package blobtree
import (
"context"
"os"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (b *BlobTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
var (
startedAt = time.Now()
err error
)
defer func() {
b.cfg.metrics.Iterate(time.Since(startedAt), err == nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Iterate",
trace.WithAttributes(
attribute.String("path", b.cfg.rootPath),
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
err = b.iterateDir("", prm)
return common.IterateRes{}, err
}
func (b *BlobTree) iterateDir(dir string, prm common.IteratePrm) error {
entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir))
if err != nil {
if prm.IgnoreErrors {
return nil
}
return err
}
for _, entity := range entities {
if entity.IsDir() {
err := b.iterateDir(filepath.Join(dir, entity.Name()), prm)
if err != nil {
return err
}
continue
}
if b.isTempFile(entity.Name()) {
continue
}
path := filepath.Join(dir, entity.Name())
err = b.iterateRecords(path, prm)
if err != nil {
return err
}
}
return nil
}
func (b *BlobTree) iterateRecords(path string, prm common.IteratePrm) error {
b.fileLock.RLock(path)
defer b.fileLock.RUnlock(path)
records, err := b.readFileContent(path)
if err != nil {
if prm.IgnoreErrors {
return nil
}
return err
}
for _, record := range records {
if prm.LazyHandler != nil {
if err = prm.LazyHandler(record.Address, func() ([]byte, error) {
return record.Data, nil
}); err != nil {
return err
}
continue
}
record.Data, err = b.compressor.Decompress(record.Data)
if err != nil {
if prm.IgnoreErrors {
if prm.ErrorHandler != nil {
return prm.ErrorHandler(record.Address, err)
}
continue
}
return err
}
err = prm.Handler(common.IterationElement{
Address: record.Address,
ObjectData: record.Data,
StorageID: pathToStorageID(path),
})
if err != nil {
return err
}
}
return nil
}