frostfs-node/pkg/local_object_storage/blobstor/blobtree/iterate.go
Dmitrii Stepanov fba369ec34 [#645] blobtree: Add metrics
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-11-29 19:34:22 +03:00

106 lines
2.1 KiB
Go

package blobtree
import (
"context"
"encoding/binary"
"os"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
)
// Iterate walks the storage tree starting at the configured root path and
// passes the records it finds to the handlers supplied in prm.
//
// The context argument is not used. The returned common.IterateRes is always
// the zero value; the error is whatever the directory walk returned.
// The elapsed time and the success flag (error == nil) are reported to the
// configured metrics when the call finishes.
func (b *BlobTree) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {
	var walkErr error
	begin := time.Now()

	// Deferred so the metric is recorded on every exit path of this call.
	defer func() {
		elapsed := time.Since(begin)
		b.cfg.metrics.Iterate(elapsed, walkErr == nil)
	}()

	walkErr = b.iterateDir(b.cfg.rootPath, 0, prm)

	return common.IterateRes{}, walkErr
}
// iterateDir processes a single directory of the tree.
//
// depth is the level of dir relative to the root (the root is level 0).
// While depth is below b.cfg.depth, only sub-directories are visited
// (recursively, with depth+1). When depth equals b.cfg.depth, the entries are
// treated as data files: names reported by isTempFile and names that parseIdx
// cannot parse are skipped, every other entry is handed to iterateRecords.
//
// A failure to list dir is returned to the caller unless prm.IgnoreErrors is
// set, in which case the directory is silently skipped. Errors coming from
// nested calls are returned as is and stop the walk.
func (b *BlobTree) iterateDir(dir string, depth uint64, prm common.IteratePrm) error {
	entries, readErr := os.ReadDir(dir)
	if readErr != nil {
		if !prm.IgnoreErrors {
			return readErr
		}
		return nil
	}

	switch {
	case depth < b.cfg.depth:
		// Intermediate level: descend into sub-directories only.
		for _, entry := range entries {
			if !entry.IsDir() {
				continue
			}
			subDir := filepath.Join(dir, entry.Name())
			if walkErr := b.iterateDir(subDir, depth+1, prm); walkErr != nil {
				return walkErr
			}
		}
	case depth == b.cfg.depth:
		// Leaf level: every parsable, non-temporary entry is a data file.
		for _, entry := range entries {
			name := entry.Name()
			if b.isTempFile(name) {
				continue
			}
			fileIdx, parseErr := b.parseIdx(name)
			if parseErr != nil {
				continue
			}
			if recErr := b.iterateRecords(fileIdx, b.getFilePath(dir, fileIdx), prm); recErr != nil {
				return recErr
			}
		}
	}

	return nil
}
// iterateRecords reads all records of the file at path and passes each of
// them to the handlers configured in prm.
//
// idx is the index of the file; it is encoded as a little-endian uint64 into
// the StorageID passed to prm.Handler. The read lock for path is taken through
// b.fileLock and released when the function returns, so it is held while the
// handlers run.
//
// When prm.LazyHandler is set, it is invoked for every record with the record
// address and a getter that returns the record data as produced by
// readFileContent, without the decompression step; prm.Handler is not called
// in this mode. Otherwise the data is decompressed and passed to prm.Handler.
//
// Error handling:
//   - a failure to read the file is returned, or swallowed when
//     prm.IgnoreErrors is set;
//   - a decompression failure is returned unless prm.IgnoreErrors is set; in
//     that case it is reported to prm.ErrorHandler (if any) and the record is
//     skipped, while a non-nil error from ErrorHandler stops the iteration;
//   - an error returned by LazyHandler or Handler stops the iteration.
func (b *BlobTree) iterateRecords(idx uint64, path string, prm common.IteratePrm) error {
	b.fileLock.RLock(path)
	defer b.fileLock.RUnlock(path)

	records, err := b.readFileContent(path)
	if err != nil {
		if prm.IgnoreErrors {
			return nil
		}
		return err
	}

	for _, record := range records {
		if prm.LazyHandler != nil {
			// Bind the payload to a per-iteration variable: before Go 1.22 the
			// loop variable is shared between iterations, so a getter retained
			// by the handler would otherwise observe a later record.
			data := record.Data
			if err = prm.LazyHandler(record.Address, func() ([]byte, error) {
				return data, nil
			}); err != nil {
				return err
			}
			continue
		}

		record.Data, err = b.compressor.Decompress(record.Data)
		if err != nil {
			if !prm.IgnoreErrors {
				return err
			}
			if prm.ErrorHandler != nil {
				// Only abort when the error handler itself fails; a nil result
				// means "reported, keep going" and must not end the loop, or
				// the remaining records of this file would be lost.
				if err = prm.ErrorHandler(record.Address, err); err != nil {
					return err
				}
			}
			continue
		}

		// A fresh slice per record: the handler may keep the element.
		storageID := make([]byte, storageIDLength)
		binary.LittleEndian.PutUint64(storageID, idx)

		err = prm.Handler(common.IterationElement{
			Address:    record.Address,
			ObjectData: record.Data,
			StorageID:  storageID,
		})
		if err != nil {
			return err
		}
	}

	return nil
}