From a51b76056e73a0b0a7334cfdc58b9e4f92236abd Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 12 Sep 2022 14:48:21 +0300 Subject: [PATCH] [#1731] engine: Add Evacuate command Make it possible to move all data from 1 shard to other shards. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/evacuate.go | 150 ++++++++++++++++++ .../engine/evacuate_test.go | 115 ++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 pkg/local_object_storage/engine/evacuate.go create mode 100644 pkg/local_object_storage/engine/evacuate_test.go diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go new file mode 100644 index 000000000..dd1ad15e2 --- /dev/null +++ b/pkg/local_object_storage/engine/evacuate.go @@ -0,0 +1,150 @@ +package engine + +import ( + "errors" + "fmt" + + "github.com/nspcc-dev/hrw" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" + "go.uber.org/zap" +) + +// EvacuateShardPrm represents parameters for the EvacuateShard operation. +type EvacuateShardPrm struct { + shardID *shard.ID + ignoreErrors bool +} + +// EvacuateShardRes represents result of the EvacuateShard operation. +type EvacuateShardRes struct { + count int +} + +// WithShardID sets shard ID. +func (p *EvacuateShardPrm) WithShardID(id *shard.ID) { + p.shardID = id +} + +// WithIgnoreErrors sets flag to ignore errors. +func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) { + p.ignoreErrors = ignore +} + +// Count returns amount of evacuated objects. +func (p EvacuateShardRes) Count() int { + return p.count +} + +const defaultEvacuateBatchSize = 100 + +type pooledShard struct { + hashedShard + pool util.WorkerPool +} + +var errMustHaveTwoShards = errors.New("amount of shards must be > 2") + +// Evacuate moves data from one shard to the others. +// The shard being moved must be in read-only mode. +func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) { + sid := prm.shardID.String() + + e.mtx.RLock() + sh, ok := e.shards[sid] + if !ok { + e.mtx.RUnlock() + return EvacuateShardRes{}, errShardNotFound + } + + if len(e.shards) < 2 { + e.mtx.RUnlock() + return EvacuateShardRes{}, errMustHaveTwoShards + } + + if !sh.GetMode().ReadOnly() { + e.mtx.RUnlock() + return EvacuateShardRes{}, shard.ErrMustBeReadOnly + } + + // We must have all shards, to have correct information about their + // indexes in a sorted slice and set appropriate marks in the metabase. + // Evacuated shard is skipped during put. + shards := make([]pooledShard, 0, len(e.shards)) + for id := range e.shards { + shards = append(shards, pooledShard{ + hashedShard: hashedShard(e.shards[id]), + pool: e.shardPools[id], + }) + } + e.mtx.RUnlock() + + weights := make([]float64, 0, len(shards)) + for i := range shards { + weights = append(weights, e.shardWeight(shards[i].Shard)) + } + + var listPrm shard.ListWithCursorPrm + listPrm.WithCount(defaultEvacuateBatchSize) + + var c *meta.Cursor + var res EvacuateShardRes + for { + listPrm.WithCursor(c) + + // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes + // because ListWithCursor works only with the metabase. + listRes, err := sh.Shard.ListWithCursor(listPrm) + if err != nil { + if errors.Is(err, meta.ErrEndOfListing) { + return res, nil + } + return res, err + } + + // TODO (@fyrchik): #1731 parallelize the loop + lst := listRes.AddressList() + + loop: + for i := range lst { + var getPrm shard.GetPrm + getPrm.SetAddress(lst[i]) + + getRes, err := sh.Get(getPrm) + if err != nil { + if prm.ignoreErrors { + continue + } + return res, err + } + + hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString()))) + for j := range shards { + if shards[j].ID().String() == sid { + continue + } + ok := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object()) + if ok { + e.log.Debug("object is moved to another shard", + zap.String("from", sid), + zap.Stringer("to", shards[j].ID()), + zap.Stringer("addr", lst[i])) + + res.count++ + continue loop + } + } + + // TODO (@fyrchik): #1731 try replicating to another node. + // The suggestion is to have prm.handler which is called + // if a Put has failed. + + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. + return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) + } + + c = listRes.Cursor() + } +} diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go new file mode 100644 index 000000000..5617d8c55 --- /dev/null +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -0,0 +1,115 @@ +package engine + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + + objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestEvacuateShard(t *testing.T) { + dir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + + e := New( + WithLogger(zaptest.NewLogger(t)), + WithShardPoolSize(1)) + + var ids [3]*shard.ID + var fsTree *fstree.FSTree + + for i := range ids { + fsTree = fstree.New( + fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))), + fstree.WithDepth(1)) + + ids[i], err = e.AddShard( + shard.WithLogger(zaptest.NewLogger(t)), + shard.WithBlobStorOptions( + blobstor.WithStorages([]blobstor.SubStorage{{ + Storage: fsTree, + }})), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), + meta.WithPermissions(0700), + meta.WithEpochState(epochState{}), + )) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + + const objPerShard = 3 + + evacuateShardID := ids[2].String() + + objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) + for i := 0; ; i++ { + objects = append(objects, generateObjectWithCID(t, cidtest.ID())) + + var putPrm PutPrm + putPrm.WithObject(objects[i]) + + _, err := e.Put(putPrm) + require.NoError(t, err) + + res, err := e.shards[evacuateShardID].List() + require.NoError(t, err) + if len(res.AddressList()) == objPerShard { + break + } + } + + checkHasObjects := func(t *testing.T) { + for i := range objects { + var prm GetPrm + prm.WithAddress(objectCore.AddressOf(objects[i])) + + _, err := e.Get(prm) + require.NoError(t, err) + } + } + + checkHasObjects(t) + + var prm EvacuateShardPrm + prm.WithShardID(ids[2]) + + t.Run("must be read-only", func(t *testing.T) { + res, err := e.Evacuate(prm) + require.ErrorIs(t, err, shard.ErrMustBeReadOnly) + require.Equal(t, 0, res.Count()) + }) + + require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) + + res, err := e.Evacuate(prm) + require.NoError(t, err) + require.Equal(t, objPerShard, res.count) + + // We check that all objects are available both before and after shard removal. + // First case is a real-world use-case. It ensures that an object can be put in presense + // of all metabase checks/marks. + // Second case ensures that all objects are indeed moved and available. + checkHasObjects(t) + + e.mtx.Lock() + delete(e.shards, evacuateShardID) + delete(e.shardPools, evacuateShardID) + e.mtx.Unlock() + + checkHasObjects(t) +}