Remove sync evacuation code #1549

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/drop_sync_evacuation into master 2024-12-11 06:03:30 +00:00
10 changed files with 225 additions and 401 deletions

View file

@ -1,56 +0,0 @@
package control
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"github.com/spf13/cobra"
)
const ignoreErrorsFlag = "no-errors"
var evacuateShardCmd = &cobra.Command{
Use: "evacuate",
Short: "Evacuate objects from shard",
Long: "Evacuate objects from shard to other shards",
Run: evacuateShard,
Deprecated: "use frostfs-cli control shards evacuation start",
}
func evacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)}
req.Body.Shard_ID = getShardIDList(cmd)
req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag)
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.EvacuateShardResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.EvacuateShard(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
cmd.Printf("Objects moved: %d\n", resp.GetBody().GetCount())
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Shard has successfully been evacuated.")
}
func initControlEvacuateShardCmd() {
initControlFlags(evacuateShardCmd)
flags := evacuateShardCmd.Flags()
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
flags.Bool(shardAllFlag, false, "Process all shards")
flags.Bool(ignoreErrorsFlag, false, "Skip invalid/unreadable objects")
evacuateShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -21,6 +21,7 @@ const (
noProgressFlag = "no-progress" noProgressFlag = "no-progress"
scopeFlag = "scope" scopeFlag = "scope"
repOneOnlyFlag = "rep-one-only" repOneOnlyFlag = "rep-one-only"
ignoreErrorsFlag = "no-errors"
containerWorkerCountFlag = "container-worker-count" containerWorkerCountFlag = "container-worker-count"
objectWorkerCountFlag = "object-worker-count" objectWorkerCountFlag = "object-worker-count"

View file

@ -13,7 +13,6 @@ var shardsCmd = &cobra.Command{
func initControlShardsCmd() { func initControlShardsCmd() {
shardsCmd.AddCommand(listShardsCmd) shardsCmd.AddCommand(listShardsCmd)
shardsCmd.AddCommand(setShardModeCmd) shardsCmd.AddCommand(setShardModeCmd)
shardsCmd.AddCommand(evacuateShardCmd)
shardsCmd.AddCommand(evacuationShardCmd) shardsCmd.AddCommand(evacuationShardCmd)
shardsCmd.AddCommand(flushCacheCmd) shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd) shardsCmd.AddCommand(doctorCmd)
@ -23,7 +22,6 @@ func initControlShardsCmd() {
initControlShardsListCmd() initControlShardsListCmd()
initControlSetShardModeCmd() initControlSetShardModeCmd()
initControlEvacuateShardCmd()
initControlEvacuationShardCmd() initControlEvacuationShardCmd()
initControlFlushCacheCmd() initControlFlushCacheCmd()
initControlDoctorCmd() initControlDoctorCmd()

View file

@ -86,7 +86,6 @@ type EvacuateShardPrm struct {
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error) ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error)
IgnoreErrors bool IgnoreErrors bool
Async bool
Scope EvacuateScope Scope EvacuateScope
RepOneOnly bool RepOneOnly bool
@ -211,10 +210,10 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others. // Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode. // The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) { func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return ctx.Err()
default: default:
} }
@ -226,7 +225,6 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes( trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs), attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope), attribute.Stringer("scope", prm.Scope),
)) ))
@ -234,7 +232,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
shards, err := e.getActualShards(shardIDs, prm) shards, err := e.getActualShards(shardIDs, prm)
if err != nil { if err != nil {
return nil, err return err
} }
shardsToEvacuate := make(map[string]*shard.Shard) shardsToEvacuate := make(map[string]*shard.Shard)
@ -247,10 +245,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
} }
res := NewEvacuateShardRes() res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.Async) ctx = context.WithoutCancel(ctx)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) eg, ctx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil { if err != nil {
return nil, err return err
} }
var mtx sync.RWMutex var mtx sync.RWMutex
@ -262,21 +260,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return t return t
} }
eg.Go(func() error { eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) return e.evacuateShards(ctx, shardIDs, prm, res, copyShards, shardsToEvacuate)
}) })
if prm.Async { return nil
return nil, nil
}
return res, eg.Wait()
}
func ctxOrBackground(ctx context.Context, background bool) context.Context {
if background {
return context.Background()
}
return ctx
} }
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
@ -286,7 +273,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes( trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs), attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope), attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly), attribute.Bool("repOneOnly", prm.RepOneOnly),

View file

@ -140,16 +140,17 @@ func TestEvacuateShardObjects(t *testing.T) {
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
t.Run("must be read-only", func(t *testing.T) { t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly) require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}) })
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated()) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated())
// We check that all objects are available both before and after shard removal. // We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense // First case is a real-world use-case. It ensures that an object can be put in presense
@ -186,9 +187,10 @@ func TestEvacuateShardObjects(t *testing.T) {
} }
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done. // Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(0), res.ObjectsEvacuated()) require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(0), st.ObjectsEvacuated())
checkHasObjects(t) checkHasObjects(t)
@ -200,6 +202,17 @@ func TestEvacuateShardObjects(t *testing.T) {
checkHasObjects(t) checkHasObjects(t)
} }
func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState {
var st *EvacuationState
var err error
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond)
return st
}
func TestEvacuateObjectsNetwork(t *testing.T) { func TestEvacuateObjectsNetwork(t *testing.T) {
t.Parallel() t.Parallel()
@ -242,15 +255,15 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ShardID = ids[0:1] prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards) require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
prm.ObjectsHandler = acceptOneOf(objects, 2) prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorIs(t, err, errReplication) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(2), res.ObjectsEvacuated()) require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
}) })
t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel() t.Parallel()
@ -267,16 +280,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 2) prm.ObjectsHandler = acceptOneOf(objects, 2)
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorIs(t, err, errReplication) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(2), res.ObjectsEvacuated()) require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) { t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 3) prm.ObjectsHandler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), res.ObjectsEvacuated()) require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(3), st.ObjectsEvacuated())
}) })
}) })
t.Run("multiple shards, evacuate many", func(t *testing.T) { t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -305,16 +320,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1) prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorIs(t, err, errReplication) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, totalCount-1, res.ObjectsEvacuated()) require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, totalCount-1, st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) { t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount) prm.ObjectsHandler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, totalCount, res.ObjectsEvacuated()) require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, totalCount, st.ObjectsEvacuated())
}) })
}) })
} }
@ -344,9 +361,8 @@ func TestEvacuateCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
res, err := e.Evacuate(ctx, prm) err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled") require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
} }
func TestEvacuateCancellationByError(t *testing.T) { func TestEvacuateCancellationByError(t *testing.T) {
@ -375,8 +391,9 @@ func TestEvacuateCancellationByError(t *testing.T) {
prm.ObjectWorkerCount = 2 prm.ObjectWorkerCount = 2
prm.ContainerWorkerCount = 2 prm.ContainerWorkerCount = 2
_, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorContains(t, err, "test error") st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), "test error")
} }
func TestEvacuateSingleProcess(t *testing.T) { func TestEvacuateSingleProcess(t *testing.T) {
@ -406,20 +423,19 @@ func TestEvacuateSingleProcess(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background()) eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error { eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm) require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil return nil
}) })
eg.Go(func() error { eg.Go(func() error {
<-running <-running
res, err := e.Evacuate(egCtx, prm) require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed")
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
close(blocker) close(blocker)
return nil return nil
}) })
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated())
require.Equal(t, st.ErrorMessage(), "")
} }
func TestEvacuateObjectsAsync(t *testing.T) { func TestEvacuateObjectsAsync(t *testing.T) {
@ -458,9 +474,9 @@ func TestEvacuateObjectsAsync(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background()) eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error { eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm) require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
require.NoError(t, err, "first evacuation failed") st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), res.ObjectsEvacuated()) require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
return nil return nil
}) })
@ -483,12 +499,7 @@ func TestEvacuateObjectsAsync(t *testing.T) {
close(blocker) close(blocker)
require.Eventually(t, func() bool { st = testWaitForEvacuationCompleted(t, e)
st, err = e.GetEvacuationState(context.Background())
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at") require.NotNil(t, st.FinishedAt(), "invalid final finished at")
@ -534,14 +545,9 @@ func TestEvacuateTreesLocal(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message") require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
require.NotNil(t, res, "sync evacuation result must be not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -632,15 +638,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message") require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
require.NotNil(t, res, "sync evacuation must return not nil") st = testWaitForEvacuationCompleted(t, e)
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -754,11 +754,12 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(4), res.ObjectsEvacuated()) require.Equal(t, "", st.ErrorMessage())
require.Equal(t, uint64(8), res.ObjectsSkipped()) require.Equal(t, uint64(4), st.ObjectsEvacuated())
require.Equal(t, uint64(0), res.ObjectsFailed()) require.Equal(t, uint64(8), st.ObjectsSkipped())
require.Equal(t, uint64(0), st.ObjectsFailed())
} }
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
@ -812,7 +813,8 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
start := time.Now() start := time.Now()
_, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
testWaitForEvacuationCompleted(t, e)
t.Logf("evacuate took %v\n", time.Since(start)) t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err) require.NoError(t, err)
} }

View file

@ -15,7 +15,6 @@ const (
rpcListShards = "ListShards" rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode" rpcSetShardMode = "SetShardMode"
rpcSynchronizeTree = "SynchronizeTree" rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcStartShardEvacuation = "StartShardEvacuation" rpcStartShardEvacuation = "StartShardEvacuation"
rpcGetShardEvacuationStatus = "GetShardEvacuationStatus" rpcGetShardEvacuationStatus = "GetShardEvacuationStatus"
rpcResetShardEvacuationStatus = "ResetShardEvacuationStatus" rpcResetShardEvacuationStatus = "ResetShardEvacuationStatus"
@ -162,19 +161,6 @@ func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...cl
return wResp.message, nil return wResp.message, nil
} }
// EvacuateShard executes ControlService.EvacuateShard RPC.
func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client.CallOption) (*EvacuateShardResponse, error) {
wResp := newResponseWrapper[EvacuateShardResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcEvacuateShard), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// StartShardEvacuation executes ControlService.StartShardEvacuation RPC. // StartShardEvacuation executes ControlService.StartShardEvacuation RPC.
func StartShardEvacuation(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) { func StartShardEvacuation(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) {
wResp := newResponseWrapper[StartShardEvacuationResponse]() wResp := newResponseWrapper[StartShardEvacuationResponse]()

View file

@ -1,188 +0,0 @@
package control
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var errFailedToBuildListOfContainerNodes = errors.New("can't build a list of container nodes")
func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
Scope: engine.EvacuateScopeObjects,
}
res, err := s.s.Evacuate(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.EvacuateShardResponse{
Body: &control.EvacuateShardResponse_Body{
Count: uint32(res.ObjectsEvacuated()),
},
}
err = ctrlmessage.Sign(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
cid, ok := obj.ContainerID()
if !ok {
// Return nil to prevent situations where a shard can't be evacuated
// because of a single bad/corrupted object.
return false, nil
}
nodes, err := s.getContainerNodes(cid)
if err != nil {
return false, err
}
if len(nodes) == 0 {
return false, nil
}
var res replicatorResult
task := replicator.Task{
NumCopies: 1,
Addr: addr,
Obj: obj,
Nodes: nodes,
}
s.replicator.HandleReplicationTask(ctx, task, &res)
if res.count == 0 {
return false, errors.New("object was not replicated")
}
return true, nil
}
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) {
nodes, err := s.getContainerNodes(contID)
if err != nil {
return false, "", err
}
if len(nodes) == 0 {
return false, "", nil
}
for _, node := range nodes {
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
if err == nil {
return true, hex.EncodeToString(node.PublicKey()), nil
}
}
return false, "", err
}
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
rawCID := make([]byte, sha256.Size)
contID.Encode(rawCID)
var height uint64
for {
op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
if err != nil {
return err
}
if op.Time == 0 {
return nil
}
req := &tree.ApplyRequest{
Body: &tree.ApplyRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Operation: &tree.LogMove{
ParentId: op.Parent,
Meta: op.Meta.Bytes(),
ChildId: op.Child,
},
},
}
err = tree.SignMessage(req, s.key)
if err != nil {
return fmt.Errorf("can't message apply request: %w", err)
}
err = s.treeService.ReplicateTreeOp(ctx, node, req)
if err != nil {
return err
}
height = op.Time + 1
}
}
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
nm, err := s.netMapSrc.GetNetMap(0)
if err != nil {
return nil, err
}
c, err := s.cnrSrc.Get(contID)
if err != nil {
return nil, err
}
binCnr := make([]byte, sha256.Size)
contID.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, errFailedToBuildListOfContainerNodes
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := 0; i < len(nodes); i++ { // don't use range, slice mutates in body
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
return nodes, nil
}
type replicatorResult struct {
count int
}
// SubmitSuccessfulReplication implements the replicator.TaskResult interface.
func (r *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) {
r.count++
}

View file

@ -1,17 +1,32 @@
package control package control
import ( import (
"bytes"
"context" "context"
"crypto/sha256"
"encoding/hex"
"errors" "errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
var errFailedToBuildListOfContainerNodes = errors.New("can't build a list of container nodes")
func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) { func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
err := s.isValidRequest(req) err := s.isValidRequest(req)
if err != nil { if err != nil {
@ -27,15 +42,13 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
IgnoreErrors: req.GetBody().GetIgnoreErrors(), IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject, ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree, TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()), Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
RepOneOnly: req.GetBody().GetRepOneOnly(), RepOneOnly: req.GetBody().GetRepOneOnly(),
} }
_, err = s.s.Evacuate(ctx, prm) if err = s.s.Evacuate(ctx, prm); err != nil {
if err != nil {
var logicalErr logicerr.Logical var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) { if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
@ -135,3 +148,133 @@ func (s *Server) ResetShardEvacuationStatus(ctx context.Context, req *control.Re
} }
return resp, nil return resp, nil
} }
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
cid, ok := obj.ContainerID()
if !ok {
// Return nil to prevent situations where a shard can't be evacuated
// because of a single bad/corrupted object.
return false, nil
}
nodes, err := s.getContainerNodes(cid)
if err != nil {
return false, err
}
if len(nodes) == 0 {
return false, nil
}
var res replicatorResult
task := replicator.Task{
NumCopies: 1,
Addr: addr,
Obj: obj,
Nodes: nodes,
}
s.replicator.HandleReplicationTask(ctx, task, &res)
if res.count == 0 {
return false, errors.New("object was not replicated")
}
return true, nil
}
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) {
nodes, err := s.getContainerNodes(contID)
if err != nil {
return false, "", err
}
if len(nodes) == 0 {
return false, "", nil
}
for _, node := range nodes {
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
if err == nil {
return true, hex.EncodeToString(node.PublicKey()), nil
}
}
return false, "", err
}
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
rawCID := make([]byte, sha256.Size)
contID.Encode(rawCID)
var height uint64
for {
op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
if err != nil {
return err
}
if op.Time == 0 {
return nil
}
req := &tree.ApplyRequest{
Body: &tree.ApplyRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Operation: &tree.LogMove{
ParentId: op.Parent,
Meta: op.Meta.Bytes(),
ChildId: op.Child,
},
},
}
err = tree.SignMessage(req, s.key)
if err != nil {
return fmt.Errorf("can't message apply request: %w", err)
}
err = s.treeService.ReplicateTreeOp(ctx, node, req)
if err != nil {
return err
}
height = op.Time + 1
}
}
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
nm, err := s.netMapSrc.GetNetMap(0)
if err != nil {
return nil, err
}
c, err := s.cnrSrc.Get(contID)
if err != nil {
return nil, err
}
binCnr := make([]byte, sha256.Size)
contID.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, errFailedToBuildListOfContainerNodes
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := 0; i < len(nodes); i++ { // don't use range, slice mutates in body
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
return nodes, nil
}
type replicatorResult struct {
count int
}
// SubmitSuccessfulReplication implements the replicator.TaskResult interface.
func (r *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) {
r.count++
}

View file

@ -30,11 +30,6 @@ service ControlService {
// Synchronizes all log operations for the specified tree. // Synchronizes all log operations for the specified tree.
rpc SynchronizeTree(SynchronizeTreeRequest) returns (SynchronizeTreeResponse); rpc SynchronizeTree(SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use
// StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
rpc EvacuateShard(EvacuateShardRequest) returns (EvacuateShardResponse);
// StartShardEvacuation starts moving all data from one shard to the others. // StartShardEvacuation starts moving all data from one shard to the others.
rpc StartShardEvacuation(StartShardEvacuationRequest) rpc StartShardEvacuation(StartShardEvacuationRequest)
returns (StartShardEvacuationResponse); returns (StartShardEvacuationResponse);

View file

@ -26,7 +26,6 @@ const (
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards" ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode" ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree" ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree"
ControlService_EvacuateShard_FullMethodName = "/control.ControlService/EvacuateShard"
ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation" ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation"
ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus" ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus"
ControlService_ResetShardEvacuationStatus_FullMethodName = "/control.ControlService/ResetShardEvacuationStatus" ControlService_ResetShardEvacuationStatus_FullMethodName = "/control.ControlService/ResetShardEvacuationStatus"
@ -62,10 +61,6 @@ type ControlServiceClient interface {
SetShardMode(ctx context.Context, in *SetShardModeRequest, opts ...grpc.CallOption) (*SetShardModeResponse, error) SetShardMode(ctx context.Context, in *SetShardModeRequest, opts ...grpc.CallOption) (*SetShardModeResponse, error)
// Synchronizes all log operations for the specified tree. // Synchronizes all log operations for the specified tree.
SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error) SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error)
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use
// StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error)
// StartShardEvacuation starts moving all data from one shard to the others. // StartShardEvacuation starts moving all data from one shard to the others.
StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status. // GetShardEvacuationStatus returns evacuation status.
@ -173,15 +168,6 @@ func (c *controlServiceClient) SynchronizeTree(ctx context.Context, in *Synchron
return out, nil return out, nil
} }
func (c *controlServiceClient) EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error) {
out := new(EvacuateShardResponse)
err := c.cc.Invoke(ctx, ControlService_EvacuateShard_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) { func (c *controlServiceClient) StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) {
out := new(StartShardEvacuationResponse) out := new(StartShardEvacuationResponse)
err := c.cc.Invoke(ctx, ControlService_StartShardEvacuation_FullMethodName, in, out, opts...) err := c.cc.Invoke(ctx, ControlService_StartShardEvacuation_FullMethodName, in, out, opts...)
@ -335,10 +321,6 @@ type ControlServiceServer interface {
SetShardMode(context.Context, *SetShardModeRequest) (*SetShardModeResponse, error) SetShardMode(context.Context, *SetShardModeRequest) (*SetShardModeResponse, error)
// Synchronizes all log operations for the specified tree. // Synchronizes all log operations for the specified tree.
SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error) SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error)
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use
// StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error)
// StartShardEvacuation starts moving all data from one shard to the others. // StartShardEvacuation starts moving all data from one shard to the others.
StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status. // GetShardEvacuationStatus returns evacuation status.
@ -400,9 +382,6 @@ func (UnimplementedControlServiceServer) SetShardMode(context.Context, *SetShard
func (UnimplementedControlServiceServer) SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error) { func (UnimplementedControlServiceServer) SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SynchronizeTree not implemented") return nil, status.Errorf(codes.Unimplemented, "method SynchronizeTree not implemented")
} }
func (UnimplementedControlServiceServer) EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method EvacuateShard not implemented")
}
func (UnimplementedControlServiceServer) StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) { func (UnimplementedControlServiceServer) StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartShardEvacuation not implemented") return nil, status.Errorf(codes.Unimplemented, "method StartShardEvacuation not implemented")
} }
@ -586,24 +565,6 @@ func _ControlService_SynchronizeTree_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _ControlService_EvacuateShard_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EvacuateShardRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).EvacuateShard(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_EvacuateShard_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).EvacuateShard(ctx, req.(*EvacuateShardRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_StartShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _ControlService_StartShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StartShardEvacuationRequest) in := new(StartShardEvacuationRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -909,10 +870,6 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "SynchronizeTree", MethodName: "SynchronizeTree",
Handler: _ControlService_SynchronizeTree_Handler, Handler: _ControlService_SynchronizeTree_Handler,
}, },
{
MethodName: "EvacuateShard",
Handler: _ControlService_EvacuateShard_Handler,
},
{ {
MethodName: "StartShardEvacuation", MethodName: "StartShardEvacuation",
Handler: _ControlService_StartShardEvacuation_Handler, Handler: _ControlService_StartShardEvacuation_Handler,