forked from TrueCloudLab/frostfs-node
[#1085] fstree: allow to ignore errors during iteration
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
e53ad2f468
commit
c954f0e71b
5 changed files with 87 additions and 16 deletions
|
@ -69,15 +69,36 @@ func addressFromString(s string) (*objectSDK.Address, error) {
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate iterates over all stored objects.
|
// IterationPrm contains iteraction parameters.
|
||||||
func (t *FSTree) Iterate(f func(addr *objectSDK.Address, data []byte) error) error {
|
type IterationPrm struct {
|
||||||
return t.iterate(0, []string{t.RootPath}, f)
|
handler func(addr *objectSDK.Address, data []byte) error
|
||||||
|
ignoreErrors bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address, []byte) error) error {
|
// WithHandler sets a function to call on each object.
|
||||||
|
func (p *IterationPrm) WithHandler(f func(addr *objectSDK.Address, data []byte) error) *IterationPrm {
|
||||||
|
p.handler = f
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithIgnoreErrors sets a flag indicating whether errors should be ignored.
|
||||||
|
func (p *IterationPrm) WithIgnoreErrors(ignore bool) *IterationPrm {
|
||||||
|
p.ignoreErrors = ignore
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate iterates over all stored objects.
|
||||||
|
func (t *FSTree) Iterate(prm *IterationPrm) error {
|
||||||
|
return t.iterate(0, []string{t.RootPath}, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *FSTree) iterate(depth int, curPath []string, prm *IterationPrm) error {
|
||||||
curName := strings.Join(curPath[1:], "")
|
curName := strings.Join(curPath[1:], "")
|
||||||
des, err := os.ReadDir(path.Join(curPath...))
|
des, err := os.ReadDir(path.Join(curPath...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if prm.ignoreErrors {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,8 +110,10 @@ func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address,
|
||||||
curPath[l] = des[i].Name()
|
curPath[l] = des[i].Name()
|
||||||
|
|
||||||
if !isLast && des[i].IsDir() {
|
if !isLast && des[i].IsDir() {
|
||||||
err := t.iterate(depth+1, curPath, f)
|
err := t.iterate(depth+1, curPath, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// Must be error from handler in case errors are ignored.
|
||||||
|
// Need to report.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -106,10 +129,14 @@ func (t *FSTree) iterate(depth int, curPath []string, f func(*objectSDK.Address,
|
||||||
|
|
||||||
data, err := os.ReadFile(path.Join(curPath...))
|
data, err := os.ReadFile(path.Join(curPath...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if prm.ignoreErrors {
|
||||||
|
continue
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f(addr, data); err != nil {
|
if err := prm.handler(addr, data); err != nil {
|
||||||
|
// Error occurred in handler, outside of our scope, needs to be reported.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,10 +6,13 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -91,13 +94,13 @@ func TestFSTree(t *testing.T) {
|
||||||
|
|
||||||
t.Run("iterate", func(t *testing.T) {
|
t.Run("iterate", func(t *testing.T) {
|
||||||
n := 0
|
n := 0
|
||||||
err := fs.Iterate(func(addr *objectSDK.Address, data []byte) error {
|
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error {
|
||||||
n++
|
n++
|
||||||
expected, ok := store[addr.String()]
|
expected, ok := store[addr.String()]
|
||||||
require.True(t, ok, "object %s was not found", addr.String())
|
require.True(t, ok, "object %s was not found", addr.String())
|
||||||
require.Equal(t, data, expected)
|
require.Equal(t, data, expected)
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, count, n)
|
require.Equal(t, count, n)
|
||||||
|
@ -105,16 +108,54 @@ func TestFSTree(t *testing.T) {
|
||||||
t.Run("leave early", func(t *testing.T) {
|
t.Run("leave early", func(t *testing.T) {
|
||||||
n := 0
|
n := 0
|
||||||
errStop := errors.New("stop")
|
errStop := errors.New("stop")
|
||||||
err := fs.Iterate(func(addr *objectSDK.Address, data []byte) error {
|
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error {
|
||||||
if n++; n == count-1 {
|
if n++; n == count-1 {
|
||||||
return errStop
|
return errStop
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
|
|
||||||
require.True(t, errors.Is(err, errStop))
|
require.True(t, errors.Is(err, errStop))
|
||||||
require.Equal(t, count-1, n)
|
require.Equal(t, count-1, n)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("ignore errors", func(t *testing.T) {
|
||||||
|
n := 0
|
||||||
|
|
||||||
|
// Unreadable directory.
|
||||||
|
require.NoError(t, os.Mkdir(filepath.Join(fs.RootPath, "ZZ"), 0))
|
||||||
|
|
||||||
|
// Unreadable file.
|
||||||
|
p := fs.treePath(objecttest.Address())
|
||||||
|
require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions))
|
||||||
|
require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, 0))
|
||||||
|
|
||||||
|
// Invalid address.
|
||||||
|
p = fs.treePath(objecttest.Address()) + ".invalid"
|
||||||
|
require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions))
|
||||||
|
require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, fs.Permissions))
|
||||||
|
|
||||||
|
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error {
|
||||||
|
n++
|
||||||
|
return nil
|
||||||
|
}).WithIgnoreErrors(true))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, count, n)
|
||||||
|
|
||||||
|
t.Run("error from handler is returned", func(t *testing.T) {
|
||||||
|
expectedErr := errors.New("expected error")
|
||||||
|
n := 0
|
||||||
|
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error {
|
||||||
|
n++
|
||||||
|
if n == count/2 { // process some iterations
|
||||||
|
return expectedErr
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}).WithIgnoreErrors(true))
|
||||||
|
require.True(t, errors.Is(err, expectedErr), "got: %v")
|
||||||
|
require.Equal(t, count/2, n)
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("delete", func(t *testing.T) {
|
t.Run("delete", func(t *testing.T) {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -78,7 +79,7 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
|
|
||||||
elem.blzID = nil
|
elem.blzID = nil
|
||||||
|
|
||||||
err = b.fsTree.Iterate(func(_ *objectSDK.Address, data []byte) error {
|
err = b.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(_ *objectSDK.Address, data []byte) error {
|
||||||
// decompress the data
|
// decompress the data
|
||||||
elem.data, err = b.decompressor(data)
|
elem.data, err = b.decompressor(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -86,7 +87,7 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return prm.handler(elem)
|
return prm.handler(elem)
|
||||||
})
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
|
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -132,7 +133,7 @@ func (c *cache) flushBigObjects() {
|
||||||
}
|
}
|
||||||
|
|
||||||
evictNum := 0
|
evictNum := 0
|
||||||
_ = c.fsTree.Iterate(func(addr *objectSDK.Address, data []byte) error {
|
_ = c.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(addr *objectSDK.Address, data []byte) error {
|
||||||
sAddr := addr.String()
|
sAddr := addr.String()
|
||||||
|
|
||||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||||
|
@ -160,7 +161,7 @@ func (c *cache) flushBigObjects() {
|
||||||
evictNum++
|
evictNum++
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
|
|
||||||
// evict objects which were successfully written to BlobStor
|
// evict objects which were successfully written to BlobStor
|
||||||
c.evictObjects(evictNum)
|
c.evictObjects(evictNum)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -34,12 +35,12 @@ func (c *cache) Iterate(f func([]byte) error) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.fsTree.Iterate(func(addr *object.Address, data []byte) error {
|
return c.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(addr *object.Address, data []byte) error {
|
||||||
if _, ok := c.flushed.Peek(addr.String()); ok {
|
if _, ok := c.flushed.Peek(addr.String()); ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return f(data)
|
return f(data)
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
||||||
|
|
Loading…
Reference in a new issue