From cd75638ce30e10fda10df5880add70e86406e4c6 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 20 Jan 2022 19:32:49 +0300 Subject: [PATCH] [#1085] blobstor: allow to ignore errors during iteration Signed-off-by: Evgenii Stratonikov --- .../blobstor/blobovnicza.go | 7 +- pkg/local_object_storage/blobstor/iterate.go | 19 ++- .../blobstor/iterate_test.go | 113 ++++++++++++++++++ 3 files changed, 134 insertions(+), 5 deletions(-) diff --git a/pkg/local_object_storage/blobstor/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovnicza.go index 2e8aad74b..c05accc26 100644 --- a/pkg/local_object_storage/blobstor/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovnicza.go @@ -631,10 +631,13 @@ func (b *blobovniczas) iterateLeaves(f func(string) (bool, error)) error { } // iterator over all blobovniczas in unsorted order. Break on f's error return. -func (b *blobovniczas) iterateBlobovniczas(f func(string, *blobovnicza.Blobovnicza) error) error { +func (b *blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { return b.iterateLeaves(func(p string) (bool, error) { blz, err := b.openBlobovnicza(p) if err != nil { + if ignoreErrors { + return false, nil + } return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err) } @@ -811,7 +814,7 @@ func (b *blobovniczas) init() error { return zstdD(data) } - return b.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error { + return b.iterateBlobovniczas(false, func(p string, blz *blobovnicza.Blobovnicza) error { if err := blz.Init(); err != nil { return fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err) } diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index 4b8c1bcfe..c0898f5db 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -32,7 +32,8 @@ type IterationHandler func(IterationElement) error // IteratePrm groups the parameters of Iterate operation. type IteratePrm struct { - handler IterationHandler + handler IterationHandler + ignoreErrors bool } // IterateRes groups resulting values of Iterate operation. @@ -43,6 +44,11 @@ func (i *IteratePrm) SetIterationHandler(h IterationHandler) { i.handler = h } +// IgnoreErrors sets the flag signifying whether errors should be ignored. +func (i *IteratePrm) IgnoreErrors() { + i.ignoreErrors = true +} + // Iterate traverses the storage over the stored objects and calls the handler // on each element. // @@ -53,13 +59,16 @@ func (i *IteratePrm) SetIterationHandler(h IterationHandler) { func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) { var elem IterationElement - err := b.blobovniczas.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error { + err := b.blobovniczas.iterateBlobovniczas(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { err := blobovnicza.IterateObjects(blz, func(data []byte) error { var err error // decompress the data elem.data, err = b.decompressor(data) if err != nil { + if prm.ignoreErrors { + return nil + } return fmt.Errorf("could not decompress object data: %w", err) } @@ -83,11 +92,15 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) { // decompress the data elem.data, err = b.decompressor(data) if err != nil { + if prm.ignoreErrors { + return nil + } return fmt.Errorf("could not decompress object data: %w", err) } return prm.handler(elem) - })) + }).WithIgnoreErrors(prm.ignoreErrors)) + if err != nil { return nil, fmt.Errorf("fs tree iterator failure: %w", err) } diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index 49bd0fcab..23a35fb08 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -1,10 +1,15 @@ package blobstor import ( + "bytes" "encoding/binary" + "errors" "os" + "path/filepath" + "strconv" "testing" + "github.com/klauspost/compress/zstd" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-sdk-go/object" objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" @@ -89,3 +94,111 @@ func TestIterateObjects(t *testing.T) { require.NoError(t, err) require.Empty(t, mObjs) } + +func TestIterate_IgnoreErrors(t *testing.T) { + dir := t.TempDir() + + const ( + smallSize = 512 + objCount = 5 + ) + bsOpts := []Option{ + WithCompressObjects(true), + WithRootPath(dir), + WithSmallSizeLimit(smallSize * 2), // + header + WithBlobovniczaOpenedCacheSize(1), + WithBlobovniczaShallowWidth(1), + WithBlobovniczaShallowDepth(1)} + bs := New(bsOpts...) + require.NoError(t, bs.Open()) + require.NoError(t, bs.Init()) + + addrs := make([]*object.Address, objCount) + for i := range addrs { + addrs[i] = objecttest.Address() + obj := object.NewRaw() + obj.SetContainerID(addrs[i].ContainerID()) + obj.SetID(addrs[i].ObjectID()) + obj.SetPayload(make([]byte, smallSize<<(i%2))) + + objData, err := obj.Marshal() + require.NoError(t, err) + + _, err = bs.PutRaw(addrs[i], objData, true) + require.NoError(t, err) + } + + // Construct corrupted compressed object. + buf := bytes.NewBuffer(nil) + badObject := make([]byte, smallSize/2+1) + enc, err := zstd.NewWriter(buf) + require.NoError(t, err) + rawData := enc.EncodeAll(badObject, nil) + for i := 4; /* magic size */ i < len(rawData); i += 2 { + rawData[i] ^= 0xFF + } + // Will be put uncompressed but fetched as compressed because of magic. + _, err = bs.PutRaw(objecttest.Address(), rawData, false) + require.NoError(t, err) + require.NoError(t, bs.fsTree.Put(objecttest.Address(), rawData)) + + require.NoError(t, bs.Close()) + + // Increase width to have blobovnicza which is definitely empty. + b := New(append(bsOpts, WithBlobovniczaShallowWidth(2))...) + require.NoError(t, b.Open()) + require.NoError(t, b.Init()) + + var p string + for i := 0; i < 2; i++ { + bp := filepath.Join(bs.blzRootPath, "1", strconv.FormatUint(uint64(i), 10)) + if _, ok := bs.blobovniczas.opened.Get(bp); !ok { + p = bp + break + } + } + require.NotEqual(t, "", p, "expected to not have at least 1 blobovnicza in cache") + require.NoError(t, os.Chmod(p, 0)) + + var prm IteratePrm + prm.SetIterationHandler(func(e IterationElement) error { + return nil + }) + _, err = bs.Iterate(prm) + require.Error(t, err) + + prm.IgnoreErrors() + + t.Run("skip invalid objects", func(t *testing.T) { + actual := make([]*object.Address, 0, len(addrs)) + prm.SetIterationHandler(func(e IterationElement) error { + obj := object.New() + err := obj.Unmarshal(e.data) + if err != nil { + return err + } + + addr := object.NewAddress() + addr.SetContainerID(obj.ContainerID()) + addr.SetObjectID(obj.ID()) + actual = append(actual, addr) + return nil + }) + + _, err := bs.Iterate(prm) + require.NoError(t, err) + require.ElementsMatch(t, addrs, actual) + }) + t.Run("return errors from handler", func(t *testing.T) { + n := 0 + expectedErr := errors.New("expected error") + prm.SetIterationHandler(func(e IterationElement) error { + if n++; n == objCount/2 { + return expectedErr + } + return nil + }) + _, err := bs.Iterate(prm) + require.True(t, errors.Is(err, expectedErr), "got: %v") + }) +}