From 19c0a74e94fdd4357af9aaede60196ecf05c7158 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 10 Oct 2022 20:54:14 +0300 Subject: [PATCH] [#1867] services/control: Allow to provide multiple shard IDs to some commands Signed-off-by: Evgenii Stratonikov --- .../modules/control/evacuate_shard.go | 2 +- cmd/neofs-cli/modules/control/flush_cache.go | 2 +- .../modules/control/shards_set_mode.go | 2 +- pkg/local_object_storage/engine/evacuate.go | 150 ++++++++++-------- .../engine/evacuate_test.go | 40 ++++- pkg/services/control/server/evacuate.go | 5 +- pkg/services/control/server/flush_cache.go | 15 +- pkg/services/control/server/helpers.go | 11 ++ pkg/services/control/server/set_shard_mode.go | 12 +- pkg/services/control/service.go | 4 +- pkg/services/control/service.pb.go | Bin 110686 -> 110698 bytes pkg/services/control/service.proto | 6 +- pkg/services/control/service_neofs.pb.go | Bin 52624 -> 52672 bytes pkg/services/control/service_test.go | 10 +- 14 files changed, 163 insertions(+), 96 deletions(-) create mode 100644 pkg/services/control/server/helpers.go diff --git a/cmd/neofs-cli/modules/control/evacuate_shard.go b/cmd/neofs-cli/modules/control/evacuate_shard.go index 907eb8d3c..a5aa83afc 100644 --- a/cmd/neofs-cli/modules/control/evacuate_shard.go +++ b/cmd/neofs-cli/modules/control/evacuate_shard.go @@ -20,7 +20,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)} - req.Body.Shard_ID = getShardID(cmd) + req.Body.Shard_ID = [][]byte{getShardID(cmd)} req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(dumpIgnoreErrorsFlag) signRequest(cmd, pk, req) diff --git a/cmd/neofs-cli/modules/control/flush_cache.go b/cmd/neofs-cli/modules/control/flush_cache.go index 49948cfee..f6c134b12 100644 --- a/cmd/neofs-cli/modules/control/flush_cache.go +++ b/cmd/neofs-cli/modules/control/flush_cache.go @@ -20,7 +20,7 @@ func flushCache(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)} - req.Body.Shard_ID = getShardID(cmd) + req.Body.Shard_ID = [][]byte{getShardID(cmd)} signRequest(cmd, pk, req) diff --git a/cmd/neofs-cli/modules/control/shards_set_mode.go b/cmd/neofs-cli/modules/control/shards_set_mode.go index d44caf690..a6af8e36e 100644 --- a/cmd/neofs-cli/modules/control/shards_set_mode.go +++ b/cmd/neofs-cli/modules/control/shards_set_mode.go @@ -71,7 +71,7 @@ func setShardMode(cmd *cobra.Command, _ []string) { req.SetBody(body) body.SetMode(mode) - body.SetShardID(getShardID(cmd)) + body.SetShardIDList([][]byte{getShardID(cmd)}) reset, _ := cmd.Flags().GetBool(shardClearErrorsFlag) body.ClearErrorCounter(reset) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index bc185dacd..e3f32f934 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -15,7 +15,7 @@ import ( // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { - shardID *shard.ID + shardID []*shard.ID handler func(oid.Address, *objectSDK.Object) error ignoreErrors bool } @@ -25,8 +25,8 @@ type EvacuateShardRes struct { count int } -// WithShardID sets shard ID. -func (p *EvacuateShardPrm) WithShardID(id *shard.ID) { +// WithShardIDList sets shard ID. +func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) { p.shardID = id } @@ -53,30 +53,35 @@ type pooledShard struct { pool util.WorkerPool } -var errMustHaveTwoShards = errors.New("amount of shards must be > 2") +var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) { - sid := prm.shardID.String() + sidList := make([]string, len(prm.shardID)) + for i := range prm.shardID { + sidList[i] = prm.shardID[i].String() + } e.mtx.RLock() - sh, ok := e.shards[sid] - if !ok { - e.mtx.RUnlock() - return EvacuateShardRes{}, errShardNotFound + for i := range sidList { + sh, ok := e.shards[sidList[i]] + if !ok { + e.mtx.RUnlock() + return EvacuateShardRes{}, errShardNotFound + } + + if !sh.GetMode().ReadOnly() { + e.mtx.RUnlock() + return EvacuateShardRes{}, shard.ErrMustBeReadOnly + } } - if len(e.shards) < 2 && prm.handler == nil { + if len(e.shards)-len(sidList) < 1 && prm.handler == nil { e.mtx.RUnlock() return EvacuateShardRes{}, errMustHaveTwoShards } - if !sh.GetMode().ReadOnly() { - e.mtx.RUnlock() - return EvacuateShardRes{}, shard.ErrMustBeReadOnly - } - // We must have all shards, to have correct information about their // indexes in a sorted slice and set appropriate marks in the metabase. // Evacuated shard is skipped during put. @@ -94,72 +99,89 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) weights = append(weights, e.shardWeight(shards[i].Shard)) } + shardMap := make(map[string]*shard.Shard) + for i := range sidList { + for j := range shards { + if shards[j].ID().String() == sidList[i] { + shardMap[sidList[i]] = shards[j].Shard + } + } + } + var listPrm shard.ListWithCursorPrm listPrm.WithCount(defaultEvacuateBatchSize) - var c *meta.Cursor var res EvacuateShardRes - for { - listPrm.WithCursor(c) - // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes - // because ListWithCursor works only with the metabase. - listRes, err := sh.Shard.ListWithCursor(listPrm) - if err != nil { - if errors.Is(err, meta.ErrEndOfListing) { - return res, nil - } - return res, err - } +mainLoop: + for n := range sidList { + sh := shardMap[sidList[n]] - // TODO (@fyrchik): #1731 parallelize the loop - lst := listRes.AddressList() + var c *meta.Cursor + for { + listPrm.WithCursor(c) - loop: - for i := range lst { - var getPrm shard.GetPrm - getPrm.SetAddress(lst[i]) - - getRes, err := sh.Get(getPrm) + // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes + // because ListWithCursor works only with the metabase. + listRes, err := sh.ListWithCursor(listPrm) if err != nil { - if prm.ignoreErrors { - continue + if errors.Is(err, meta.ErrEndOfListing) { + continue mainLoop } return res, err } - hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString()))) - for j := range shards { - if shards[j].ID().String() == sid { - continue - } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, lst[i], getRes.Object()) - if putDone || exists { - if putDone { - e.log.Debug("object is moved to another shard", - zap.String("from", sid), - zap.Stringer("to", shards[j].ID()), - zap.Stringer("addr", lst[i])) + // TODO (@fyrchik): #1731 parallelize the loop + lst := listRes.AddressList() - res.count++ + loop: + for i := range lst { + var getPrm shard.GetPrm + getPrm.SetAddress(lst[i]) + + getRes, err := sh.Get(getPrm) + if err != nil { + if prm.ignoreErrors { + continue } - continue loop + return res, err } + + hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString()))) + for j := range shards { + if _, ok := shardMap[shards[j].ID().String()]; ok { + continue + } + putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, lst[i], getRes.Object()) + if putDone || exists { + if putDone { + e.log.Debug("object is moved to another shard", + zap.String("from", sidList[n]), + zap.Stringer("to", shards[j].ID()), + zap.Stringer("addr", lst[i])) + + res.count++ + } + continue loop + } + } + + if prm.handler == nil { + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. + return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) + } + + err = prm.handler(lst[i], getRes.Object()) + if err != nil { + return res, err + } + res.count++ } - if prm.handler == nil { - // Do not check ignoreErrors flag here because - // ignoring errors on put make this command kinda useless. - return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) - } - - err = prm.handler(lst[i], getRes.Object()) - if err != nil { - return res, err - } - res.count++ + c = listRes.Cursor() } - - c = listRes.Cursor() } + + return res, nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 5d0ed74cb..29563217f 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -90,7 +90,7 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) var prm EvacuateShardPrm - prm.WithShardID(ids[2]) + prm.WithShardIDList(ids[2:3]) t.Run("must be read-only", func(t *testing.T) { res, err := e.Evacuate(prm) @@ -154,7 +154,7 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) var prm EvacuateShardPrm - prm.shardID = ids[0] + prm.shardID = ids[0:1] res, err := e.Evacuate(prm) require.ErrorIs(t, err, errMustHaveTwoShards) @@ -166,14 +166,14 @@ func TestEvacuateNetwork(t *testing.T) { require.ErrorIs(t, err, errReplication) require.Equal(t, 2, res.Count()) }) - t.Run("multiple shards", func(t *testing.T) { + t.Run("multiple shards, evacuate one", func(t *testing.T) { e, ids, objects := newEngineEvacuate(t, 2, 3) require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) var prm EvacuateShardPrm - prm.shardID = ids[1] + prm.shardID = ids[1:2] prm.handler = acceptOneOf(objects, 2) res, err := e.Evacuate(prm) @@ -188,4 +188,36 @@ func TestEvacuateNetwork(t *testing.T) { require.Equal(t, 3, res.Count()) }) }) + t.Run("multiple shards, evacuate many", func(t *testing.T) { + e, ids, objects := newEngineEvacuate(t, 4, 5) + evacuateIDs := ids[0:3] + + var totalCount int + for i := range evacuateIDs { + res, err := e.shards[ids[i].String()].List() + require.NoError(t, err) + + totalCount += len(res.AddressList()) + } + + for i := range ids { + require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly)) + } + + var prm EvacuateShardPrm + prm.shardID = evacuateIDs + prm.handler = acceptOneOf(objects, totalCount-1) + + res, err := e.Evacuate(prm) + require.ErrorIs(t, err, errReplication) + require.Equal(t, totalCount-1, res.Count()) + + t.Run("no errors", func(t *testing.T) { + prm.handler = acceptOneOf(objects, totalCount) + + res, err := e.Evacuate(prm) + require.NoError(t, err) + require.Equal(t, totalCount, res.Count()) + }) + }) } diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index aac99667f..0b8fa8de3 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -9,7 +9,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/services/control" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" @@ -26,10 +25,8 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ return nil, status.Error(codes.PermissionDenied, err.Error()) } - shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) - var prm engine.EvacuateShardPrm - prm.WithShardID(shardID) + prm.WithShardIDList(getShardIDList(req.GetBody().GetShard_ID())) prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) prm.WithFaultHandler(s.replicate) diff --git a/pkg/services/control/server/flush_cache.go b/pkg/services/control/server/flush_cache.go index baf1eb620..2d19e5c21 100644 --- a/pkg/services/control/server/flush_cache.go +++ b/pkg/services/control/server/flush_cache.go @@ -4,7 +4,6 @@ import ( "context" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/services/control" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -16,14 +15,14 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) ( return nil, status.Error(codes.PermissionDenied, err.Error()) } - shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) + for _, shardID := range getShardIDList(req.GetBody().GetShard_ID()) { + var prm engine.FlushWriteCachePrm + prm.SetShardID(shardID) - var prm engine.FlushWriteCachePrm - prm.SetShardID(shardID) - - _, err = s.s.FlushWriteCache(prm) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + _, err = s.s.FlushWriteCache(prm) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } } resp := &control.FlushCacheResponse{Body: &control.FlushCacheResponse_Body{}} diff --git a/pkg/services/control/server/helpers.go b/pkg/services/control/server/helpers.go new file mode 100644 index 000000000..e2a7cd5d7 --- /dev/null +++ b/pkg/services/control/server/helpers.go @@ -0,0 +1,11 @@ +package control + +import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + +func getShardIDList(raw [][]byte) []*shard.ID { + res := make([]*shard.ID, 0, len(raw)) + for i := range raw { + res = append(res, shard.NewIDFromBytes(raw[i])) + } + return res +} diff --git a/pkg/services/control/server/set_shard_mode.go b/pkg/services/control/server/set_shard_mode.go index 94bf438c6..e9d85fe8c 100644 --- a/pkg/services/control/server/set_shard_mode.go +++ b/pkg/services/control/server/set_shard_mode.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/services/control" "google.golang.org/grpc/codes" @@ -21,8 +20,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques var ( m mode.Mode - requestedMode = req.GetBody().GetMode() - requestedShard = shard.NewIDFromBytes(req.Body.GetShard_ID()) + requestedMode = req.GetBody().GetMode() ) switch requestedMode { @@ -38,9 +36,11 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques return nil, status.Error(codes.Internal, fmt.Sprintf("unknown shard mode: %s", requestedMode)) } - err = s.s.SetShardMode(requestedShard, m, false) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + for _, shardID := range getShardIDList(req.Body.GetShard_ID()) { + err = s.s.SetShardMode(shardID, m, false) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } } // create and fill response diff --git a/pkg/services/control/service.go b/pkg/services/control/service.go index acab74591..7de516bcf 100644 --- a/pkg/services/control/service.go +++ b/pkg/services/control/service.go @@ -91,8 +91,8 @@ func (x *ListShardsResponse) SetBody(v *ListShardsResponse_Body) { } } -// SetShardID sets shard ID whose mode is requested to be set. -func (x *SetShardModeRequest_Body) SetShardID(v []byte) { +// SetShardIDList sets shard ID whose mode is requested to be set. +func (x *SetShardModeRequest_Body) SetShardIDList(v [][]byte) { if v != nil { x.Shard_ID = v } diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 104d511b6b648bad536518a599193e64d08486db..126d68d2be66185f0fc6ce02720e041c307a4f28 100644 GIT binary patch delta 214 zcmccjfbG=-whhcHCo`^;ntWz4-{jX%Og6`^v|we8j!mj8nS9Y$ezM+6smU86I49>? z3Qt~O$-4Q|x;L80>L>3#m_9k7g?sa=o4zbq^!>XjJvnxF>E<){ezL;!ZGQbk5lQW4 zy_ZE=)4ll_d8P}n0*Uu#jC|8OL>R@Vzn5VY-2P6O@ud<%dOJ`$pJO@;E29KRrO0#v TRYv~l4XTVh+n3ri?koTRDdASt delta 199 zcmaF$fbHG`whhcHCmU>y+nl`8gmv<)U9Ow&tz*#wiA?^#HFfH z+vMH5r6)^2;hD_$mV5HMdsUPBH-HrKOy2OCceD1ZM$PFPY#4c_2k0?!Y(F5(sH}|O gPY3darcbbEWZTYf&KS)xeWwCcC0J%VQ07(v05LFBx&QzG diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index fba8ee99a..a72ee57a5 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -160,7 +160,7 @@ message SetShardModeRequest { // Request body structure. message Body { // ID of the shard. - bytes shard_ID = 1; + repeated bytes shard_ID = 1; // Mode that requested to be set. ShardMode mode = 2; @@ -294,7 +294,7 @@ message EvacuateShardRequest { // Request body structure. message Body { // ID of the shard. - bytes shard_ID = 1; + repeated bytes shard_ID = 1; // Flag indicating whether object read errors should be ignored. bool ignore_errors = 2; @@ -320,7 +320,7 @@ message FlushCacheRequest { // Request body structure. message Body { // ID of the shard. - bytes shard_ID = 1; + repeated bytes shard_ID = 1; } Body body = 1; diff --git a/pkg/services/control/service_neofs.pb.go b/pkg/services/control/service_neofs.pb.go index 00979f012d55631afae8a0366fd9adfc4fd478f1..94b9fcc6b86f3110b3e1c2c66e7d861d7d76a31e 100644 GIT binary patch delta 91 zcmbO*oB6O->?j delta 103 zcmX>wn|Z=)<_(HLld~uAPZmg*oXo{7vN<)Vg=unho%Q7XysDc6Lzx5MveiMHn|qfg rLS*%`csARvNDzX`a&O+h%L^o