Refactor storage iterator #807

Merged
dstepanov-yadro merged 2 commits from dstepanov-yadro/frostfs-node:fix/storage_iterator into master 2024-09-04 19:51:04 +00:00
9 changed files with 62 additions and 87 deletions

View file

@ -818,6 +818,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
fstree.WithPerm(sRead.perm), fstree.WithPerm(sRead.perm),
fstree.WithDepth(sRead.depth), fstree.WithDepth(sRead.depth),
fstree.WithNoSync(sRead.noSync), fstree.WithNoSync(sRead.noSync),
fstree.WithLogger(c.log),
} }
if c.metricsCollector != nil { if c.metricsCollector != nil {
fstreeOpts = append(fstreeOpts, fstreeOpts = append(fstreeOpts,

View file

@ -6,6 +6,7 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -13,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/hrw" "git.frostfs.info/TrueCloudLab/hrw"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
) )
// Iterate iterates over all objects in b. // Iterate iterates over all objects in b.
@ -38,9 +40,11 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
data, err := b.compression.Decompress(elem.ObjectData()) data, err := b.compression.Decompress(elem.ObjectData())
if err != nil { if err != nil {
if prm.IgnoreErrors { if prm.IgnoreErrors {
if prm.ErrorHandler != nil { b.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
return prm.ErrorHandler(elem.Address(), err) zap.Stringer("address", elem.Address()),
} zap.String("err", err.Error()),
zap.String("storage_id", p),
zap.String("root_path", b.rootPath))
return nil return nil
} }
return fmt.Errorf("could not decompress object data: %w", err) return fmt.Errorf("could not decompress object data: %w", err)
@ -53,9 +57,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
StorageID: []byte(p), StorageID: []byte(p),
}) })
} }
return prm.LazyHandler(elem.Address(), func() ([]byte, error) { return nil
return data, err
})
}) })
subPrm.DecodeAddresses() subPrm.DecodeAddresses()
@ -72,6 +74,10 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
blz, err := shBlz.Open() blz, err := shBlz.Open()
if err != nil { if err != nil {
if ignoreErrors { if ignoreErrors {
b.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
zap.String("err", err.Error()),
zap.String("storage_id", p),
zap.String("root_path", b.rootPath))
return false, nil return false, nil
} }
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err) return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)

View file

@ -15,9 +15,7 @@ type IterationHandler func(IterationElement) error
// IteratePrm groups the parameters of Iterate operation. // IteratePrm groups the parameters of Iterate operation.
type IteratePrm struct { type IteratePrm struct {
Handler IterationHandler Handler IterationHandler
LazyHandler func(oid.Address, func() ([]byte, error)) error
IgnoreErrors bool IgnoreErrors bool
ErrorHandler func(oid.Address, error) error
} }
// IterateRes groups the resulting values of Iterate operation. // IterateRes groups the resulting values of Iterate operation.

View file

@ -14,10 +14,12 @@ import (
"syscall" "syscall"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -25,6 +27,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
) )
type keyLock interface { type keyLock interface {
@ -41,6 +44,8 @@ func (l *noopKeyLock) Unlock(string) {}
type FSTree struct { type FSTree struct {
Info Info
log *logger.Logger
*compression.Config *compression.Config
Depth uint64 Depth uint64
DirNameLen int DirNameLen int
@ -86,6 +91,7 @@ func New(opts ...Option) *FSTree {
metrics: &noopMetrics{}, metrics: &noopMetrics{},
fileGuard: &noopKeyLock{}, fileGuard: &noopKeyLock{},
fileCounter: &noopCounter{}, fileCounter: &noopCounter{},
log: &logger.Logger{Logger: zap.L()},
} }
for i := range opts { for i := range opts {
opts[i](f) opts[i](f)
@ -145,9 +151,13 @@ func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.Ite
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error { func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
curName := strings.Join(curPath[1:], "") curName := strings.Join(curPath[1:], "")
des, err := os.ReadDir(filepath.Join(curPath...)) dirPath := filepath.Join(curPath...)
des, err := os.ReadDir(dirPath)
if err != nil { if err != nil {
if prm.IgnoreErrors { if prm.IgnoreErrors {
t.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
zap.String("err", err.Error()),
zap.String("directory_path", dirPath))
return nil return nil
} }
return err return err
@ -182,37 +192,31 @@ func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, pr
if err != nil { if err != nil {
continue continue
} }
path := filepath.Join(curPath...)
data, err := os.ReadFile(filepath.Join(curPath...)) data, err := os.ReadFile(path)
if err != nil && os.IsNotExist(err) { if err != nil && os.IsNotExist(err) {
continue continue
} }
if prm.LazyHandler != nil { if err == nil {
err = prm.LazyHandler(addr, func() ([]byte, error) { data, err = t.Decompress(data)
return data, err }
}) if err != nil {
} else { if prm.IgnoreErrors {
if err == nil { t.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
data, err = t.Decompress(data) zap.Stringer("address", addr),
zap.String("err", err.Error()),
zap.String("path", path))
continue
} }
if err != nil { return err
if prm.IgnoreErrors {
if prm.ErrorHandler != nil {
return prm.ErrorHandler(addr, err)
}
continue
}
return err
}
err = prm.Handler(common.IterationElement{
Address: addr,
ObjectData: data,
StorageID: []byte{},
})
} }
err = prm.Handler(common.IterationElement{
Address: addr,
ObjectData: data,
StorageID: []byte{},
})
if err != nil { if err != nil {
return err return err
} }

View file

@ -3,7 +3,9 @@ package fstree
import ( import (
"io/fs" "io/fs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
"go.uber.org/zap"
) )
type Option func(*FSTree) type Option func(*FSTree)
@ -51,3 +53,9 @@ func WithFileCounter(c FileCounter) Option {
f.fileGuard = utilSync.NewKeyLocker[string]() f.fileGuard = utilSync.NewKeyLocker[string]()
} }
} }
// WithLogger returns an Option that installs a logger on the FSTree.
//
// The installed logger is a new logger.Logger wrapping l with an extra
// "component"="FSTree" field. The derivation happens when the option is
// applied to an FSTree, not when WithLogger is called.
// NOTE(review): l is used without a nil check; callers are expected to pass a
// non-nil logger — confirm against call sites.
func WithLogger(l *logger.Logger) Option {
	return func(f *FSTree) {
		// Tag every record emitted by this storage with its component name.
		tagged := l.With(zap.String("component", "FSTree"))
		f.log = &logger.Logger{Logger: tagged}
	}
}

View file

@ -6,7 +6,6 @@ import (
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -30,8 +29,6 @@ func TestIterate(t *testing.T, cons Constructor, min, max uint64) {
runTestNormalHandler(t, s, objects) runTestNormalHandler(t, s, objects)
runTestLazyHandler(t, s, objects)
runTestIgnoreLogicalErrors(t, s, objects) runTestIgnoreLogicalErrors(t, s, objects)
} }
@ -62,30 +59,6 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc)
}) })
} }
// runTestLazyHandler runs the "lazy handler" subtest: it iterates s with only
// IteratePrm.LazyHandler set, stores the deferred reader received for each
// address, and then verifies that the number of stored readers equals
// len(objects) and that every expected object has a reader returning its raw
// payload without error.
func runTestLazyHandler(t *testing.T, s common.Storage, objects []objectDesc) {
	t.Run("lazy handler", func(t *testing.T) {
		// Deferred readers keyed by the string form of the object address.
		readers := make(map[string]func() ([]byte, error))

		prm := common.IteratePrm{
			LazyHandler: func(addr oid.Address, read func() ([]byte, error)) error {
				readers[addr.String()] = read
				return nil
			},
		}

		_, err := s.Iterate(context.Background(), prm)
		require.NoError(t, err)
		require.Equal(t, len(objects), len(readers))

		for idx := range objects {
			want := &objects[idx]

			read, found := readers[want.addr.String()]
			require.True(t, found)

			// The payload is only read here, after iteration has finished.
			got, err := read()
			require.NoError(t, err)
			require.Equal(t, want.raw, got)
		}
	})
}
func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []objectDesc) { func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []objectDesc) {
t.Run("ignore errors doesn't work for logical errors", func(t *testing.T) { t.Run("ignore errors doesn't work for logical errors", func(t *testing.T) {
seen := make(map[string]objectDesc) seen := make(map[string]objectDesc)

View file

@ -40,7 +40,14 @@ func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.I
for i := range b.storage { for i := range b.storage {
_, err := b.storage[i].Storage.Iterate(ctx, prm) _, err := b.storage[i].Storage.Iterate(ctx, prm)
if err != nil && !prm.IgnoreErrors { if err != nil {
if prm.IgnoreErrors {
b.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
zap.String("storage_path", b.storage[i].Storage.Path()),
zap.String("storage_type", b.storage[i].Storage.Type()),
zap.String("err", err.Error()))
continue
}
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err) return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
} }
} }
@ -57,12 +64,6 @@ func IterateBinaryObjects(ctx context.Context, blz *BlobStor, f func(addr oid.Ad
return f(elem.Address, elem.ObjectData, elem.StorageID) return f(elem.Address, elem.ObjectData, elem.StorageID)
} }
prm.IgnoreErrors = true prm.IgnoreErrors = true
prm.ErrorHandler = func(addr oid.Address, err error) error {
blz.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
zap.Stringer("address", addr),
zap.String("err", err.Error()))
return nil
}
_, err := blz.Iterate(ctx, prm) _, err := blz.Iterate(ctx, prm)

View file

@ -142,9 +142,6 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
var err error var err error
if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil { if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {
if req.IgnoreErrors { if req.IgnoreErrors {
if req.ErrorHandler != nil {
return common.IterateRes{}, req.ErrorHandler(elem.Address, err)
}
continue continue
} }
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %v", s, elem.Address.String(), err)) return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %v", s, elem.Address.String(), err))
@ -154,10 +151,6 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
if err := req.Handler(elem); err != nil { if err := req.Handler(elem); err != nil {
return common.IterateRes{}, err return common.IterateRes{}, err
} }
case req.LazyHandler != nil:
if err := req.LazyHandler(elem.Address, func() ([]byte, error) { return elem.ObjectData, nil }); err != nil {
return common.IterateRes{}, err
}
default: default:
if !req.IgnoreErrors { if !req.IgnoreErrors {
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s)) return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s))

View file

@ -180,20 +180,11 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
var prm common.IteratePrm var prm common.IteratePrm
prm.IgnoreErrors = ignoreErrors prm.IgnoreErrors = ignoreErrors
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { prm.Handler = func(e common.IterationElement) error {
sAddr := addr.EncodeToString() sAddr := e.Address.EncodeToString()
data, err := f()
if err != nil {
c.reportFlushError(logs.FSTreeCantReadFile, sAddr, metaerr.Wrap(err))
if ignoreErrors {
return nil
}
return err
}
var obj objectSDK.Object var obj objectSDK.Object
err = obj.Unmarshal(data) err := obj.Unmarshal(e.ObjectData)
if err != nil { if err != nil {
c.reportFlushError(logs.FSTreeCantUnmarshalObject, sAddr, metaerr.Wrap(err)) c.reportFlushError(logs.FSTreeCantUnmarshalObject, sAddr, metaerr.Wrap(err))
if ignoreErrors { if ignoreErrors {
@ -202,7 +193,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
err = c.flushObject(ctx, &obj, data, writecache.StorageTypeFSTree) err = c.flushObject(ctx, &obj, e.ObjectData, writecache.StorageTypeFSTree)
if err != nil { if err != nil {
if ignoreErrors { if ignoreErrors {
return nil return nil