From 7df50297cd9122242b2bff2f7f85a6604083e8f8 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 21 Jun 2022 10:52:03 +0300 Subject: [PATCH] [#1520] shard: Ignore errors on metabase refill Signed-off-by: Evgenii Stratonikov --- .../blobovnicza/blobovnicza_test.go | 2 +- .../blobovnicza/iterate.go | 4 +- .../blobstor/fstree/fstree.go | 9 +++ pkg/local_object_storage/blobstor/iterate.go | 57 ++++++++++++------- .../blobstor/iterate_test.go | 2 +- pkg/local_object_storage/shard/control.go | 12 +++- .../shard/control_test.go | 49 ++++++++++++++++ 7 files changed, 108 insertions(+), 27 deletions(-) diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go index 37c8b5eb..1c077002 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go @@ -147,7 +147,7 @@ func TestIterateObjects(t *testing.T) { require.NoError(t, err) } - err := IterateObjects(blz, func(data []byte) error { + err := IterateObjects(blz, func(_ oid.Address, data []byte) error { v, ok := mObjs[string(data)] require.True(t, ok) diff --git a/pkg/local_object_storage/blobovnicza/iterate.go b/pkg/local_object_storage/blobovnicza/iterate.go index 75b0cd04..5a622f2f 100644 --- a/pkg/local_object_storage/blobovnicza/iterate.go +++ b/pkg/local_object_storage/blobovnicza/iterate.go @@ -147,11 +147,11 @@ func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) { } // IterateObjects is a helper function which iterates over Blobovnicza and passes binary objects to f. -func IterateObjects(blz *Blobovnicza, f func([]byte) error) error { +func IterateObjects(blz *Blobovnicza, f func(addr oid.Address, data []byte) error) error { var prm IteratePrm prm.SetHandler(func(elem IterationElement) error { - return f(elem.ObjectData()) + return f(elem.Address(), elem.ObjectData()) }) _, err := blz.Iterate(prm) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index bedee924..ef5fefa6 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -72,6 +72,7 @@ func addressFromString(s string) (*oid.Address, error) { type IterationPrm struct { handler func(addr oid.Address, data []byte) error ignoreErrors bool + errorHandler func(oid.Address, error) error lazyHandler func(oid.Address, func() ([]byte, error)) error } @@ -92,6 +93,11 @@ func (p *IterationPrm) WithIgnoreErrors(ignore bool) { p.ignoreErrors = ignore } +// WithErrorHandler sets error handler for objects that cannot be read or unmarshaled. +func (p *IterationPrm) WithErrorHandler(f func(oid.Address, error) error) { + p.errorHandler = f +} + // Iterate iterates over all stored objects. func (t *FSTree) Iterate(prm IterationPrm) error { return t.iterate(0, []string{t.RootPath}, prm) @@ -141,6 +147,9 @@ func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error { data, err = os.ReadFile(filepath.Join(curPath...)) if err != nil { if prm.ignoreErrors { + if prm.errorHandler != nil { + return prm.errorHandler(*addr, err) + } continue } return err diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index 1305199d..70c42538 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -5,14 +5,16 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" - "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" ) // IterationElement represents a unit of elements through which Iterate operation passes. type IterationElement struct { data []byte + addr oid.Address + blzID *blobovnicza.ID } @@ -27,6 +29,11 @@ func (x IterationElement) BlobovniczaID() *blobovnicza.ID { return x.blzID } +// Address returns the object address. +func (x IterationElement) Address() oid.Address { + return x.addr +} + // IterationHandler is a generic processor of IterationElement. type IterationHandler func(IterationElement) error @@ -34,6 +41,7 @@ type IterationHandler func(IterationElement) error type IteratePrm struct { handler IterationHandler ignoreErrors bool + errorHandler func(oid.Address, error) error } // IterateRes groups the resulting values of Iterate operation. @@ -49,6 +57,11 @@ func (i *IteratePrm) IgnoreErrors() { i.ignoreErrors = true } +// SetErrorHandler sets error handler for objects that cannot be read or unmarshaled. +func (i *IteratePrm) SetErrorHandler(f func(oid.Address, error) error) { + i.errorHandler = f +} + // Iterate traverses the storage over the stored objects and calls the handler // on each element. // @@ -60,18 +73,22 @@ func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) { var elem IterationElement err := b.blobovniczas.iterateBlobovniczas(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { - err := blobovnicza.IterateObjects(blz, func(data []byte) error { + err := blobovnicza.IterateObjects(blz, func(addr oid.Address, data []byte) error { var err error // decompress the data elem.data, err = b.decompressor(data) if err != nil { if prm.ignoreErrors { + if prm.errorHandler != nil { + return prm.errorHandler(addr, err) + } return nil } return fmt.Errorf("could not decompress object data: %w", err) } + elem.addr = addr elem.blzID = blobovnicza.NewIDFromBytes([]byte(p)) return prm.handler(elem) @@ -90,16 +107,21 @@ func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) { var fsPrm fstree.IterationPrm fsPrm.WithIgnoreErrors(prm.ignoreErrors) - fsPrm.WithHandler(func(_ oid.Address, data []byte) error { + fsPrm.WithHandler(func(addr oid.Address, data []byte) error { // decompress the data elem.data, err = b.decompressor(data) if err != nil { if prm.ignoreErrors { + if prm.errorHandler != nil { + return prm.errorHandler(addr, err) + } return nil } return fmt.Errorf("could not decompress object data: %w", err) } + elem.addr = addr + return prm.handler(elem) }) @@ -113,31 +135,22 @@ func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) { } // IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f. -func IterateBinaryObjects(blz *BlobStor, f func(data []byte, blzID *blobovnicza.ID) error) error { +// Errors related to object reading and unmarshaling are logged and skipped. +func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, blzID *blobovnicza.ID) error) error { var prm IteratePrm prm.SetIterationHandler(func(elem IterationElement) error { - return f(elem.ObjectData(), elem.BlobovniczaID()) + return f(elem.Address(), elem.ObjectData(), elem.BlobovniczaID()) + }) + prm.IgnoreErrors() + prm.SetErrorHandler(func(addr oid.Address, err error) error { + blz.log.Warn("error occurred during the iteration", + zap.Stringer("address", addr), + zap.String("err", err.Error())) + return nil }) _, err := blz.Iterate(prm) return err } - -// IterateObjects is a helper function which iterates over BlobStor and passes decoded objects to f. -func IterateObjects(blz *BlobStor, f func(obj *object.Object, blzID *blobovnicza.ID) error) error { - var obj *object.Object - - return IterateBinaryObjects(blz, func(data []byte, blzID *blobovnicza.ID) error { - if obj == nil { - obj = object.New() - } - - if err := obj.Unmarshal(data); err != nil { - return fmt.Errorf("could not unmarshal the object: %w", err) - } - - return f(obj, blzID) - }) -} diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index fb26080f..cea45651 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -76,7 +76,7 @@ func TestIterateObjects(t *testing.T) { require.NoError(t, err) } - err := IterateBinaryObjects(blobStor, func(data []byte, blzID *blobovnicza.ID) error { + err := IterateBinaryObjects(blobStor, func(_ oid.Address, data []byte, blzID *blobovnicza.ID) error { v, ok := mObjs[string(data)] require.True(t, ok) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 9ccdbb2a..1219fc81 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -9,6 +9,7 @@ import ( meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" ) // Open opens all Shard's components. @@ -80,7 +81,16 @@ func (s *Shard) refillMetabase() error { return fmt.Errorf("could not reset metabase: %w", err) } - return blobstor.IterateObjects(s.blobStor, func(obj *objectSDK.Object, blzID *blobovnicza.ID) error { + obj := objectSDK.New() + + return blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, blzID *blobovnicza.ID) error { + if err := obj.Unmarshal(data); err != nil { + s.log.Warn("could not unmarshal object", + zap.Stringer("address", addr), + zap.String("err", err.Error())) + return nil + } + //nolint: exhaustive switch obj.Type() { case objectSDK.TypeTombstone: diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index f47c968c..046a7dec 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -18,6 +19,54 @@ import ( "github.com/stretchr/testify/require" ) +func TestRefillMetabaseCorrupted(t *testing.T) { + dir := t.TempDir() + + blobOpts := []blobstor.Option{ + blobstor.WithRootPath(filepath.Join(dir, "blob")), + blobstor.WithShallowDepth(1), + blobstor.WithSmallSizeLimit(1), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1)} + + sh := New( + WithBlobStorOptions(blobOpts...), + WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")))) + require.NoError(t, sh.Open()) + require.NoError(t, sh.Init()) + + obj := objecttest.Object() + obj.SetType(objectSDK.TypeRegular) + obj.SetPayload([]byte{0, 1, 2, 3, 4, 5}) + + var putPrm PutPrm + putPrm.WithObject(obj) + _, err := sh.Put(putPrm) + require.NoError(t, err) + require.NoError(t, sh.Close()) + + addr := object.AddressOf(obj) + fs := fstree.FSTree{ + DirNameLen: 2, + Depth: 1, + Info: sh.blobStor.DumpInfo(), + } + require.NoError(t, fs.Put(addr, []byte("not an object"))) + + sh = New( + WithBlobStorOptions(blobOpts...), + WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new"))), + WithRefillMetabase(true)) + require.NoError(t, sh.Open()) + require.NoError(t, sh.Init()) + + var getPrm GetPrm + getPrm.WithAddress(addr) + _, err = sh.Get(getPrm) + require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) + require.NoError(t, sh.Close()) +} + func TestRefillMetabase(t *testing.T) { p := t.Name()