From c954f0e71b7588aa491e23e70f6f068c5894debb Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 20 Jan 2022 18:53:29 +0300 Subject: [PATCH] [#1085] fstree: allow to ignore errors during iteration Signed-off-by: Evgenii Stratonikov --- .../blobstor/fstree/fstree.go | 39 ++++++++++++--- .../blobstor/fstree/fstree_test.go | 49 +++++++++++++++++-- pkg/local_object_storage/blobstor/iterate.go | 5 +- pkg/local_object_storage/writecache/flush.go | 5 +- .../writecache/iterate.go | 5 +- 5 files changed, 87 insertions(+), 16 deletions(-) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index aff4ebf69..a4d7a4c8f 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -69,15 +69,36 @@ func addressFromString(s string) (*objectSDK.Address, error) { return addr, nil } -// Iterate iterates over all stored objects. -func (t *FSTree) Iterate(f func(addr *objectSDK.Address, data []byte) error) error { - return t.iterate(0, []string{t.RootPath}, f) +// IterationPrm contains iteraction parameters. +type IterationPrm struct { + handler func(addr *objectSDK.Address, data []byte) error + ignoreErrors bool } -func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address, []byte) error) error { +// WithHandler sets a function to call on each object. +func (p *IterationPrm) WithHandler(f func(addr *objectSDK.Address, data []byte) error) *IterationPrm { + p.handler = f + return p +} + +// WithIgnoreErrors sets a flag indicating whether errors should be ignored. +func (p *IterationPrm) WithIgnoreErrors(ignore bool) *IterationPrm { + p.ignoreErrors = ignore + return p +} + +// Iterate iterates over all stored objects. +func (t *FSTree) Iterate(prm *IterationPrm) error { + return t.iterate(0, []string{t.RootPath}, prm) +} + +func (t *FSTree) iterate(depth int, curPath []string, prm *IterationPrm) error { curName := strings.Join(curPath[1:], "") des, err := os.ReadDir(path.Join(curPath...)) if err != nil { + if prm.ignoreErrors { + return nil + } return err } @@ -89,8 +110,10 @@ func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address, curPath[l] = des[i].Name() if !isLast && des[i].IsDir() { - err := t.iterate(depth+1, curPath, f) + err := t.iterate(depth+1, curPath, prm) if err != nil { + // Must be error from handler in case errors are ignored. + // Need to report. return err } } @@ -106,10 +129,14 @@ func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address, data, err := os.ReadFile(path.Join(curPath...)) if err != nil { + if prm.ignoreErrors { + continue + } return err } - if err := f(addr, data); err != nil { + if err := prm.handler(addr, data); err != nil { + // Error occurred in handler, outside of our scope, needs to be reported. return err } } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 208c2dcf1..59fb084d8 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -6,10 +6,13 @@ import ( "errors" "os" "path" + "path/filepath" "testing" + "github.com/nspcc-dev/neofs-node/pkg/util" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/stretchr/testify/require" ) @@ -91,13 +94,13 @@ func TestFSTree(t *testing.T) { t.Run("iterate", func(t *testing.T) { n := 0 - err := fs.Iterate(func(addr *objectSDK.Address, data []byte) error { + err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error { n++ expected, ok := store[addr.String()] require.True(t, ok, "object %s was not found", addr.String()) require.Equal(t, data, expected) return nil - }) + })) require.NoError(t, err) require.Equal(t, count, n) @@ -105,16 +108,54 @@ func TestFSTree(t *testing.T) { t.Run("leave early", func(t *testing.T) { n := 0 errStop := errors.New("stop") - err := fs.Iterate(func(addr *objectSDK.Address, data []byte) error { + err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error { if n++; n == count-1 { return errStop } return nil - }) + })) require.True(t, errors.Is(err, errStop)) require.Equal(t, count-1, n) }) + + t.Run("ignore errors", func(t *testing.T) { + n := 0 + + // Unreadable directory. + require.NoError(t, os.Mkdir(filepath.Join(fs.RootPath, "ZZ"), 0)) + + // Unreadable file. + p := fs.treePath(objecttest.Address()) + require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions)) + require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, 0)) + + // Invalid address. + p = fs.treePath(objecttest.Address()) + ".invalid" + require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions)) + require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, fs.Permissions)) + + err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error { + n++ + return nil + }).WithIgnoreErrors(true)) + require.NoError(t, err) + require.Equal(t, count, n) + + t.Run("error from handler is returned", func(t *testing.T) { + expectedErr := errors.New("expected error") + n := 0 + err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error { + n++ + if n == count/2 { // process some iterations + return expectedErr + } + return nil + }).WithIgnoreErrors(true)) + require.True(t, errors.Is(err, expectedErr), "got: %v") + require.Equal(t, count/2, n) + }) + }) }) t.Run("delete", func(t *testing.T) { diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index 2bbd400ee..4b8c1bcfe 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" ) @@ -78,7 +79,7 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) { elem.blzID = nil - err = b.fsTree.Iterate(func(_ *objectSDK.Address, data []byte) error { + err = b.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(_ *objectSDK.Address, data []byte) error { // decompress the data elem.data, err = b.decompressor(data) if err != nil { @@ -86,7 +87,7 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) { } return prm.handler(elem) - }) + })) if err != nil { return nil, fmt.Errorf("fs tree iterator failure: %w", err) } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index bf42938e1..0ec0511a5 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" "go.etcd.io/bbolt" @@ -132,7 +133,7 @@ func (c *cache) flushBigObjects() { } evictNum := 0 - _ = c.fsTree.Iterate(func(addr *objectSDK.Address, data []byte) error { + _ = c.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error { sAddr := addr.String() if _, ok := c.store.flushed.Peek(sAddr); ok { @@ -160,7 +161,7 @@ func (c *cache) flushBigObjects() { evictNum++ return nil - }) + })) // evict objects which were successfully written to BlobStor c.evictObjects(evictNum) diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 3bdf87763..e5434ebb9 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-sdk-go/object" "go.etcd.io/bbolt" ) @@ -34,12 +35,12 @@ func (c *cache) Iterate(f func([]byte) error) error { return err } - return c.fsTree.Iterate(func(addr *object.Address, data []byte) error { + return c.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(addr *object.Address, data []byte) error { if _, ok := c.flushed.Peek(addr.String()); ok { return nil } return f(data) - }) + })) } // IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.