Async evacuate #329

Merged
fyrchik merged 4 commits from dstepanov-yadro/frostfs-node:feat/async-evacuate into master 2023-05-19 08:43:54 +00:00
19 changed files with 1229 additions and 44 deletions

View file

@ -11,10 +11,11 @@ import (
const ignoreErrorsFlag = "no-errors"
var evacuateShardCmd = &cobra.Command{
Use: "evacuate",
Short: "Evacuate objects from shard",
Long: "Evacuate objects from shard to other shards",
Run: evacuateShard,
Use: "evacuate",
Short: "Evacuate objects from shard",
Long: "Evacuate objects from shard to other shards",
Run: evacuateShard,
Deprecated: "use frostfs-cli control shards evacuation start",
}
func evacuateShard(cmd *cobra.Command, _ []string) {

View file

@ -0,0 +1,296 @@
package control
import (
"crypto/ecdsa"
"fmt"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"github.com/spf13/cobra"
"go.uber.org/atomic"
)
const (
awaitFlag = "await"
noProgressFlag = "no-progress"
)
var evacuationShardCmd = &cobra.Command{
Use: "evacuation",
Short: "Objects evacuation from shard",
Long: "Objects evacuation from shard to other shards",
}
var startEvacuationShardCmd = &cobra.Command{
Use: "start",
Short: "Start evacuate objects from shard",
Long: "Start evacuate objects from shard to other shards",
Run: startEvacuateShard,
}
var getEvacuationShardStatusCmd = &cobra.Command{
Use: "status",
Short: "Get evacuate objects from shard status",
Long: "Get evacuate objects from shard to other shards status",
Run: getEvacuateShardStatus,
}
var stopEvacuationShardCmd = &cobra.Command{
Use: "stop",
Short: "Stop running evacuate process",
Long: "Stop running evacuate process from shard to other shards",
Run: stopEvacuateShardStatus,
}
func startEvacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.StartShardEvacuationResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.StartEvacuateShard(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Shard evacuation has been successfully started.")
if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion {
noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
waitEvacuateCompletion(cmd, pk, cli, !noProgress, true)
}
}
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.GetShardEvacuationStatusRequest{
Body: &control.GetShardEvacuationStatusRequest_Body{},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.GetShardEvacuationStatusResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.GetEvacuateShardStatus(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
printStatus(cmd, resp)
}
func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.StopShardEvacuationRequest{
Body: &control.StopShardEvacuationRequest_Body{},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.StopShardEvacuationResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.StopEvacuateShard(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
waitEvacuateCompletion(cmd, pk, cli, false, false)
cmd.Println("Evacuation stopped.")
}
func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) {
const statusPollingInterval = 1 * time.Second
const reportIntervalSeconds = 5
var resp *control.GetShardEvacuationStatusResponse
reportResponse := atomic.NewPointer(resp)
pollingCompleted := make(chan struct{})
progressReportCompleted := make(chan struct{})
go func() {
defer close(progressReportCompleted)
if !printProgress {
return
}
cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds)
for {
select {
case <-pollingCompleted:
return
case <-time.After(reportIntervalSeconds * time.Second):
r := reportResponse.Load()
if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED {
continue
}
printStatus(cmd, r)
}
}
}()
for {
req := &control.GetShardEvacuationStatusRequest{
Body: &control.GetShardEvacuationStatusRequest_Body{},
}
signRequest(cmd, pk, req)
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.GetEvacuateShardStatus(client, req)
return err
})
reportResponse.Store(resp)
if err != nil {
commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err)
return
}
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
break
}
time.Sleep(statusPollingInterval)
}
close(pollingCompleted)
<-progressReportCompleted
if printCompleted {
printCompletedStatusMessage(cmd, resp)
}
}
func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
cmd.Println("Shard evacuation has been completed.")
sb := &strings.Builder{}
appendShardIDs(sb, resp)
appendCounts(sb, resp)
appendError(sb, resp)
appendStartedAt(sb, resp)
appendDuration(sb, resp)
cmd.Println(sb.String())
}
func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED {
cmd.Println("There is no running or completed evacuation.")
return
}
sb := &strings.Builder{}
appendShardIDs(sb, resp)
appendStatus(sb, resp)
appendCounts(sb, resp)
appendError(sb, resp)
appendStartedAt(sb, resp)
appendDuration(sb, resp)
cmd.Println(sb.String())
}
func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetDuration() != nil {
duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds())
hour := int(duration.Seconds() / 3600)
minute := int(duration.Seconds()/60) % 60
second := int(duration.Seconds()) % 60
sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
}
}
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetStartedAt() != nil {
startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
}
}
func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if len(resp.Body.GetErrorMessage()) > 0 {
sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage()))
}
}
func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
var status string
switch resp.GetBody().GetStatus() {
case control.GetShardEvacuationStatusResponse_Body_COMPLETED:
status = "completed"
case control.GetShardEvacuationStatusResponse_Body_RUNNING:
status = "running"
default:
status = "undefined"
}
sb.WriteString(fmt.Sprintf(" Status: %s.", status))
}
func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
sb.WriteString("Shard IDs: ")
for idx, shardID := range resp.GetBody().GetShard_ID() {
shardIDStr := shard.NewIDFromBytes(shardID).String()
if idx > 0 {
sb.WriteString(", ")
}
sb.WriteString(shardIDStr)
if idx == len(resp.GetBody().GetShard_ID())-1 {
sb.WriteString(".")
}
}
}
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
resp.GetBody().GetEvacuated(),
resp.Body.GetTotal(),
resp.Body.GetFailed()))
}
func initControlEvacuationShardCmd() {
evacuationShardCmd.AddCommand(startEvacuationShardCmd)
evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd)
evacuationShardCmd.AddCommand(stopEvacuationShardCmd)
initControlStartEvacuationShardCmd()
initControlFlags(getEvacuationShardStatusCmd)
initControlFlags(stopEvacuationShardCmd)
}
func initControlStartEvacuationShardCmd() {
initControlFlags(startEvacuationShardCmd)
flags := startEvacuationShardCmd.Flags()
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
flags.Bool(shardAllFlag, false, "Process all shards")
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -13,13 +13,14 @@ var shardsCmd = &cobra.Command{
func initControlShardsCmd() {
shardsCmd.AddCommand(listShardsCmd)
shardsCmd.AddCommand(setShardModeCmd)
shardsCmd.AddCommand(evacuateShardCmd)
shardsCmd.AddCommand(evacuationShardCmd)
shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd)
initControlShardsListCmd()
initControlSetShardModeCmd()
initControlEvacuateShardCmd()
initControlEvacuationShardCmd()
initControlFlushCacheCmd()
initControlDoctorCmd()
}

92
docs/evacuation.md Normal file
View file

@ -0,0 +1,92 @@
# Shard data evacuation
## Overview
Evacuation is the process of transferring data from one shard to another. Evacuation is used in case of problems with the shard in order to save data.
To start the evacuation, it is necessary that the shard is in read-only mode (read more [here](./shard-modes.md)).
First of all, by the evacuation the data is transferred to other shards of the same node; if it is not possible, then the data is transferred to other nodes.
Only one running evacuation process is allowed on the node at a time.
`frostfs-cli` utility is used to manage evacuation.
## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
`frostfs-cli control shards evacuation stop` stops running evacuation process.
`frostfs-cli control shards evacuation status` prints evacuation process status.
See commands `--help` output for detailed description.
## Examples
### Set shard mode to read only
```bash
frostfs-cli control shards set-mode --mode read-only --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
Enter password >
Shard mode update request successfully sent.
```
### Start evacuation and get status
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
Enter password >
Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 14 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 23 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:05.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 61 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:13.
```
### Stop running evacuation process
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3
Enter password >
Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 15 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03.
frostfs-cli control shards evacuation stop --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Evacuation stopped.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 31 object out of 73, failed to evacuate 0 objects. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07.
```
### Start evacuation and await it completes
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await
Enter password >
Shard evacuation has been successfully started.
Progress will be reported every 5 seconds.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 18 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 43 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 68 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
```
### Start evacuation and await it completes without progress notifications
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await --no-progress
Enter password >
Shard evacuation has been successfully started.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14.
```

View file

@ -484,5 +484,9 @@ const (
ShardGCCollectingExpiredLocksCompleted = "collecting expired locks completed"
ShardGCRemoveGarbageStarted = "garbage remove started"
ShardGCRemoveGarbageCompleted = "garbage remove completed"
EngineShardsEvacuationFailedToCount = "failed to get total objects count to evacuate"
EngineShardsEvacuationFailedToListObjects = "failed to list objects to evacuate"
EngineShardsEvacuationFailedToReadObject = "failed to read object to evacuate"
EngineShardsEvacuationFailedToMoveObject = "failed to evacuate object to other node"
ShardGCFailedToGetExpiredWithLinked = "failed to get expired objects with linked"
)

View file

@ -35,6 +35,7 @@ type StorageEngine struct {
err error
}
evacuateLimiter *evacuationLimiter
}
type shardWrapper struct {
@ -230,6 +231,9 @@ func New(opts ...Option) *StorageEngine {
shardPools: make(map[string]util.WorkerPool),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{
guard: &sync.RWMutex{},
},
}
}

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -14,6 +15,9 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -24,11 +28,23 @@ type EvacuateShardPrm struct {
shardID []*shard.ID
handler func(context.Context, oid.Address, *objectSDK.Object) error
ignoreErrors bool
async bool
}
// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
count int
evacuated *atomic.Uint64
total *atomic.Uint64
failed *atomic.Uint64
}
// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
return &EvacuateShardRes{
evacuated: atomic.NewUint64(0),
total: atomic.NewUint64(0),
failed: atomic.NewUint64(0),
}
}
// WithShardIDList sets shard ID.
@ -46,10 +62,46 @@ func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address,
p.handler = f
}
// Count returns amount of evacuated objects.
// WithAsync sets flag to run evacuate async.
func (p *EvacuateShardPrm) WithAsync(async bool) {
p.async = async
}
// Evacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p EvacuateShardRes) Count() int {
return p.count
func (p *EvacuateShardRes) Evacuated() uint64 {
if p == nil {
return 0
}
return p.evacuated.Load()
}
// Total returns total count objects to evacuate.
func (p *EvacuateShardRes) Total() uint64 {
if p == nil {
return 0
}
return p.total.Load()
}
// Failed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) Failed() uint64 {
if p == nil {
return 0
}
return p.failed.Load()
}
// DeepCopy returns deep copy of result instance.
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
if p == nil {
return nil
}
return &EvacuateShardRes{
evacuated: atomic.NewUint64(p.evacuated.Load()),
total: atomic.NewUint64(p.total.Load()),
failed: atomic.NewUint64(p.failed.Load()),
}
}
const defaultEvacuateBatchSize = 100
@ -63,15 +115,29 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
shardIDs := make([]string, len(prm.shardID))
for i := range prm.shardID {
shardIDs[i] = prm.shardID[i].String()
}
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
))
defer span.End()
shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
if err != nil {
return EvacuateShardRes{}, err
return nil, err
}
shardsToEvacuate := make(map[string]*shard.Shard)
@ -83,23 +149,91 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva
}
}
res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.async)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil {
return nil, err
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
})
if prm.async {
return nil, nil
}
return res, eg.Wait()
}
func ctxOrBackground(ctx context.Context, background bool) context.Context {
if background {
return context.Background()
}
return ctx
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
))
defer func() {
span.End()
e.evacuateLimiter.Complete(err)
}()
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))
var res EvacuateShardRes
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err))
return err
}
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
return res, err
return err
}
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
return res, nil
return nil
}
func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
defer span.End()
for _, sh := range shardsToEvacuate {
cnt, err := sh.LogicalObjectsCount(ctx)
if err != nil {
if errors.Is(err, shard.ErrDegradedMode) {
continue
}
return err
}
res.total.Add(cnt)
}
return nil
}
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
trace.WithAttributes(
attribute.String("shardID", shardID),
))
defer span.End()
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
@ -116,6 +250,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err))
return err
}
@ -168,6 +303,12 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
trace.WithAttributes(
attribute.Int("objects_count", len(toEvacuate)),
))
defer span.End()
for i := range toEvacuate {
select {
case <-ctx.Done():
@ -182,12 +323,14 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.ignoreErrors {
res.failed.Inc()
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
return err
}
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate)
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res)
if err != nil {
return err
}
@ -204,15 +347,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
err = prm.handler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
return err
}
res.count++
res.evacuated.Inc()
}
return nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) {
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) (bool, error) {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
for j := range shards {
select {
@ -227,11 +371,11 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
res.evacuated.Inc()
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", addr))
res.count++
}
return true, nil
}
@ -239,3 +383,23 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
return false, nil
}
func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return e.evacuateLimiter.GetState(), nil
}
func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return e.evacuateLimiter.CancelIfRunning()
}

View file

@ -0,0 +1,178 @@
package engine
import (
"context"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"golang.org/x/sync/errgroup"
)
type EvacuateProcessState int
const (
EvacuateProcessStateUndefined EvacuateProcessState = iota
EvacuateProcessStateRunning
EvacuateProcessStateCompleted
)
type EvacuationState struct {
shardIDs []string
processState EvacuateProcessState
startedAt time.Time
finishedAt time.Time
result *EvacuateShardRes
errMessage string
}
func (s *EvacuationState) ShardIDs() []string {
if s == nil {
return nil
}
return s.shardIDs
}
func (s *EvacuationState) Evacuated() uint64 {
if s == nil {
return 0
}
return s.result.Evacuated()
}
func (s *EvacuationState) Total() uint64 {
if s == nil {
return 0
}
return s.result.Total()
}
func (s *EvacuationState) Failed() uint64 {
if s == nil {
return 0
}
return s.result.Failed()
}
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
if s == nil {
return EvacuateProcessStateUndefined
}
return s.processState
}
func (s *EvacuationState) StartedAt() *time.Time {
if s == nil {
return nil
}
defaultTime := time.Time{}
if s.startedAt == defaultTime {
return nil
}
return &s.startedAt
}
func (s *EvacuationState) FinishedAt() *time.Time {
if s == nil {
return nil
}
defaultTime := time.Time{}
if s.finishedAt == defaultTime {
return nil
}
return &s.finishedAt
}
func (s *EvacuationState) ErrorMessage() string {
if s == nil {
return ""
}
return s.errMessage
}
func (s *EvacuationState) DeepCopy() *EvacuationState {
if s == nil {
return nil
}
shardIDs := make([]string, len(s.shardIDs))
copy(shardIDs, s.shardIDs)
return &EvacuationState{
shardIDs: shardIDs,
processState: s.processState,
startedAt: s.startedAt,
finishedAt: s.finishedAt,
errMessage: s.errMessage,
result: s.result.DeepCopy(),
}
}
type evacuationLimiter struct {
state EvacuationState
eg *errgroup.Group
cancel context.CancelFunc
guard *sync.RWMutex
}
func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
l.guard.Lock()
defer l.guard.Unlock()
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if l.state.processState == EvacuateProcessStateRunning {
return nil, nil, logicerr.New(fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs))
}
var egCtx context.Context
egCtx, l.cancel = context.WithCancel(ctx)
l.eg, egCtx = errgroup.WithContext(egCtx)
l.state = EvacuationState{
shardIDs: shardIDs,
processState: EvacuateProcessStateRunning,
startedAt: time.Now().UTC(),
result: result,
}
return l.eg, egCtx, nil
}
func (l *evacuationLimiter) Complete(err error) {
l.guard.Lock()
defer l.guard.Unlock()
errMsq := ""
if err != nil {
errMsq = err.Error()
}
l.state.processState = EvacuateProcessStateCompleted
l.state.errMessage = errMsq
l.state.finishedAt = time.Now().UTC()
l.eg = nil
}
func (l *evacuationLimiter) GetState() *EvacuationState {
l.guard.RLock()
defer l.guard.RUnlock()
return l.state.DeepCopy()
}
func (l *evacuationLimiter) CancelIfRunning() error {
l.guard.Lock()
defer l.guard.Unlock()
if l.state.processState != EvacuateProcessStateRunning {
return logicerr.New("there is no running evacuation task")
}
l.cancel()
return nil
}

View file

@ -7,6 +7,7 @@ import (
"path/filepath"
"strconv"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -21,6 +22,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
)
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
@ -103,14 +105,14 @@ func TestEvacuateShard(t *testing.T) {
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, 0, res.Count())
require.Equal(t, uint64(0), res.Evacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objPerShard, res.count)
require.Equal(t, uint64(objPerShard), res.Evacuated())
// We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense
@ -121,7 +123,7 @@ func TestEvacuateShard(t *testing.T) {
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, 0, res.count)
require.Equal(t, uint64(0), res.Evacuated())
checkHasObjects(t)
@ -138,8 +140,8 @@ func TestEvacuateNetwork(t *testing.T) {
var errReplication = errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max int) func(context.Context, oid.Address, *objectSDK.Object) error {
var n int
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
var n uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
if n == max {
return errReplication
@ -169,13 +171,13 @@ func TestEvacuateNetwork(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, 0, res.Count())
require.Equal(t, uint64(0), res.Evacuated())
prm.handler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, 2, res.Count())
require.Equal(t, uint64(2), res.Evacuated())
})
t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel()
@ -190,14 +192,14 @@ func TestEvacuateNetwork(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, 2, res.Count())
require.Equal(t, uint64(2), res.Evacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, 3, res.Count())
require.Equal(t, uint64(3), res.Evacuated())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -205,12 +207,12 @@ func TestEvacuateNetwork(t *testing.T) {
e, ids, objects := newEngineEvacuate(t, 4, 5)
evacuateIDs := ids[0:3]
var totalCount int
var totalCount uint64
for i := range evacuateIDs {
res, err := e.shards[ids[i].String()].List()
require.NoError(t, err)
totalCount += len(res.AddressList())
totalCount += uint64(len(res.AddressList()))
}
for i := range ids {
@ -223,14 +225,14 @@ func TestEvacuateNetwork(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Count())
require.Equal(t, totalCount-1, res.Evacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.Count())
require.Equal(t, totalCount, res.Evacuated())
})
})
}
@ -258,5 +260,114 @@ func TestEvacuateCancellation(t *testing.T) {
res, err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled")
require.Equal(t, 0, res.Count())
require.Equal(t, uint64(0), res.Evacuated())
}
func TestEvacuateSingleProcess(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
close(running)
}
<-blocker
return nil
}
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
return nil
})
eg.Go(func() error {
<-running
res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.Evacuated())
close(blocker)
return nil
})
require.NoError(t, eg.Wait())
}
func TestEvacuateAsync(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
close(running)
}
<-blocker
return nil
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
return nil
})
<-running
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get running state failed")
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
require.NotNil(t, st.StartedAt(), "invalid running started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
expectedShardIDs := make([]string, 0, 2)
for _, id := range ids[1:2] {
expectedShardIDs = append(expectedShardIDs, id.String())
}
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
close(blocker)
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
require.NoError(t, eg.Wait())
}

View file

@ -0,0 +1,31 @@
package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// LogicalObjectsCount returns logical objects count.
func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "Shard.LogicalObjectsCount",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.GetMode().NoMetabase() {
return 0, ErrDegradedMode
}
cc, err := s.metaBase.ObjectCounters()
if err != nil {
return 0, err
}
return cc.Logic(), nil
}

View file

@ -8,15 +8,18 @@ import (
const serviceName = "control.ControlService"
const (
rpcHealthCheck = "HealthCheck"
rpcSetNetmapStatus = "SetNetmapStatus"
rpcDropObjects = "DropObjects"
rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode"
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcFlushCache = "FlushCache"
rpcDoctor = "Doctor"
rpcHealthCheck = "HealthCheck"
rpcSetNetmapStatus = "SetNetmapStatus"
rpcDropObjects = "DropObjects"
rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode"
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcStartEvacuateShard = "StartEvacuateShard"
rpcGetEvacuateShardStatus = "GetEvacuateShardStatus"
rpcStopEvacuateShardStatus = "StopEvacuateShard"
rpcFlushCache = "FlushCache"
rpcDoctor = "Doctor"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -141,6 +144,45 @@ func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client
return wResp.message, nil
}
// StartEvacuateShard executes ControlService.StartEvacuateShard RPC.
func StartEvacuateShard(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) {
wResp := newResponseWrapper[StartShardEvacuationResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartEvacuateShard), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// GetEvacuateShardStatus executes ControlService.GetEvacuateShardStatus RPC.
func GetEvacuateShardStatus(cli *client.Client, req *GetShardEvacuationStatusRequest, opts ...client.CallOption) (*GetShardEvacuationStatusResponse, error) {
wResp := newResponseWrapper[GetShardEvacuationStatusResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetEvacuateShardStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// StopEvacuateShard executes ControlService.StopEvacuateShard RPC.
func StopEvacuateShard(cli *client.Client, req *StopShardEvacuationRequest, opts ...client.CallOption) (*StopShardEvacuationResponse, error) {
wResp := newResponseWrapper[StopShardEvacuationResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStopEvacuateShardStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// FlushCache executes ControlService.FlushCache RPC.
func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallOption) (*FlushCacheResponse, error) {
wResp := newResponseWrapper[FlushCacheResponse]()

View file

@ -0,0 +1,60 @@
package control
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/mr-tron/base58"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuationStatusResponse, error) {
shardIDs := make([][]byte, 0, len(state.ShardIDs()))
for _, shID := range state.ShardIDs() {
id, err := base58.Decode(shID)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("invalid shard id format: %s", shID))
}
shardIDs = append(shardIDs, id)
}
var evacStatus control.GetShardEvacuationStatusResponse_Body_Status
switch state.ProcessingStatus() {
case engine.EvacuateProcessStateRunning:
evacStatus = control.GetShardEvacuationStatusResponse_Body_RUNNING
case engine.EvacuateProcessStateCompleted:
evacStatus = control.GetShardEvacuationStatusResponse_Body_COMPLETED
default:
evacStatus = control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED
}
var startedAt *control.GetShardEvacuationStatusResponse_Body_UnixTimestamp
if state.StartedAt() != nil {
startedAt = &control.GetShardEvacuationStatusResponse_Body_UnixTimestamp{
Value: state.StartedAt().Unix(),
}
}
var duration *control.GetShardEvacuationStatusResponse_Body_Duration
if state.StartedAt() != nil {
end := time.Now().UTC()
if state.FinishedAt() != nil {
end = *state.FinishedAt()
}
duration = &control.GetShardEvacuationStatusResponse_Body_Duration{
Seconds: int64(end.Sub(*state.StartedAt()).Seconds()),
}
}
return &control.GetShardEvacuationStatusResponse{
Body: &control.GetShardEvacuationStatusResponse_Body{
Shard_ID: shardIDs,
Evacuated: state.Evacuated(),
Total: state.Total(),
Failed: state.Failed(),
Status: evacStatus,
StartedAt: startedAt,
Duration: duration,
ErrorMessage: state.ErrorMessage(),
},
}, nil
}

View file

@ -37,7 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
resp := &control.EvacuateShardResponse{
Body: &control.EvacuateShardResponse_Body{
Count: uint32(res.Count()),
Count: uint32(res.Evacuated()),
},
}

View file

@ -0,0 +1,97 @@
package control
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
var prm engine.EvacuateShardPrm
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)
prm.WithAsync(true)
_, err = s.s.Evacuate(ctx, prm)
if err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.StartShardEvacuationResponse{
Body: &control.StartShardEvacuationResponse_Body{},
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func (s *Server) GetShardEvacuationStatus(ctx context.Context, req *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
state, err := s.s.GetEvacuationState(ctx)
if err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp, err := stateToResponse(state)
if err != nil {
return nil, err
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
err = s.s.EnqueRunningEvacuationStop(ctx)
if err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.StopShardEvacuationResponse{
Body: &control.StopShardEvacuationResponse_Body{},
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

Binary file not shown.

View file

@ -27,8 +27,18 @@ service ControlService {
rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
rpc EvacuateShard (EvacuateShardRequest) returns (EvacuateShardResponse);
// StartShardEvacuation starts moving all data from one shard to the others.
rpc StartShardEvacuation (StartShardEvacuationRequest) returns (StartShardEvacuationResponse);
// GetShardEvacuationStatus returns evacuation status.
rpc GetShardEvacuationStatus (GetShardEvacuationStatusRequest) returns (GetShardEvacuationStatusResponse);
// StopShardEvacuation stops moving all data from one shard to the others.
rpc StopShardEvacuation (StopShardEvacuationRequest) returns (StopShardEvacuationResponse);
// FlushCache moves all data from one shard to the others.
rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
@ -298,3 +308,97 @@ message DoctorResponse {
Body body = 1;
Signature signature = 2;
}
// StartShardEvacuation request.
message StartShardEvacuationRequest {
// Request body structure.
message Body {
// IDs of the shards.
repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2;
}
Body body = 1;
Signature signature = 2;
}
// StartShardEvacuation response.
message StartShardEvacuationResponse {
// Response body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}
// GetShardEvacuationStatus request.
message GetShardEvacuationStatusRequest {
// Request body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}
// GetShardEvacuationStatus response.
message GetShardEvacuationStatusResponse {
// Response body structure.
message Body {
// Evacuate status enum.
enum Status {
EVACUATE_SHARD_STATUS_UNDEFINED = 0;
RUNNING = 1;
COMPLETED = 2;
}
// Unix timestamp value.
message UnixTimestamp {
int64 value = 1;
}
// Duration in seconds.
message Duration {
int64 seconds = 1;
}
// Total objects to evacuate count. The value is approximate, so evacuated + failed == total is not guaranteed after completion.
uint64 total = 1;
// Evacuated objects count.
uint64 evacuated = 2;
// Failed objects count.
uint64 failed = 3;
// Shard IDs.
repeated bytes shard_ID = 4;
// Evacuation process status.
Status status = 5;
// Evacuation process duration.
Duration duration = 6;
// Evacuation process started at timestamp.
UnixTimestamp started_at = 7;
// Error message if evacuation failed.
string error_message = 8;
}
Body body = 1;
Signature signature = 2;
}
// StopShardEvacuation request.
message StopShardEvacuationRequest {
// Request body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}
// StopShardEvacuation response.
message StopShardEvacuationResponse {
// Response body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.

Binary file not shown.