package control import ( "crypto/ecdsa" "fmt" "strings" "sync/atomic" "time" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "github.com/spf13/cobra" ) const ( awaitFlag = "await" noProgressFlag = "no-progress" ) var evacuationShardCmd = &cobra.Command{ Use: "evacuation", Short: "Objects evacuation from shard", Long: "Objects evacuation from shard to other shards", } var startEvacuationShardCmd = &cobra.Command{ Use: "start", Short: "Start evacuate objects from shard", Long: "Start evacuate objects from shard to other shards", Run: startEvacuateShard, } var getEvacuationShardStatusCmd = &cobra.Command{ Use: "status", Short: "Get evacuate objects from shard status", Long: "Get evacuate objects from shard to other shards status", Run: getEvacuateShardStatus, } var stopEvacuationShardCmd = &cobra.Command{ Use: "stop", Short: "Stop running evacuate process", Long: "Stop running evacuate process from shard to other shards", Run: stopEvacuateShardStatus, } func startEvacuateShard(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ Shard_ID: getShardIDList(cmd), IgnoreErrors: ignoreErrors, }, } signRequest(cmd, pk, req) cli := getClient(cmd, pk) var resp *control.StartShardEvacuationResponse var err error err = cli.ExecRaw(func(client *client.Client) error { resp, err = control.StartShardEvacuation(client, req) return err }) commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err) verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) cmd.Println("Shard evacuation has been successfully started.") if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion { noProgress, _ := cmd.Flags().GetBool(noProgressFlag) waitEvacuateCompletion(cmd, pk, cli, !noProgress, true) } } func getEvacuateShardStatus(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) req := &control.GetShardEvacuationStatusRequest{ Body: &control.GetShardEvacuationStatusRequest_Body{}, } signRequest(cmd, pk, req) cli := getClient(cmd, pk) var resp *control.GetShardEvacuationStatusResponse var err error err = cli.ExecRaw(func(client *client.Client) error { resp, err = control.GetShardEvacuationStatus(client, req) return err }) commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err) verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) printStatus(cmd, resp) } func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) req := &control.StopShardEvacuationRequest{ Body: &control.StopShardEvacuationRequest_Body{}, } signRequest(cmd, pk, req) cli := getClient(cmd, pk) var resp *control.StopShardEvacuationResponse var err error err = cli.ExecRaw(func(client *client.Client) error { resp, err = control.StopShardEvacuation(client, req) return err }) commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err) verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) waitEvacuateCompletion(cmd, pk, cli, false, false) cmd.Println("Evacuation stopped.") } func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) { const statusPollingInterval = 1 * time.Second const reportIntervalSeconds = 5 var resp *control.GetShardEvacuationStatusResponse reportResponse := new(atomic.Pointer[control.GetShardEvacuationStatusResponse]) pollingCompleted := make(chan struct{}) progressReportCompleted := make(chan struct{}) go func() { defer close(progressReportCompleted) if !printProgress { return } cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds) for { select { case <-pollingCompleted: return case <-time.After(reportIntervalSeconds * time.Second): r := reportResponse.Load() if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED { continue } printStatus(cmd, r) } } }() for { req := &control.GetShardEvacuationStatusRequest{ Body: &control.GetShardEvacuationStatusRequest_Body{}, } signRequest(cmd, pk, req) var err error err = cli.ExecRaw(func(client *client.Client) error { resp, err = control.GetShardEvacuationStatus(client, req) return err }) reportResponse.Store(resp) if err != nil { commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err) return } if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING { break } time.Sleep(statusPollingInterval) } close(pollingCompleted) <-progressReportCompleted if printCompleted { printCompletedStatusMessage(cmd, resp) } } func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) { cmd.Println("Shard evacuation has been completed.") sb := &strings.Builder{} appendShardIDs(sb, resp) appendCounts(sb, resp) appendError(sb, resp) appendStartedAt(sb, resp) appendDuration(sb, resp) cmd.Println(sb.String()) } func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) { if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED { cmd.Println("There is no running or completed evacuation.") return } sb := &strings.Builder{} appendShardIDs(sb, resp) appendStatus(sb, resp) appendCounts(sb, resp) appendError(sb, resp) appendStartedAt(sb, resp) appendDuration(sb, resp) appendEstimation(sb, resp) cmd.Println(sb.String()) } func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING || resp.GetBody().GetDuration() == nil || resp.GetBody().GetTotal() == 0 || resp.GetBody().GetEvacuated()+resp.GetBody().GetFailed()+resp.Body.GetSkipped() == 0 { return } durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds()) evacuated := float64(resp.GetBody().GetEvacuated() + resp.GetBody().GetFailed() + resp.GetBody().GetSkipped()) avgObjEvacuationTimeSeconds := durationSeconds / evacuated objectsLeft := float64(resp.GetBody().GetTotal()) - evacuated leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft leftMinutes := int(leftSeconds / 60) sb.WriteString(fmt.Sprintf(" Estimated time left: %d minutes.", leftMinutes)) } func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { if resp.GetBody().GetDuration() != nil { duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds()) hour := int(duration.Seconds() / 3600) minute := int(duration.Seconds()/60) % 60 second := int(duration.Seconds()) % 60 sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second)) } } func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { if resp.GetBody().GetStartedAt() != nil { startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC() sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339))) } } func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { if len(resp.Body.GetErrorMessage()) > 0 { sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage())) } } func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { var status string switch resp.GetBody().GetStatus() { case control.GetShardEvacuationStatusResponse_Body_COMPLETED: status = "completed" case control.GetShardEvacuationStatusResponse_Body_RUNNING: status = "running" default: status = "undefined" } sb.WriteString(fmt.Sprintf(" Status: %s.", status)) } func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { sb.WriteString("Shard IDs: ") for idx, shardID := range resp.GetBody().GetShard_ID() { shardIDStr := shard.NewIDFromBytes(shardID).String() if idx > 0 { sb.WriteString(", ") } sb.WriteString(shardIDStr) if idx == len(resp.GetBody().GetShard_ID())-1 { sb.WriteString(".") } } } func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d.", resp.GetBody().GetEvacuated(), resp.GetBody().GetTotal(), resp.GetBody().GetFailed(), resp.GetBody().GetSkipped())) } func initControlEvacuationShardCmd() { evacuationShardCmd.AddCommand(startEvacuationShardCmd) evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd) evacuationShardCmd.AddCommand(stopEvacuationShardCmd) initControlStartEvacuationShardCmd() initControlFlags(getEvacuationShardStatusCmd) initControlFlags(stopEvacuationShardCmd) } func initControlStartEvacuationShardCmd() { initControlFlags(startEvacuationShardCmd) flags := startEvacuationShardCmd.Flags() flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") flags.Bool(shardAllFlag, false, "Process all shards") flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) }