From f7c0b50d70f2d5795c8fe7194bfe6d1d1585659d Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 5 May 2023 10:57:07 +0300 Subject: [PATCH] [#329] cli: Add async evacuate commands Add start, stop evacuate and evacuate status commands. Signed-off-by: Dmitrii Stepanov --- .../modules/control/evacuate_shard.go | 9 +- cmd/frostfs-cli/modules/control/evacuation.go | 296 ++++++++++++++++++ cmd/frostfs-cli/modules/control/shards.go | 3 +- pkg/services/control/rpc.go | 60 +++- 4 files changed, 354 insertions(+), 14 deletions(-) create mode 100644 cmd/frostfs-cli/modules/control/evacuation.go diff --git a/cmd/frostfs-cli/modules/control/evacuate_shard.go b/cmd/frostfs-cli/modules/control/evacuate_shard.go index b72ff6301..458e4cc0b 100644 --- a/cmd/frostfs-cli/modules/control/evacuate_shard.go +++ b/cmd/frostfs-cli/modules/control/evacuate_shard.go @@ -11,10 +11,11 @@ import ( const ignoreErrorsFlag = "no-errors" var evacuateShardCmd = &cobra.Command{ - Use: "evacuate", - Short: "Evacuate objects from shard", - Long: "Evacuate objects from shard to other shards", - Run: evacuateShard, + Use: "evacuate", + Short: "Evacuate objects from shard", + Long: "Evacuate objects from shard to other shards", + Run: evacuateShard, + Deprecated: "use frostfs-cli control shards evacuation start", } func evacuateShard(cmd *cobra.Command, _ []string) { diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go new file mode 100644 index 000000000..69d2dd8d4 --- /dev/null +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -0,0 +1,296 @@ +package control + +import ( + "crypto/ecdsa" + "fmt" + "strings" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "github.com/spf13/cobra" + "go.uber.org/atomic" +) + +const ( + awaitFlag = "await" + noProgressFlag = "no-progress" +) + +var evacuationShardCmd = &cobra.Command{ + Use: "evacuation", + Short: "Objects evacuation from shard", + Long: "Objects evacuation from shard to other shards", +} + +var startEvacuationShardCmd = &cobra.Command{ + Use: "start", + Short: "Start evacuate objects from shard", + Long: "Start evacuate objects from shard to other shards", + Run: startEvacuateShard, +} + +var getEvacuationShardStatusCmd = &cobra.Command{ + Use: "status", + Short: "Get evacuate objects from shard status", + Long: "Get evacuate objects from shard to other shards status", + Run: getEvacuateShardStatus, +} + +var stopEvacuationShardCmd = &cobra.Command{ + Use: "stop", + Short: "Stop running evacuate process", + Long: "Stop running evacuate process from shard to other shards", + Run: stopEvacuateShardStatus, +} + +func startEvacuateShard(cmd *cobra.Command, _ []string) { + pk := key.Get(cmd) + + ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + + req := &control.StartShardEvacuationRequest{ + Body: &control.StartShardEvacuationRequest_Body{ + Shard_ID: getShardIDList(cmd), + IgnoreErrors: ignoreErrors, + }, + } + + signRequest(cmd, pk, req) + + cli := getClient(cmd, pk) + + var resp *control.StartShardEvacuationResponse + var err error + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.StartEvacuateShard(client, req) + return err + }) + commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err) + + verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) + + cmd.Println("Shard evacuation has been successfully started.") + + if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion { + noProgress, _ := cmd.Flags().GetBool(noProgressFlag) + waitEvacuateCompletion(cmd, pk, cli, !noProgress, true) + } +} + +func getEvacuateShardStatus(cmd *cobra.Command, _ []string) { + pk := key.Get(cmd) + req := &control.GetShardEvacuationStatusRequest{ + Body: &control.GetShardEvacuationStatusRequest_Body{}, + } + + signRequest(cmd, pk, req) + + cli := getClient(cmd, pk) + + var resp *control.GetShardEvacuationStatusResponse + var err error + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.GetEvacuateShardStatus(client, req) + return err + }) + commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err) + + verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) + + printStatus(cmd, resp) +} + +func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) { + pk := key.Get(cmd) + req := &control.StopShardEvacuationRequest{ + Body: &control.StopShardEvacuationRequest_Body{}, + } + + signRequest(cmd, pk, req) + + cli := getClient(cmd, pk) + + var resp *control.StopShardEvacuationResponse + var err error + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.StopEvacuateShard(client, req) + return err + }) + commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err) + + verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) + + waitEvacuateCompletion(cmd, pk, cli, false, false) + + cmd.Println("Evacuation stopped.") +} + +func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) { + const statusPollingInterval = 1 * time.Second + const reportIntervalSeconds = 5 + var resp *control.GetShardEvacuationStatusResponse + reportResponse := atomic.NewPointer(resp) + pollingCompleted := make(chan struct{}) + progressReportCompleted := make(chan struct{}) + + go func() { + defer close(progressReportCompleted) + if !printProgress { + return + } + cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds) + for { + select { + case <-pollingCompleted: + return + case <-time.After(reportIntervalSeconds * time.Second): + r := reportResponse.Load() + if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED { + continue + } + printStatus(cmd, r) + } + } + }() + + for { + req := &control.GetShardEvacuationStatusRequest{ + Body: &control.GetShardEvacuationStatusRequest_Body{}, + } + signRequest(cmd, pk, req) + + var err error + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.GetEvacuateShardStatus(client, req) + return err + }) + + reportResponse.Store(resp) + + if err != nil { + commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err) + return + } + if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING { + break + } + + time.Sleep(statusPollingInterval) + } + close(pollingCompleted) + <-progressReportCompleted + if printCompleted { + printCompletedStatusMessage(cmd, resp) + } +} + +func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) { + cmd.Println("Shard evacuation has been completed.") + sb := &strings.Builder{} + appendShardIDs(sb, resp) + appendCounts(sb, resp) + appendError(sb, resp) + appendStartedAt(sb, resp) + appendDuration(sb, resp) + cmd.Println(sb.String()) +} + +func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) { + if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED { + cmd.Println("There is no running or completed evacuation.") + return + } + sb := &strings.Builder{} + appendShardIDs(sb, resp) + appendStatus(sb, resp) + appendCounts(sb, resp) + appendError(sb, resp) + appendStartedAt(sb, resp) + appendDuration(sb, resp) + cmd.Println(sb.String()) +} + +func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { + if resp.GetBody().GetDuration() != nil { + duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds()) + hour := int(duration.Seconds() / 3600) + minute := int(duration.Seconds()/60) % 60 + second := int(duration.Seconds()) % 60 + sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second)) + } +} + +func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { + if resp.GetBody().GetStartedAt() != nil { + startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC() + sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339))) + } +} + +func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { + if len(resp.Body.GetErrorMessage()) > 0 { + sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage())) + } +} + +func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { + var status string + switch resp.GetBody().GetStatus() { + case control.GetShardEvacuationStatusResponse_Body_COMPLETED: + status = "completed" + case control.GetShardEvacuationStatusResponse_Body_RUNNING: + status = "running" + default: + status = "undefined" + } + sb.WriteString(fmt.Sprintf(" Status: %s.", status)) +} + +func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { + sb.WriteString("Shard IDs: ") + for idx, shardID := range resp.GetBody().GetShard_ID() { + shardIDStr := shard.NewIDFromBytes(shardID).String() + if idx > 0 { + sb.WriteString(", ") + } + sb.WriteString(shardIDStr) + if idx == len(resp.GetBody().GetShard_ID())-1 { + sb.WriteString(".") + } + } +} + +func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { + sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.", + resp.GetBody().GetEvacuated(), + resp.Body.GetTotal(), + resp.Body.GetFailed())) +} + +func initControlEvacuationShardCmd() { + evacuationShardCmd.AddCommand(startEvacuationShardCmd) + evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd) + evacuationShardCmd.AddCommand(stopEvacuationShardCmd) + + initControlStartEvacuationShardCmd() + initControlFlags(getEvacuationShardStatusCmd) + initControlFlags(stopEvacuationShardCmd) +} + +func initControlStartEvacuationShardCmd() { + initControlFlags(startEvacuationShardCmd) + + flags := startEvacuationShardCmd.Flags() + flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") + flags.Bool(shardAllFlag, false, "Process all shards") + flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") + flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") + flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) + + startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) +} diff --git a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go index 8e7ecff8c..742109673 100644 --- a/cmd/frostfs-cli/modules/control/shards.go +++ b/cmd/frostfs-cli/modules/control/shards.go @@ -13,13 +13,14 @@ var shardsCmd = &cobra.Command{ func initControlShardsCmd() { shardsCmd.AddCommand(listShardsCmd) shardsCmd.AddCommand(setShardModeCmd) - shardsCmd.AddCommand(evacuateShardCmd) + shardsCmd.AddCommand(evacuationShardCmd) shardsCmd.AddCommand(flushCacheCmd) shardsCmd.AddCommand(doctorCmd) initControlShardsListCmd() initControlSetShardModeCmd() initControlEvacuateShardCmd() + initControlEvacuationShardCmd() initControlFlushCacheCmd() initControlDoctorCmd() } diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index 31ebfa760..3822c9f70 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -8,15 +8,18 @@ import ( const serviceName = "control.ControlService" const ( - rpcHealthCheck = "HealthCheck" - rpcSetNetmapStatus = "SetNetmapStatus" - rpcDropObjects = "DropObjects" - rpcListShards = "ListShards" - rpcSetShardMode = "SetShardMode" - rpcSynchronizeTree = "SynchronizeTree" - rpcEvacuateShard = "EvacuateShard" - rpcFlushCache = "FlushCache" - rpcDoctor = "Doctor" + rpcHealthCheck = "HealthCheck" + rpcSetNetmapStatus = "SetNetmapStatus" + rpcDropObjects = "DropObjects" + rpcListShards = "ListShards" + rpcSetShardMode = "SetShardMode" + rpcSynchronizeTree = "SynchronizeTree" + rpcEvacuateShard = "EvacuateShard" + rpcStartEvacuateShard = "StartEvacuateShard" + rpcGetEvacuateShardStatus = "GetEvacuateShardStatus" + rpcStopEvacuateShardStatus = "StopEvacuateShard" + rpcFlushCache = "FlushCache" + rpcDoctor = "Doctor" ) // HealthCheck executes ControlService.HealthCheck RPC. @@ -141,6 +144,45 @@ func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client return wResp.message, nil } +// StartEvacuateShard executes ControlService.StartEvacuateShard RPC. +func StartEvacuateShard(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) { + wResp := newResponseWrapper[StartShardEvacuationResponse]() + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartEvacuateShard), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.message, nil +} + +// GetEvacuateShardStatus executes ControlService.GetEvacuateShardStatus RPC. +func GetEvacuateShardStatus(cli *client.Client, req *GetShardEvacuationStatusRequest, opts ...client.CallOption) (*GetShardEvacuationStatusResponse, error) { + wResp := newResponseWrapper[GetShardEvacuationStatusResponse]() + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetEvacuateShardStatus), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.message, nil +} + +// StopEvacuateShard executes ControlService.StopEvacuateShard RPC. +func StopEvacuateShard(cli *client.Client, req *StopShardEvacuationRequest, opts ...client.CallOption) (*StopShardEvacuationResponse, error) { + wResp := newResponseWrapper[StopShardEvacuationResponse]() + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStopEvacuateShardStatus), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.message, nil +} + // FlushCache executes ControlService.FlushCache RPC. func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallOption) (*FlushCacheResponse, error) { wResp := newResponseWrapper[FlushCacheResponse]()