[#1867] services/control: Allow to provide multiple shard IDs to some commands

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-10-10 20:54:14 +03:00 committed by fyrchik
parent 74d2f2c8d3
commit 19c0a74e94
14 changed files with 163 additions and 96 deletions

View file

@ -20,7 +20,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)}
req.Body.Shard_ID = getShardID(cmd)
req.Body.Shard_ID = [][]byte{getShardID(cmd)}
req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(dumpIgnoreErrorsFlag)
signRequest(cmd, pk, req)

View file

@ -20,7 +20,7 @@ func flushCache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)}
req.Body.Shard_ID = getShardID(cmd)
req.Body.Shard_ID = [][]byte{getShardID(cmd)}
signRequest(cmd, pk, req)

View file

@ -71,7 +71,7 @@ func setShardMode(cmd *cobra.Command, _ []string) {
req.SetBody(body)
body.SetMode(mode)
body.SetShardID(getShardID(cmd))
body.SetShardIDList([][]byte{getShardID(cmd)})
reset, _ := cmd.Flags().GetBool(shardClearErrorsFlag)
body.ClearErrorCounter(reset)

View file

@ -15,7 +15,7 @@ import (
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
shardID *shard.ID
shardID []*shard.ID
handler func(oid.Address, *objectSDK.Object) error
ignoreErrors bool
}
@ -25,8 +25,8 @@ type EvacuateShardRes struct {
count int
}
// WithShardID sets shard ID.
func (p *EvacuateShardPrm) WithShardID(id *shard.ID) {
// WithShardIDList sets shard ID.
func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
p.shardID = id
}
@ -53,29 +53,34 @@ type pooledShard struct {
pool util.WorkerPool
}
var errMustHaveTwoShards = errors.New("amount of shards must be > 2")
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
sid := prm.shardID.String()
sidList := make([]string, len(prm.shardID))
for i := range prm.shardID {
sidList[i] = prm.shardID[i].String()
}
e.mtx.RLock()
sh, ok := e.shards[sid]
for i := range sidList {
sh, ok := e.shards[sidList[i]]
if !ok {
e.mtx.RUnlock()
return EvacuateShardRes{}, errShardNotFound
}
if len(e.shards) < 2 && prm.handler == nil {
e.mtx.RUnlock()
return EvacuateShardRes{}, errMustHaveTwoShards
}
if !sh.GetMode().ReadOnly() {
e.mtx.RUnlock()
return EvacuateShardRes{}, shard.ErrMustBeReadOnly
}
}
if len(e.shards)-len(sidList) < 1 && prm.handler == nil {
e.mtx.RUnlock()
return EvacuateShardRes{}, errMustHaveTwoShards
}
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
@ -94,20 +99,34 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
weights = append(weights, e.shardWeight(shards[i].Shard))
}
shardMap := make(map[string]*shard.Shard)
for i := range sidList {
for j := range shards {
if shards[j].ID().String() == sidList[i] {
shardMap[sidList[i]] = shards[j].Shard
}
}
}
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
var c *meta.Cursor
var res EvacuateShardRes
mainLoop:
for n := range sidList {
sh := shardMap[sidList[n]]
var c *meta.Cursor
for {
listPrm.WithCursor(c)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.Shard.ListWithCursor(listPrm)
listRes, err := sh.ListWithCursor(listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) {
return res, nil
continue mainLoop
}
return res, err
}
@ -130,14 +149,14 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString())))
for j := range shards {
if shards[j].ID().String() == sid {
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, lst[i], getRes.Object())
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
zap.String("from", sid),
zap.String("from", sidList[n]),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", lst[i]))
@ -163,3 +182,6 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
c = listRes.Cursor()
}
}
return res, nil
}

View file

@ -90,7 +90,7 @@ func TestEvacuateShard(t *testing.T) {
checkHasObjects(t)
var prm EvacuateShardPrm
prm.WithShardID(ids[2])
prm.WithShardIDList(ids[2:3])
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(prm)
@ -154,7 +154,7 @@ func TestEvacuateNetwork(t *testing.T) {
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[0]
prm.shardID = ids[0:1]
res, err := e.Evacuate(prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
@ -166,14 +166,14 @@ func TestEvacuateNetwork(t *testing.T) {
require.ErrorIs(t, err, errReplication)
require.Equal(t, 2, res.Count())
})
t.Run("multiple shards", func(t *testing.T) {
t.Run("multiple shards, evacuate one", func(t *testing.T) {
e, ids, objects := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[1]
prm.shardID = ids[1:2]
prm.handler = acceptOneOf(objects, 2)
res, err := e.Evacuate(prm)
@ -188,4 +188,36 @@ func TestEvacuateNetwork(t *testing.T) {
require.Equal(t, 3, res.Count())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
e, ids, objects := newEngineEvacuate(t, 4, 5)
evacuateIDs := ids[0:3]
var totalCount int
for i := range evacuateIDs {
res, err := e.shards[ids[i].String()].List()
require.NoError(t, err)
totalCount += len(res.AddressList())
}
for i := range ids {
require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly))
}
var prm EvacuateShardPrm
prm.shardID = evacuateIDs
prm.handler = acceptOneOf(objects, totalCount-1)
res, err := e.Evacuate(prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Count())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.Count())
})
})
}

View file

@ -9,7 +9,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
@ -26,10 +25,8 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
return nil, status.Error(codes.PermissionDenied, err.Error())
}
shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())
var prm engine.EvacuateShardPrm
prm.WithShardID(shardID)
prm.WithShardIDList(getShardIDList(req.GetBody().GetShard_ID()))
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)

View file

@ -4,7 +4,6 @@ import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -16,8 +15,7 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) (
return nil, status.Error(codes.PermissionDenied, err.Error())
}
shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())
for _, shardID := range getShardIDList(req.GetBody().GetShard_ID()) {
var prm engine.FlushWriteCachePrm
prm.SetShardID(shardID)
@ -25,6 +23,7 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) (
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
resp := &control.FlushCacheResponse{Body: &control.FlushCacheResponse_Body{}}

View file

@ -0,0 +1,11 @@
package control
import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
func getShardIDList(raw [][]byte) []*shard.ID {
res := make([]*shard.ID, 0, len(raw))
for i := range raw {
res = append(res, shard.NewIDFromBytes(raw[i]))
}
return res
}

View file

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
"google.golang.org/grpc/codes"
@ -22,7 +21,6 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
m mode.Mode
requestedMode = req.GetBody().GetMode()
requestedShard = shard.NewIDFromBytes(req.Body.GetShard_ID())
)
switch requestedMode {
@ -38,10 +36,12 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown shard mode: %s", requestedMode))
}
err = s.s.SetShardMode(requestedShard, m, false)
for _, shardID := range getShardIDList(req.Body.GetShard_ID()) {
err = s.s.SetShardMode(shardID, m, false)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
// create and fill response
resp := new(control.SetShardModeResponse)

View file

@ -91,8 +91,8 @@ func (x *ListShardsResponse) SetBody(v *ListShardsResponse_Body) {
}
}
// SetShardID sets shard ID whose mode is requested to be set.
func (x *SetShardModeRequest_Body) SetShardID(v []byte) {
// SetShardIDList sets shard ID whose mode is requested to be set.
func (x *SetShardModeRequest_Body) SetShardIDList(v [][]byte) {
if v != nil {
x.Shard_ID = v
}

Binary file not shown.

View file

@ -160,7 +160,7 @@ message SetShardModeRequest {
// Request body structure.
message Body {
// ID of the shard.
bytes shard_ID = 1;
repeated bytes shard_ID = 1;
// Mode that requested to be set.
ShardMode mode = 2;
@ -294,7 +294,7 @@ message EvacuateShardRequest {
// Request body structure.
message Body {
// ID of the shard.
bytes shard_ID = 1;
repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2;
@ -320,7 +320,7 @@ message FlushCacheRequest {
// Request body structure.
message Body {
// ID of the shard.
bytes shard_ID = 1;
repeated bytes shard_ID = 1;
}
Body body = 1;

Binary file not shown.

View file

@ -139,17 +139,23 @@ func TestSetShardModeRequest_Body_StableMarshal(t *testing.T) {
func generateSetShardModeRequestBody() *control.SetShardModeRequest_Body {
body := new(control.SetShardModeRequest_Body)
body.SetShardID([]byte{0, 1, 2, 3, 4})
body.SetShardIDList([][]byte{{0, 1, 2, 3, 4}})
body.SetMode(control.ShardMode_READ_WRITE)
return body
}
func equalSetShardModeRequestBodies(b1, b2 *control.SetShardModeRequest_Body) bool {
if b1.GetMode() != b2.GetMode() || !bytes.Equal(b1.Shard_ID, b2.Shard_ID) {
if b1.GetMode() != b2.GetMode() || len(b1.Shard_ID) != len(b2.Shard_ID) {
return false
}
for i := range b1.Shard_ID {
if !bytes.Equal(b1.Shard_ID[i], b2.Shard_ID[i]) {
return false
}
}
return true
}