From a49137349be8564c48ba5279c38aae9daf0286f2 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 19 Sep 2022 13:31:55 +0300 Subject: [PATCH] [#1731] engine: Allow to use user handler for evacuated objects Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/evacuate.go | 27 ++++-- .../engine/evacuate_test.go | 95 ++++++++++++++++--- 2 files changed, 102 insertions(+), 20 deletions(-) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 32a441637..8379405bb 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -8,12 +8,15 @@ import ( meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/util" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { shardID *shard.ID + handler func(oid.Address, *objectSDK.Object) error ignoreErrors bool } @@ -32,7 +35,13 @@ func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) { p.ignoreErrors = ignore } +// WithFaultHandler sets handler to call for objects which cannot be saved on other shards. +func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) { + p.handler = f +} + // Count returns amount of evacuated objects. +// Objects for which handler returned no error are also assumed evacuated. func (p EvacuateShardRes) Count() int { return p.count } @@ -58,7 +67,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) return EvacuateShardRes{}, errShardNotFound } - if len(e.shards) < 2 { + if len(e.shards) < 2 && prm.handler == nil { e.mtx.RUnlock() return EvacuateShardRes{}, errMustHaveTwoShards } @@ -138,13 +147,17 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) } } - // TODO (@fyrchik): #1731 try replicating to another node. - // The suggestion is to have prm.handler which is called - // if a Put has failed. + if prm.handler == nil { + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. + return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) + } - // Do not check ignoreErrors flag here because - // ignoring errors on put make this command kinda useless. - return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) + err = prm.handler(lst[i], getRes.Object()) + if err != nil { + return res, err + } + res.count++ } c = listRes.Cursor() diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index fecba0568..5d0ed74cb 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -1,6 +1,7 @@ package engine import ( + "errors" "fmt" "os" "path/filepath" @@ -15,11 +16,12 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) -func TestEvacuateShard(t *testing.T) { +func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir, err := os.MkdirTemp("", "*") require.NoError(t, err) t.Cleanup(func() { _ = os.RemoveAll(dir) }) @@ -28,19 +30,16 @@ func TestEvacuateShard(t *testing.T) { WithLogger(zaptest.NewLogger(t)), WithShardPoolSize(1)) - var ids [3]*shard.ID - var fsTree *fstree.FSTree + ids := make([]*shard.ID, shardNum) for i := range ids { - fsTree = fstree.New( - fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))), - fstree.WithDepth(1)) - ids[i], err = e.AddShard( shard.WithLogger(zaptest.NewLogger(t)), shard.WithBlobStorOptions( blobstor.WithStorages([]blobstor.SubStorage{{ - Storage: fsTree, + Storage: fstree.New( + fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))), + fstree.WithDepth(1)), }})), shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), @@ -52,10 +51,6 @@ func TestEvacuateShard(t *testing.T) { require.NoError(t, e.Open()) require.NoError(t, e.Init()) - const objPerShard = 3 - - evacuateShardID := ids[2].String() - objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) for i := 0; ; i++ { objects = append(objects, generateObjectWithCID(t, cidtest.ID())) @@ -66,12 +61,21 @@ func TestEvacuateShard(t *testing.T) { _, err := e.Put(putPrm) require.NoError(t, err) - res, err := e.shards[evacuateShardID].List() + res, err := e.shards[ids[len(ids)-1].String()].List() require.NoError(t, err) if len(res.AddressList()) == objPerShard { break } } + return e, ids, objects +} + +func TestEvacuateShard(t *testing.T) { + const objPerShard = 3 + + e, ids, objects := newEngineEvacuate(t, 3, objPerShard) + + evacuateShardID := ids[2].String() checkHasObjects := func(t *testing.T) { for i := range objects { @@ -120,3 +124,68 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) } + +func TestEvacuateNetwork(t *testing.T) { + var errReplication = errors.New("handler error") + + acceptOneOf := func(objects []*objectSDK.Object, max int) func(oid.Address, *objectSDK.Object) error { + var n int + return func(addr oid.Address, obj *objectSDK.Object) error { + if n == max { + return errReplication + } + + n++ + for i := range objects { + if addr == objectCore.AddressOf(objects[i]) { + require.Equal(t, objects[i], obj) + return nil + } + } + require.FailNow(t, "handler was called with an unexpected object: %s", addr) + panic("unreachable") + } + } + + t.Run("single shard", func(t *testing.T) { + e, ids, objects := newEngineEvacuate(t, 1, 3) + evacuateShardID := ids[0].String() + + require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) + + var prm EvacuateShardPrm + prm.shardID = ids[0] + + res, err := e.Evacuate(prm) + require.ErrorIs(t, err, errMustHaveTwoShards) + require.Equal(t, 0, res.Count()) + + prm.handler = acceptOneOf(objects, 2) + + res, err = e.Evacuate(prm) + require.ErrorIs(t, err, errReplication) + require.Equal(t, 2, res.Count()) + }) + t.Run("multiple shards", func(t *testing.T) { + e, ids, objects := newEngineEvacuate(t, 2, 3) + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + + var prm EvacuateShardPrm + prm.shardID = ids[1] + prm.handler = acceptOneOf(objects, 2) + + res, err := e.Evacuate(prm) + require.ErrorIs(t, err, errReplication) + require.Equal(t, 2, res.Count()) + + t.Run("no errors", func(t *testing.T) { + prm.handler = acceptOneOf(objects, 3) + + res, err := e.Evacuate(prm) + require.NoError(t, err) + require.Equal(t, 3, res.Count()) + }) + }) +}