296 lines
8.8 KiB
Go
296 lines
8.8 KiB
Go
package control
|
|
|
|
import (
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
const (
|
|
awaitFlag = "await"
|
|
noProgressFlag = "no-progress"
|
|
)
|
|
|
|
var evacuationShardCmd = &cobra.Command{
|
|
Use: "evacuation",
|
|
Short: "Objects evacuation from shard",
|
|
Long: "Objects evacuation from shard to other shards",
|
|
}
|
|
|
|
var startEvacuationShardCmd = &cobra.Command{
|
|
Use: "start",
|
|
Short: "Start evacuate objects from shard",
|
|
Long: "Start evacuate objects from shard to other shards",
|
|
Run: startEvacuateShard,
|
|
}
|
|
|
|
var getEvacuationShardStatusCmd = &cobra.Command{
|
|
Use: "status",
|
|
Short: "Get evacuate objects from shard status",
|
|
Long: "Get evacuate objects from shard to other shards status",
|
|
Run: getEvacuateShardStatus,
|
|
}
|
|
|
|
var stopEvacuationShardCmd = &cobra.Command{
|
|
Use: "stop",
|
|
Short: "Stop running evacuate process",
|
|
Long: "Stop running evacuate process from shard to other shards",
|
|
Run: stopEvacuateShardStatus,
|
|
}
|
|
|
|
func startEvacuateShard(cmd *cobra.Command, _ []string) {
|
|
pk := key.Get(cmd)
|
|
|
|
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
|
|
|
req := &control.StartShardEvacuationRequest{
|
|
Body: &control.StartShardEvacuationRequest_Body{
|
|
Shard_ID: getShardIDList(cmd),
|
|
IgnoreErrors: ignoreErrors,
|
|
},
|
|
}
|
|
|
|
signRequest(cmd, pk, req)
|
|
|
|
cli := getClient(cmd, pk)
|
|
|
|
var resp *control.StartShardEvacuationResponse
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.StartShardEvacuation(client, req)
|
|
return err
|
|
})
|
|
commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err)
|
|
|
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
|
|
cmd.Println("Shard evacuation has been successfully started.")
|
|
|
|
if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion {
|
|
noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
|
|
waitEvacuateCompletion(cmd, pk, cli, !noProgress, true)
|
|
}
|
|
}
|
|
|
|
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
|
pk := key.Get(cmd)
|
|
req := &control.GetShardEvacuationStatusRequest{
|
|
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
|
}
|
|
|
|
signRequest(cmd, pk, req)
|
|
|
|
cli := getClient(cmd, pk)
|
|
|
|
var resp *control.GetShardEvacuationStatusResponse
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.GetShardEvacuationStatus(client, req)
|
|
return err
|
|
})
|
|
commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err)
|
|
|
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
|
|
printStatus(cmd, resp)
|
|
}
|
|
|
|
func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
|
pk := key.Get(cmd)
|
|
req := &control.StopShardEvacuationRequest{
|
|
Body: &control.StopShardEvacuationRequest_Body{},
|
|
}
|
|
|
|
signRequest(cmd, pk, req)
|
|
|
|
cli := getClient(cmd, pk)
|
|
|
|
var resp *control.StopShardEvacuationResponse
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.StopShardEvacuation(client, req)
|
|
return err
|
|
})
|
|
commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err)
|
|
|
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
|
|
waitEvacuateCompletion(cmd, pk, cli, false, false)
|
|
|
|
cmd.Println("Evacuation stopped.")
|
|
}
|
|
|
|
func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) {
|
|
const statusPollingInterval = 1 * time.Second
|
|
const reportIntervalSeconds = 5
|
|
var resp *control.GetShardEvacuationStatusResponse
|
|
reportResponse := new(atomic.Pointer[control.GetShardEvacuationStatusResponse])
|
|
pollingCompleted := make(chan struct{})
|
|
progressReportCompleted := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(progressReportCompleted)
|
|
if !printProgress {
|
|
return
|
|
}
|
|
cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds)
|
|
for {
|
|
select {
|
|
case <-pollingCompleted:
|
|
return
|
|
case <-time.After(reportIntervalSeconds * time.Second):
|
|
r := reportResponse.Load()
|
|
if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED {
|
|
continue
|
|
}
|
|
printStatus(cmd, r)
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
req := &control.GetShardEvacuationStatusRequest{
|
|
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
|
}
|
|
signRequest(cmd, pk, req)
|
|
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.GetShardEvacuationStatus(client, req)
|
|
return err
|
|
})
|
|
|
|
reportResponse.Store(resp)
|
|
|
|
if err != nil {
|
|
commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err)
|
|
return
|
|
}
|
|
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
|
|
break
|
|
}
|
|
|
|
time.Sleep(statusPollingInterval)
|
|
}
|
|
close(pollingCompleted)
|
|
<-progressReportCompleted
|
|
if printCompleted {
|
|
printCompletedStatusMessage(cmd, resp)
|
|
}
|
|
}
|
|
|
|
func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
|
cmd.Println("Shard evacuation has been completed.")
|
|
sb := &strings.Builder{}
|
|
appendShardIDs(sb, resp)
|
|
appendCounts(sb, resp)
|
|
appendError(sb, resp)
|
|
appendStartedAt(sb, resp)
|
|
appendDuration(sb, resp)
|
|
cmd.Println(sb.String())
|
|
}
|
|
|
|
func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
|
if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED {
|
|
cmd.Println("There is no running or completed evacuation.")
|
|
return
|
|
}
|
|
sb := &strings.Builder{}
|
|
appendShardIDs(sb, resp)
|
|
appendStatus(sb, resp)
|
|
appendCounts(sb, resp)
|
|
appendError(sb, resp)
|
|
appendStartedAt(sb, resp)
|
|
appendDuration(sb, resp)
|
|
cmd.Println(sb.String())
|
|
}
|
|
|
|
func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
if resp.GetBody().GetDuration() != nil {
|
|
duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds())
|
|
hour := int(duration.Seconds() / 3600)
|
|
minute := int(duration.Seconds()/60) % 60
|
|
second := int(duration.Seconds()) % 60
|
|
sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
|
|
}
|
|
}
|
|
|
|
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
if resp.GetBody().GetStartedAt() != nil {
|
|
startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
|
|
sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
|
|
}
|
|
}
|
|
|
|
func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
if len(resp.Body.GetErrorMessage()) > 0 {
|
|
sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage()))
|
|
}
|
|
}
|
|
|
|
func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
var status string
|
|
switch resp.GetBody().GetStatus() {
|
|
case control.GetShardEvacuationStatusResponse_Body_COMPLETED:
|
|
status = "completed"
|
|
case control.GetShardEvacuationStatusResponse_Body_RUNNING:
|
|
status = "running"
|
|
default:
|
|
status = "undefined"
|
|
}
|
|
sb.WriteString(fmt.Sprintf(" Status: %s.", status))
|
|
}
|
|
|
|
func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
sb.WriteString("Shard IDs: ")
|
|
for idx, shardID := range resp.GetBody().GetShard_ID() {
|
|
shardIDStr := shard.NewIDFromBytes(shardID).String()
|
|
if idx > 0 {
|
|
sb.WriteString(", ")
|
|
}
|
|
sb.WriteString(shardIDStr)
|
|
if idx == len(resp.GetBody().GetShard_ID())-1 {
|
|
sb.WriteString(".")
|
|
}
|
|
}
|
|
}
|
|
|
|
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
|
|
resp.GetBody().GetEvacuated(),
|
|
resp.Body.GetTotal(),
|
|
resp.Body.GetFailed()))
|
|
}
|
|
|
|
func initControlEvacuationShardCmd() {
|
|
evacuationShardCmd.AddCommand(startEvacuationShardCmd)
|
|
evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd)
|
|
evacuationShardCmd.AddCommand(stopEvacuationShardCmd)
|
|
|
|
initControlStartEvacuationShardCmd()
|
|
initControlFlags(getEvacuationShardStatusCmd)
|
|
initControlFlags(stopEvacuationShardCmd)
|
|
}
|
|
|
|
func initControlStartEvacuationShardCmd() {
|
|
initControlFlags(startEvacuationShardCmd)
|
|
|
|
flags := startEvacuationShardCmd.Flags()
|
|
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
|
flags.Bool(shardAllFlag, false, "Process all shards")
|
|
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
|
|
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
|
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
|
|
|
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
|
}
|