Remove sync evacuation code #1549

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/drop_sync_evacuation into master 2024-12-11 06:03:30 +00:00
10 changed files with 225 additions and 401 deletions

View file

@ -1,56 +0,0 @@
package control
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"github.com/spf13/cobra"
)
const ignoreErrorsFlag = "no-errors"
var evacuateShardCmd = &cobra.Command{
Use: "evacuate",
Short: "Evacuate objects from shard",
Long: "Evacuate objects from shard to other shards",
Run: evacuateShard,
Deprecated: "use frostfs-cli control shards evacuation start",
}
func evacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)}
req.Body.Shard_ID = getShardIDList(cmd)
req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag)
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.EvacuateShardResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.EvacuateShard(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
cmd.Printf("Objects moved: %d\n", resp.GetBody().GetCount())
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Shard has successfully been evacuated.")
}
func initControlEvacuateShardCmd() {
initControlFlags(evacuateShardCmd)
flags := evacuateShardCmd.Flags()
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
flags.Bool(shardAllFlag, false, "Process all shards")
flags.Bool(ignoreErrorsFlag, false, "Skip invalid/unreadable objects")
evacuateShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -21,6 +21,7 @@ const (
noProgressFlag = "no-progress"
scopeFlag = "scope"
repOneOnlyFlag = "rep-one-only"
ignoreErrorsFlag = "no-errors"
containerWorkerCountFlag = "container-worker-count"
objectWorkerCountFlag = "object-worker-count"

View file

@ -13,7 +13,6 @@ var shardsCmd = &cobra.Command{
func initControlShardsCmd() {
shardsCmd.AddCommand(listShardsCmd)
shardsCmd.AddCommand(setShardModeCmd)
shardsCmd.AddCommand(evacuateShardCmd)
shardsCmd.AddCommand(evacuationShardCmd)
shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd)
@ -23,7 +22,6 @@ func initControlShardsCmd() {
initControlShardsListCmd()
initControlSetShardModeCmd()
initControlEvacuateShardCmd()
initControlEvacuationShardCmd()
initControlFlushCacheCmd()
initControlDoctorCmd()

View file

@ -86,7 +86,6 @@ type EvacuateShardPrm struct {
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error)
IgnoreErrors bool
Async bool
Scope EvacuateScope
RepOneOnly bool
@ -211,10 +210,10 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) error {
select {
case <-ctx.Done():
return nil, ctx.Err()
return ctx.Err()
default:
}
@ -226,7 +225,6 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
))
@ -234,7 +232,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
shards, err := e.getActualShards(shardIDs, prm)
if err != nil {
return nil, err
return err
}
shardsToEvacuate := make(map[string]*shard.Shard)
@ -247,10 +245,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
}
res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.Async)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
ctx = context.WithoutCancel(ctx)
eg, ctx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil {
return nil, err
return err
}
var mtx sync.RWMutex
@ -262,21 +260,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return t
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
return e.evacuateShards(ctx, shardIDs, prm, res, copyShards, shardsToEvacuate)
})
if prm.Async {
return nil, nil
}
return res, eg.Wait()
}
func ctxOrBackground(ctx context.Context, background bool) context.Context {
if background {
return context.Background()
}
return ctx
return nil
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
@ -286,7 +273,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly),

View file

@ -140,16 +140,17 @@ func TestEvacuateShardObjects(t *testing.T) {
prm.Scope = EvacuateScopeObjects
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated())
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated())
// We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense
@ -186,9 +187,10 @@ func TestEvacuateShardObjects(t *testing.T) {
}
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(0), st.ObjectsEvacuated())
checkHasObjects(t)
@ -200,6 +202,17 @@ func TestEvacuateShardObjects(t *testing.T) {
checkHasObjects(t)
}
func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState {
var st *EvacuationState
var err error
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond)
return st
}
func TestEvacuateObjectsNetwork(t *testing.T) {
t.Parallel()
@ -242,15 +255,15 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
})
t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel()
@ -267,16 +280,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 2)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(3), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(3), st.ObjectsEvacuated())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -305,16 +320,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, totalCount-1, st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, totalCount, st.ObjectsEvacuated())
})
})
}
@ -344,9 +361,8 @@ func TestEvacuateCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
res, err := e.Evacuate(ctx, prm)
err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}
func TestEvacuateCancellationByError(t *testing.T) {
@ -375,8 +391,9 @@ func TestEvacuateCancellationByError(t *testing.T) {
prm.ObjectWorkerCount = 2
prm.ContainerWorkerCount = 2
_, err := e.Evacuate(context.Background(), prm)
require.ErrorContains(t, err, "test error")
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), "test error")
}
func TestEvacuateSingleProcess(t *testing.T) {
@ -406,20 +423,19 @@ func TestEvacuateSingleProcess(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
return nil
})
eg.Go(func() error {
<-running
res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed")
close(blocker)
return nil
})
require.NoError(t, eg.Wait())
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated())
require.Equal(t, st.ErrorMessage(), "")
}
func TestEvacuateObjectsAsync(t *testing.T) {
@ -458,9 +474,9 @@ func TestEvacuateObjectsAsync(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
return nil
})
@ -483,12 +499,7 @@ func TestEvacuateObjectsAsync(t *testing.T) {
close(blocker)
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
@ -534,14 +545,9 @@ func TestEvacuateTreesLocal(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation result must be not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -632,15 +638,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation must return not nil")
require.NoError(t, err, "evacuation failed")
require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
st = testWaitForEvacuationCompleted(t, e)
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -754,11 +754,12 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(4), res.ObjectsEvacuated())
require.Equal(t, uint64(8), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, "", st.ErrorMessage())
require.Equal(t, uint64(4), st.ObjectsEvacuated())
require.Equal(t, uint64(8), st.ObjectsSkipped())
require.Equal(t, uint64(0), st.ObjectsFailed())
}
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
@ -812,7 +813,8 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
start := time.Now()
_, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
testWaitForEvacuationCompleted(t, e)
t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err)
}

View file

@ -15,7 +15,6 @@ const (
rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode"
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcStartShardEvacuation = "StartShardEvacuation"
rpcGetShardEvacuationStatus = "GetShardEvacuationStatus"
rpcResetShardEvacuationStatus = "ResetShardEvacuationStatus"
@ -162,19 +161,6 @@ func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...cl
return wResp.message, nil
}
// EvacuateShard executes ControlService.EvacuateShard RPC.
func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client.CallOption) (*EvacuateShardResponse, error) {
wResp := newResponseWrapper[EvacuateShardResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcEvacuateShard), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// StartShardEvacuation executes ControlService.StartShardEvacuation RPC.
func StartShardEvacuation(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) {
wResp := newResponseWrapper[StartShardEvacuationResponse]()

View file

@ -1,188 +0,0 @@
package control
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var errFailedToBuildListOfContainerNodes = errors.New("can't build a list of container nodes")
func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
Scope: engine.EvacuateScopeObjects,
}
res, err := s.s.Evacuate(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.EvacuateShardResponse{
Body: &control.EvacuateShardResponse_Body{
Count: uint32(res.ObjectsEvacuated()),
},
}
err = ctrlmessage.Sign(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
cid, ok := obj.ContainerID()
if !ok {
// Return nil to prevent situations where a shard can't be evacuated
// because of a single bad/corrupted object.
return false, nil
}
nodes, err := s.getContainerNodes(cid)
if err != nil {
return false, err
}
if len(nodes) == 0 {
return false, nil
}
var res replicatorResult
task := replicator.Task{
NumCopies: 1,
Addr: addr,
Obj: obj,
Nodes: nodes,
}
s.replicator.HandleReplicationTask(ctx, task, &res)
if res.count == 0 {
return false, errors.New("object was not replicated")
}
return true, nil
}
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) {
nodes, err := s.getContainerNodes(contID)
if err != nil {
return false, "", err
}
if len(nodes) == 0 {
return false, "", nil
}
for _, node := range nodes {
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
if err == nil {
return true, hex.EncodeToString(node.PublicKey()), nil
}
}
return false, "", err
}
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
rawCID := make([]byte, sha256.Size)
contID.Encode(rawCID)
var height uint64
for {
op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
if err != nil {
return err
}
if op.Time == 0 {
return nil
}
req := &tree.ApplyRequest{
Body: &tree.ApplyRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Operation: &tree.LogMove{
ParentId: op.Parent,
Meta: op.Meta.Bytes(),
ChildId: op.Child,
},
},
}
err = tree.SignMessage(req, s.key)
if err != nil {
return fmt.Errorf("can't message apply request: %w", err)
}
err = s.treeService.ReplicateTreeOp(ctx, node, req)
if err != nil {
return err
}
height = op.Time + 1
}
}
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
nm, err := s.netMapSrc.GetNetMap(0)
if err != nil {
return nil, err
}
c, err := s.cnrSrc.Get(contID)
if err != nil {
return nil, err
}
binCnr := make([]byte, sha256.Size)
contID.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, errFailedToBuildListOfContainerNodes
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := 0; i < len(nodes); i++ { // don't use range, slice mutates in body
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
return nodes, nil
}
type replicatorResult struct {
count int
}
// SubmitSuccessfulReplication implements the replicator.TaskResult interface.
func (r *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) {
r.count++
}

View file

@ -1,17 +1,32 @@
package control
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var errFailedToBuildListOfContainerNodes = errors.New("can't build a list of container nodes")
func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
err := s.isValidRequest(req)
if err != nil {
@ -27,15 +42,13 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
RepOneOnly: req.GetBody().GetRepOneOnly(),
}
_, err = s.s.Evacuate(ctx, prm)
if err != nil {
if err = s.s.Evacuate(ctx, prm); err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
@ -135,3 +148,133 @@ func (s *Server) ResetShardEvacuationStatus(ctx context.Context, req *control.Re
}
return resp, nil
}
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
cid, ok := obj.ContainerID()
if !ok {
// Return nil to prevent situations where a shard can't be evacuated
// because of a single bad/corrupted object.
return false, nil
}
nodes, err := s.getContainerNodes(cid)
if err != nil {
return false, err
}
if len(nodes) == 0 {
return false, nil
}
var res replicatorResult
task := replicator.Task{
NumCopies: 1,
Addr: addr,
Obj: obj,
Nodes: nodes,
}
s.replicator.HandleReplicationTask(ctx, task, &res)
if res.count == 0 {
return false, errors.New("object was not replicated")
}
return true, nil
}
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) {
nodes, err := s.getContainerNodes(contID)
if err != nil {
return false, "", err
}
if len(nodes) == 0 {
return false, "", nil
}
for _, node := range nodes {
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
if err == nil {
return true, hex.EncodeToString(node.PublicKey()), nil
}
}
return false, "", err
}
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
rawCID := make([]byte, sha256.Size)
contID.Encode(rawCID)
var height uint64
for {
op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
if err != nil {
return err
}
if op.Time == 0 {
return nil
}
req := &tree.ApplyRequest{
Body: &tree.ApplyRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Operation: &tree.LogMove{
ParentId: op.Parent,
Meta: op.Meta.Bytes(),
ChildId: op.Child,
},
},
}
err = tree.SignMessage(req, s.key)
if err != nil {
return fmt.Errorf("can't message apply request: %w", err)
}
err = s.treeService.ReplicateTreeOp(ctx, node, req)
if err != nil {
return err
}
height = op.Time + 1
}
}
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
nm, err := s.netMapSrc.GetNetMap(0)
if err != nil {
return nil, err
}
c, err := s.cnrSrc.Get(contID)
if err != nil {
return nil, err
}
binCnr := make([]byte, sha256.Size)
contID.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, errFailedToBuildListOfContainerNodes
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := 0; i < len(nodes); i++ { // don't use range, slice mutates in body
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
return nodes, nil
}
type replicatorResult struct {
count int
}
// SubmitSuccessfulReplication implements the replicator.TaskResult interface.
func (r *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) {
r.count++
}

View file

@ -30,11 +30,6 @@ service ControlService {
// Synchronizes all log operations for the specified tree.
rpc SynchronizeTree(SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use
// StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
rpc EvacuateShard(EvacuateShardRequest) returns (EvacuateShardResponse);
// StartShardEvacuation starts moving all data from one shard to the others.
rpc StartShardEvacuation(StartShardEvacuationRequest)
returns (StartShardEvacuationResponse);

View file

@ -26,7 +26,6 @@ const (
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree"
ControlService_EvacuateShard_FullMethodName = "/control.ControlService/EvacuateShard"
ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation"
ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus"
ControlService_ResetShardEvacuationStatus_FullMethodName = "/control.ControlService/ResetShardEvacuationStatus"
@ -62,10 +61,6 @@ type ControlServiceClient interface {
SetShardMode(ctx context.Context, in *SetShardModeRequest, opts ...grpc.CallOption) (*SetShardModeResponse, error)
// Synchronizes all log operations for the specified tree.
SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error)
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use
// StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error)
// StartShardEvacuation starts moving all data from one shard to the others.
StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status.
@ -173,15 +168,6 @@ func (c *controlServiceClient) SynchronizeTree(ctx context.Context, in *Synchron
return out, nil
}
func (c *controlServiceClient) EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error) {
out := new(EvacuateShardResponse)
err := c.cc.Invoke(ctx, ControlService_EvacuateShard_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) {
out := new(StartShardEvacuationResponse)
err := c.cc.Invoke(ctx, ControlService_StartShardEvacuation_FullMethodName, in, out, opts...)
@ -335,10 +321,6 @@ type ControlServiceServer interface {
SetShardMode(context.Context, *SetShardModeRequest) (*SetShardModeResponse, error)
// Synchronizes all log operations for the specified tree.
SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error)
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use
// StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error)
// StartShardEvacuation starts moving all data from one shard to the others.
StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status.
@ -400,9 +382,6 @@ func (UnimplementedControlServiceServer) SetShardMode(context.Context, *SetShard
func (UnimplementedControlServiceServer) SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SynchronizeTree not implemented")
}
func (UnimplementedControlServiceServer) EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method EvacuateShard not implemented")
}
func (UnimplementedControlServiceServer) StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartShardEvacuation not implemented")
}
@ -586,24 +565,6 @@ func _ControlService_SynchronizeTree_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
func _ControlService_EvacuateShard_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EvacuateShardRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).EvacuateShard(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_EvacuateShard_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).EvacuateShard(ctx, req.(*EvacuateShardRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_StartShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StartShardEvacuationRequest)
if err := dec(in); err != nil {
@ -909,10 +870,6 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "SynchronizeTree",
Handler: _ControlService_SynchronizeTree_Handler,
},
{
MethodName: "EvacuateShard",
Handler: _ControlService_EvacuateShard_Handler,
},
{
MethodName: "StartShardEvacuation",
Handler: _ControlService_StartShardEvacuation_Handler,