Refactor storage iterator #807
9 changed files with 62 additions and 87 deletions
|
@ -818,6 +818,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
||||||
fstree.WithPerm(sRead.perm),
|
fstree.WithPerm(sRead.perm),
|
||||||
fstree.WithDepth(sRead.depth),
|
fstree.WithDepth(sRead.depth),
|
||||||
fstree.WithNoSync(sRead.noSync),
|
fstree.WithNoSync(sRead.noSync),
|
||||||
|
fstree.WithLogger(c.log),
|
||||||
}
|
}
|
||||||
if c.metricsCollector != nil {
|
if c.metricsCollector != nil {
|
||||||
fstreeOpts = append(fstreeOpts,
|
fstreeOpts = append(fstreeOpts,
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -13,6 +14,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/hrw"
|
"git.frostfs.info/TrueCloudLab/hrw"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Iterate iterates over all objects in b.
|
// Iterate iterates over all objects in b.
|
||||||
|
@ -38,9 +40,11 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
||||||
data, err := b.compression.Decompress(elem.ObjectData())
|
data, err := b.compression.Decompress(elem.ObjectData())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.IgnoreErrors {
|
if prm.IgnoreErrors {
|
||||||
if prm.ErrorHandler != nil {
|
b.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
|
||||||
return prm.ErrorHandler(elem.Address(), err)
|
zap.Stringer("address", elem.Address()),
|
||||||
}
|
zap.String("err", err.Error()),
|
||||||
|
zap.String("storage_id", p),
|
||||||
|
zap.String("root_path", b.rootPath))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
|
@ -53,9 +57,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
||||||
StorageID: []byte(p),
|
StorageID: []byte(p),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return prm.LazyHandler(elem.Address(), func() ([]byte, error) {
|
return nil
|
||||||
return data, err
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
subPrm.DecodeAddresses()
|
subPrm.DecodeAddresses()
|
||||||
|
|
||||||
|
@ -72,6 +74,10 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
|
||||||
blz, err := shBlz.Open()
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
|
b.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
|
||||||
|
zap.String("err", err.Error()),
|
||||||
|
zap.String("storage_id", p),
|
||||||
|
zap.String("root_path", b.rootPath))
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||||
|
|
|
@ -15,9 +15,7 @@ type IterationHandler func(IterationElement) error
|
||||||
// IteratePrm groups the parameters of Iterate operation.
|
// IteratePrm groups the parameters of Iterate operation.
|
||||||
type IteratePrm struct {
|
type IteratePrm struct {
|
||||||
Handler IterationHandler
|
Handler IterationHandler
|
||||||
LazyHandler func(oid.Address, func() ([]byte, error)) error
|
|
||||||
IgnoreErrors bool
|
IgnoreErrors bool
|
||||||
ErrorHandler func(oid.Address, error) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateRes groups the resulting values of Iterate operation.
|
// IterateRes groups the resulting values of Iterate operation.
|
||||||
|
|
|
@ -14,10 +14,12 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
@ -25,6 +27,7 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type keyLock interface {
|
type keyLock interface {
|
||||||
|
@ -41,6 +44,8 @@ func (l *noopKeyLock) Unlock(string) {}
|
||||||
type FSTree struct {
|
type FSTree struct {
|
||||||
Info
|
Info
|
||||||
|
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
*compression.Config
|
*compression.Config
|
||||||
Depth uint64
|
Depth uint64
|
||||||
DirNameLen int
|
DirNameLen int
|
||||||
|
@ -86,6 +91,7 @@ func New(opts ...Option) *FSTree {
|
||||||
metrics: &noopMetrics{},
|
metrics: &noopMetrics{},
|
||||||
fileGuard: &noopKeyLock{},
|
fileGuard: &noopKeyLock{},
|
||||||
fileCounter: &noopCounter{},
|
fileCounter: &noopCounter{},
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
}
|
}
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](f)
|
opts[i](f)
|
||||||
|
@ -145,9 +151,13 @@ func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.Ite
|
||||||
|
|
||||||
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
|
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
|
||||||
curName := strings.Join(curPath[1:], "")
|
curName := strings.Join(curPath[1:], "")
|
||||||
des, err := os.ReadDir(filepath.Join(curPath...))
|
dirPath := filepath.Join(curPath...)
|
||||||
|
des, err := os.ReadDir(dirPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.IgnoreErrors {
|
if prm.IgnoreErrors {
|
||||||
|
t.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
|
||||||
|
zap.String("err", err.Error()),
|
||||||
|
zap.String("directory_path", dirPath))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -182,37 +192,31 @@ func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, pr
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
path := filepath.Join(curPath...)
|
||||||
data, err := os.ReadFile(filepath.Join(curPath...))
|
data, err := os.ReadFile(path)
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.LazyHandler != nil {
|
if err == nil {
|
||||||
err = prm.LazyHandler(addr, func() ([]byte, error) {
|
data, err = t.Decompress(data)
|
||||||
return data, err
|
}
|
||||||
})
|
if err != nil {
|
||||||
} else {
|
if prm.IgnoreErrors {
|
||||||
if err == nil {
|
t.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
|
||||||
data, err = t.Decompress(data)
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("err", err.Error()),
|
||||||
|
zap.String("path", path))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
if err != nil {
|
return err
|
||||||
if prm.IgnoreErrors {
|
|
||||||
if prm.ErrorHandler != nil {
|
|
||||||
return prm.ErrorHandler(addr, err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = prm.Handler(common.IterationElement{
|
|
||||||
Address: addr,
|
|
||||||
ObjectData: data,
|
|
||||||
StorageID: []byte{},
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = prm.Handler(common.IterationElement{
|
||||||
|
Address: addr,
|
||||||
|
ObjectData: data,
|
||||||
|
StorageID: []byte{},
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,9 @@ package fstree
|
||||||
import (
|
import (
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Option func(*FSTree)
|
type Option func(*FSTree)
|
||||||
|
@ -51,3 +53,9 @@ func WithFileCounter(c FileCounter) Option {
|
||||||
f.fileGuard = utilSync.NewKeyLocker[string]()
|
f.fileGuard = utilSync.NewKeyLocker[string]()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithLogger(l *logger.Logger) Option {
|
||||||
|
return func(f *FSTree) {
|
||||||
|
f.log = &logger.Logger{Logger: l.With(zap.String("component", "FSTree"))}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,8 +29,6 @@ func TestIterate(t *testing.T, cons Constructor, min, max uint64) {
|
||||||
|
|
||||||
runTestNormalHandler(t, s, objects)
|
runTestNormalHandler(t, s, objects)
|
||||||
|
|
||||||
runTestLazyHandler(t, s, objects)
|
|
||||||
|
|
||||||
runTestIgnoreLogicalErrors(t, s, objects)
|
runTestIgnoreLogicalErrors(t, s, objects)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,30 +59,6 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func runTestLazyHandler(t *testing.T, s common.Storage, objects []objectDesc) {
|
|
||||||
t.Run("lazy handler", func(t *testing.T) {
|
|
||||||
seen := make(map[string]func() ([]byte, error))
|
|
||||||
|
|
||||||
var iterPrm common.IteratePrm
|
|
||||||
iterPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
|
||||||
seen[addr.String()] = f
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := s.Iterate(context.Background(), iterPrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, len(objects), len(seen))
|
|
||||||
for i := range objects {
|
|
||||||
f, ok := seen[objects[i].addr.String()]
|
|
||||||
require.True(t, ok)
|
|
||||||
|
|
||||||
data, err := f()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, objects[i].raw, data)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []objectDesc) {
|
func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []objectDesc) {
|
||||||
t.Run("ignore errors doesn't work for logical errors", func(t *testing.T) {
|
t.Run("ignore errors doesn't work for logical errors", func(t *testing.T) {
|
||||||
seen := make(map[string]objectDesc)
|
seen := make(map[string]objectDesc)
|
||||||
|
|
|
@ -40,7 +40,14 @@ func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.I
|
||||||
|
|
||||||
for i := range b.storage {
|
for i := range b.storage {
|
||||||
_, err := b.storage[i].Storage.Iterate(ctx, prm)
|
_, err := b.storage[i].Storage.Iterate(ctx, prm)
|
||||||
if err != nil && !prm.IgnoreErrors {
|
if err != nil {
|
||||||
|
if prm.IgnoreErrors {
|
||||||
|
b.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
|
||||||
|
zap.String("storage_path", b.storage[i].Storage.Path()),
|
||||||
|
zap.String("storage_type", b.storage[i].Storage.Type()),
|
||||||
|
zap.String("err", err.Error()))
|
||||||
|
continue
|
||||||
|
}
|
||||||
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
|
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,12 +64,6 @@ func IterateBinaryObjects(ctx context.Context, blz *BlobStor, f func(addr oid.Ad
|
||||||
return f(elem.Address, elem.ObjectData, elem.StorageID)
|
return f(elem.Address, elem.ObjectData, elem.StorageID)
|
||||||
}
|
}
|
||||||
prm.IgnoreErrors = true
|
prm.IgnoreErrors = true
|
||||||
prm.ErrorHandler = func(addr oid.Address, err error) error {
|
|
||||||
blz.log.Warn(logs.BlobstorErrorOccurredDuringTheIteration,
|
|
||||||
zap.Stringer("address", addr),
|
|
||||||
zap.String("err", err.Error()))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := blz.Iterate(ctx, prm)
|
_, err := blz.Iterate(ctx, prm)
|
||||||
|
|
||||||
|
|
|
@ -142,9 +142,6 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
|
||||||
var err error
|
var err error
|
||||||
if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {
|
if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {
|
||||||
if req.IgnoreErrors {
|
if req.IgnoreErrors {
|
||||||
if req.ErrorHandler != nil {
|
|
||||||
return common.IterateRes{}, req.ErrorHandler(elem.Address, err)
|
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %v", s, elem.Address.String(), err))
|
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %v", s, elem.Address.String(), err))
|
||||||
|
@ -154,10 +151,6 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
|
||||||
if err := req.Handler(elem); err != nil {
|
if err := req.Handler(elem); err != nil {
|
||||||
return common.IterateRes{}, err
|
return common.IterateRes{}, err
|
||||||
}
|
}
|
||||||
case req.LazyHandler != nil:
|
|
||||||
if err := req.LazyHandler(elem.Address, func() ([]byte, error) { return elem.ObjectData, nil }); err != nil {
|
|
||||||
return common.IterateRes{}, err
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
if !req.IgnoreErrors {
|
if !req.IgnoreErrors {
|
||||||
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s))
|
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s))
|
||||||
|
|
|
@ -180,20 +180,11 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
|
||||||
func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
var prm common.IteratePrm
|
var prm common.IteratePrm
|
||||||
prm.IgnoreErrors = ignoreErrors
|
prm.IgnoreErrors = ignoreErrors
|
||||||
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
prm.Handler = func(e common.IterationElement) error {
|
||||||
sAddr := addr.EncodeToString()
|
sAddr := e.Address.EncodeToString()
|
||||||
|
|
||||||
data, err := f()
|
|
||||||
if err != nil {
|
|
||||||
c.reportFlushError(logs.FSTreeCantReadFile, sAddr, metaerr.Wrap(err))
|
|
||||||
if ignoreErrors {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var obj objectSDK.Object
|
var obj objectSDK.Object
|
||||||
err = obj.Unmarshal(data)
|
err := obj.Unmarshal(e.ObjectData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.reportFlushError(logs.FSTreeCantUnmarshalObject, sAddr, metaerr.Wrap(err))
|
c.reportFlushError(logs.FSTreeCantUnmarshalObject, sAddr, metaerr.Wrap(err))
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
|
@ -202,7 +193,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.flushObject(ctx, &obj, data, writecache.StorageTypeFSTree)
|
err = c.flushObject(ctx, &obj, e.ObjectData, writecache.StorageTypeFSTree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Add table
Reference in a new issue