[#1520] shard: Ignore errors on metabase refill
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
78ea450c25
commit
7df50297cd
7 changed files with 108 additions and 27 deletions
|
@ -147,7 +147,7 @@ func TestIterateObjects(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := IterateObjects(blz, func(data []byte) error {
|
err := IterateObjects(blz, func(_ oid.Address, data []byte) error {
|
||||||
v, ok := mObjs[string(data)]
|
v, ok := mObjs[string(data)]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
|
|
|
@ -147,11 +147,11 @@ func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateObjects is a helper function which iterates over Blobovnicza and passes binary objects to f.
|
// IterateObjects is a helper function which iterates over Blobovnicza and passes binary objects to f.
|
||||||
func IterateObjects(blz *Blobovnicza, f func([]byte) error) error {
|
func IterateObjects(blz *Blobovnicza, f func(addr oid.Address, data []byte) error) error {
|
||||||
var prm IteratePrm
|
var prm IteratePrm
|
||||||
|
|
||||||
prm.SetHandler(func(elem IterationElement) error {
|
prm.SetHandler(func(elem IterationElement) error {
|
||||||
return f(elem.ObjectData())
|
return f(elem.Address(), elem.ObjectData())
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err := blz.Iterate(prm)
|
_, err := blz.Iterate(prm)
|
||||||
|
|
|
@ -72,6 +72,7 @@ func addressFromString(s string) (*oid.Address, error) {
|
||||||
type IterationPrm struct {
|
type IterationPrm struct {
|
||||||
handler func(addr oid.Address, data []byte) error
|
handler func(addr oid.Address, data []byte) error
|
||||||
ignoreErrors bool
|
ignoreErrors bool
|
||||||
|
errorHandler func(oid.Address, error) error
|
||||||
lazyHandler func(oid.Address, func() ([]byte, error)) error
|
lazyHandler func(oid.Address, func() ([]byte, error)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,6 +93,11 @@ func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
|
||||||
p.ignoreErrors = ignore
|
p.ignoreErrors = ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithErrorHandler sets error handler for objects that cannot be read or unmarshaled.
|
||||||
|
func (p *IterationPrm) WithErrorHandler(f func(oid.Address, error) error) {
|
||||||
|
p.errorHandler = f
|
||||||
|
}
|
||||||
|
|
||||||
// Iterate iterates over all stored objects.
|
// Iterate iterates over all stored objects.
|
||||||
func (t *FSTree) Iterate(prm IterationPrm) error {
|
func (t *FSTree) Iterate(prm IterationPrm) error {
|
||||||
return t.iterate(0, []string{t.RootPath}, prm)
|
return t.iterate(0, []string{t.RootPath}, prm)
|
||||||
|
@ -141,6 +147,9 @@ func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error {
|
||||||
data, err = os.ReadFile(filepath.Join(curPath...))
|
data, err = os.ReadFile(filepath.Join(curPath...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.ignoreErrors {
|
if prm.ignoreErrors {
|
||||||
|
if prm.errorHandler != nil {
|
||||||
|
return prm.errorHandler(*addr, err)
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -5,14 +5,16 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IterationElement represents a unit of elements through which Iterate operation passes.
|
// IterationElement represents a unit of elements through which Iterate operation passes.
|
||||||
type IterationElement struct {
|
type IterationElement struct {
|
||||||
data []byte
|
data []byte
|
||||||
|
|
||||||
|
addr oid.Address
|
||||||
|
|
||||||
blzID *blobovnicza.ID
|
blzID *blobovnicza.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +29,11 @@ func (x IterationElement) BlobovniczaID() *blobovnicza.ID {
|
||||||
return x.blzID
|
return x.blzID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Address returns the object address.
|
||||||
|
func (x IterationElement) Address() oid.Address {
|
||||||
|
return x.addr
|
||||||
|
}
|
||||||
|
|
||||||
// IterationHandler is a generic processor of IterationElement.
|
// IterationHandler is a generic processor of IterationElement.
|
||||||
type IterationHandler func(IterationElement) error
|
type IterationHandler func(IterationElement) error
|
||||||
|
|
||||||
|
@ -34,6 +41,7 @@ type IterationHandler func(IterationElement) error
|
||||||
type IteratePrm struct {
|
type IteratePrm struct {
|
||||||
handler IterationHandler
|
handler IterationHandler
|
||||||
ignoreErrors bool
|
ignoreErrors bool
|
||||||
|
errorHandler func(oid.Address, error) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateRes groups the resulting values of Iterate operation.
|
// IterateRes groups the resulting values of Iterate operation.
|
||||||
|
@ -49,6 +57,11 @@ func (i *IteratePrm) IgnoreErrors() {
|
||||||
i.ignoreErrors = true
|
i.ignoreErrors = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetErrorHandler sets error handler for objects that cannot be read or unmarshaled.
|
||||||
|
func (i *IteratePrm) SetErrorHandler(f func(oid.Address, error) error) {
|
||||||
|
i.errorHandler = f
|
||||||
|
}
|
||||||
|
|
||||||
// Iterate traverses the storage over the stored objects and calls the handler
|
// Iterate traverses the storage over the stored objects and calls the handler
|
||||||
// on each element.
|
// on each element.
|
||||||
//
|
//
|
||||||
|
@ -60,18 +73,22 @@ func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) {
|
||||||
var elem IterationElement
|
var elem IterationElement
|
||||||
|
|
||||||
err := b.blobovniczas.iterateBlobovniczas(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
err := b.blobovniczas.iterateBlobovniczas(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
err := blobovnicza.IterateObjects(blz, func(data []byte) error {
|
err := blobovnicza.IterateObjects(blz, func(addr oid.Address, data []byte) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// decompress the data
|
// decompress the data
|
||||||
elem.data, err = b.decompressor(data)
|
elem.data, err = b.decompressor(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.ignoreErrors {
|
if prm.ignoreErrors {
|
||||||
|
if prm.errorHandler != nil {
|
||||||
|
return prm.errorHandler(addr, err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
elem.addr = addr
|
||||||
elem.blzID = blobovnicza.NewIDFromBytes([]byte(p))
|
elem.blzID = blobovnicza.NewIDFromBytes([]byte(p))
|
||||||
|
|
||||||
return prm.handler(elem)
|
return prm.handler(elem)
|
||||||
|
@ -90,16 +107,21 @@ func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) {
|
||||||
|
|
||||||
var fsPrm fstree.IterationPrm
|
var fsPrm fstree.IterationPrm
|
||||||
fsPrm.WithIgnoreErrors(prm.ignoreErrors)
|
fsPrm.WithIgnoreErrors(prm.ignoreErrors)
|
||||||
fsPrm.WithHandler(func(_ oid.Address, data []byte) error {
|
fsPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||||
// decompress the data
|
// decompress the data
|
||||||
elem.data, err = b.decompressor(data)
|
elem.data, err = b.decompressor(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.ignoreErrors {
|
if prm.ignoreErrors {
|
||||||
|
if prm.errorHandler != nil {
|
||||||
|
return prm.errorHandler(addr, err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
return fmt.Errorf("could not decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
elem.addr = addr
|
||||||
|
|
||||||
return prm.handler(elem)
|
return prm.handler(elem)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -113,31 +135,22 @@ func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.
|
||||||
func IterateBinaryObjects(blz *BlobStor, f func(data []byte, blzID *blobovnicza.ID) error) error {
|
// Errors related to object reading and unmarshaling are logged and skipped.
|
||||||
|
func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, blzID *blobovnicza.ID) error) error {
|
||||||
var prm IteratePrm
|
var prm IteratePrm
|
||||||
|
|
||||||
prm.SetIterationHandler(func(elem IterationElement) error {
|
prm.SetIterationHandler(func(elem IterationElement) error {
|
||||||
return f(elem.ObjectData(), elem.BlobovniczaID())
|
return f(elem.Address(), elem.ObjectData(), elem.BlobovniczaID())
|
||||||
|
})
|
||||||
|
prm.IgnoreErrors()
|
||||||
|
prm.SetErrorHandler(func(addr oid.Address, err error) error {
|
||||||
|
blz.log.Warn("error occurred during the iteration",
|
||||||
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("err", err.Error()))
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err := blz.Iterate(prm)
|
_, err := blz.Iterate(prm)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateObjects is a helper function which iterates over BlobStor and passes decoded objects to f.
|
|
||||||
func IterateObjects(blz *BlobStor, f func(obj *object.Object, blzID *blobovnicza.ID) error) error {
|
|
||||||
var obj *object.Object
|
|
||||||
|
|
||||||
return IterateBinaryObjects(blz, func(data []byte, blzID *blobovnicza.ID) error {
|
|
||||||
if obj == nil {
|
|
||||||
obj = object.New()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
|
||||||
return fmt.Errorf("could not unmarshal the object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return f(obj, blzID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ func TestIterateObjects(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := IterateBinaryObjects(blobStor, func(data []byte, blzID *blobovnicza.ID) error {
|
err := IterateBinaryObjects(blobStor, func(_ oid.Address, data []byte, blzID *blobovnicza.ID) error {
|
||||||
v, ok := mObjs[string(data)]
|
v, ok := mObjs[string(data)]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Open opens all Shard's components.
|
// Open opens all Shard's components.
|
||||||
|
@ -80,7 +81,16 @@ func (s *Shard) refillMetabase() error {
|
||||||
return fmt.Errorf("could not reset metabase: %w", err)
|
return fmt.Errorf("could not reset metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return blobstor.IterateObjects(s.blobStor, func(obj *objectSDK.Object, blzID *blobovnicza.ID) error {
|
obj := objectSDK.New()
|
||||||
|
|
||||||
|
return blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, blzID *blobovnicza.ID) error {
|
||||||
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
|
s.log.Warn("could not unmarshal object",
|
||||||
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("err", err.Error()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
//nolint: exhaustive
|
//nolint: exhaustive
|
||||||
switch obj.Type() {
|
switch obj.Type() {
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
|
@ -18,6 +19,54 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
blobOpts := []blobstor.Option{
|
||||||
|
blobstor.WithRootPath(filepath.Join(dir, "blob")),
|
||||||
|
blobstor.WithShallowDepth(1),
|
||||||
|
blobstor.WithSmallSizeLimit(1),
|
||||||
|
blobstor.WithBlobovniczaShallowWidth(1),
|
||||||
|
blobstor.WithBlobovniczaShallowDepth(1)}
|
||||||
|
|
||||||
|
sh := New(
|
||||||
|
WithBlobStorOptions(blobOpts...),
|
||||||
|
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta"))))
|
||||||
|
require.NoError(t, sh.Open())
|
||||||
|
require.NoError(t, sh.Init())
|
||||||
|
|
||||||
|
obj := objecttest.Object()
|
||||||
|
obj.SetType(objectSDK.TypeRegular)
|
||||||
|
obj.SetPayload([]byte{0, 1, 2, 3, 4, 5})
|
||||||
|
|
||||||
|
var putPrm PutPrm
|
||||||
|
putPrm.WithObject(obj)
|
||||||
|
_, err := sh.Put(putPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, sh.Close())
|
||||||
|
|
||||||
|
addr := object.AddressOf(obj)
|
||||||
|
fs := fstree.FSTree{
|
||||||
|
DirNameLen: 2,
|
||||||
|
Depth: 1,
|
||||||
|
Info: sh.blobStor.DumpInfo(),
|
||||||
|
}
|
||||||
|
require.NoError(t, fs.Put(addr, []byte("not an object")))
|
||||||
|
|
||||||
|
sh = New(
|
||||||
|
WithBlobStorOptions(blobOpts...),
|
||||||
|
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new"))),
|
||||||
|
WithRefillMetabase(true))
|
||||||
|
require.NoError(t, sh.Open())
|
||||||
|
require.NoError(t, sh.Init())
|
||||||
|
|
||||||
|
var getPrm GetPrm
|
||||||
|
getPrm.WithAddress(addr)
|
||||||
|
_, err = sh.Get(getPrm)
|
||||||
|
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
|
||||||
|
require.NoError(t, sh.Close())
|
||||||
|
}
|
||||||
|
|
||||||
func TestRefillMetabase(t *testing.T) {
|
func TestRefillMetabase(t *testing.T) {
|
||||||
p := t.Name()
|
p := t.Name()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue