Anton Nikiforov
34e6a309c6
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 1m34s
DCO action / DCO (pull_request) Successful in 1m45s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m43s
Vulncheck / Vulncheck (pull_request) Successful in 2m35s
Tests and linters / Staticcheck (pull_request) Successful in 2m44s
Build / Build Components (pull_request) Successful in 3m19s
Tests and linters / gopls check (pull_request) Successful in 3m26s
Tests and linters / Lint (pull_request) Successful in 3m37s
Tests and linters / Tests with -race (pull_request) Successful in 4m44s
Tests and linters / Tests (pull_request) Successful in 5m13s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
385 lines
13 KiB
Go
385 lines
13 KiB
Go
package control
|
|
|
|
import (
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
// Names of the flags registered on the "start" evacuation subcommand.
const (
	awaitFlag      = "await"
	noProgressFlag = "no-progress"
	scopeFlag      = "scope"

	containerWorkerCountFlag = "container-worker-count"
	objectWorkerCountFlag    = "object-worker-count"
)

// Values accepted by the scope flag (see getEvacuationScope).
const (
	scopeAll     = "all"
	scopeObjects = "objects"
	scopeTrees   = "trees"
)
|
|
|
|
var evacuationShardCmd = &cobra.Command{
|
|
Use: "evacuation",
|
|
Short: "Objects evacuation from shard",
|
|
Long: "Objects evacuation from shard to other shards",
|
|
}
|
|
|
|
var startEvacuationShardCmd = &cobra.Command{
|
|
Use: "start",
|
|
Short: "Start evacuate objects from shard",
|
|
Long: "Start evacuate objects from shard to other shards",
|
|
Run: startEvacuateShard,
|
|
}
|
|
|
|
var getEvacuationShardStatusCmd = &cobra.Command{
|
|
Use: "status",
|
|
Short: "Get evacuate objects from shard status",
|
|
Long: "Get evacuate objects from shard to other shards status",
|
|
Run: getEvacuateShardStatus,
|
|
}
|
|
|
|
var stopEvacuationShardCmd = &cobra.Command{
|
|
Use: "stop",
|
|
Short: "Stop running evacuate process",
|
|
Long: "Stop running evacuate process from shard to other shards",
|
|
Run: stopEvacuateShardStatus,
|
|
}
|
|
|
|
var resetEvacuationStatusShardCmd = &cobra.Command{
|
|
Use: "reset",
|
|
Short: "Reset evacuate objects from shard status",
|
|
Long: "Reset evacuate objects from shard to other shards status",
|
|
Run: resetEvacuateShardStatus,
|
|
}
|
|
|
|
func startEvacuateShard(cmd *cobra.Command, _ []string) {
|
|
pk := key.Get(cmd)
|
|
|
|
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
|
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
|
|
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
|
|
|
|
req := &control.StartShardEvacuationRequest{
|
|
Body: &control.StartShardEvacuationRequest_Body{
|
|
Shard_ID: getShardIDList(cmd),
|
|
IgnoreErrors: ignoreErrors,
|
|
Scope: getEvacuationScope(cmd),
|
|
ContainerWorkerCount: containerWorkerCount,
|
|
ObjectWorkerCount: objectWorkerCount,
|
|
},
|
|
}
|
|
|
|
signRequest(cmd, pk, req)
|
|
|
|
cli := getClient(cmd, pk)
|
|
|
|
var resp *control.StartShardEvacuationResponse
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.StartShardEvacuation(client, req)
|
|
return err
|
|
})
|
|
commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err)
|
|
|
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
|
|
cmd.Println("Shard evacuation has been successfully started.")
|
|
|
|
if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion {
|
|
noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
|
|
waitEvacuateCompletion(cmd, pk, cli, !noProgress, true)
|
|
}
|
|
}
|
|
|
|
func getEvacuationScope(cmd *cobra.Command) uint32 {
|
|
rawScope, err := cmd.Flags().GetString(scopeFlag)
|
|
commonCmd.ExitOnErr(cmd, "Invalid scope value: %w", err)
|
|
switch rawScope {
|
|
case scopeAll:
|
|
return uint32(control.StartShardEvacuationRequest_Body_OBJECTS) | uint32(control.StartShardEvacuationRequest_Body_TREES)
|
|
case scopeObjects:
|
|
return uint32(control.StartShardEvacuationRequest_Body_OBJECTS)
|
|
case scopeTrees:
|
|
return uint32(control.StartShardEvacuationRequest_Body_TREES)
|
|
default:
|
|
commonCmd.ExitOnErr(cmd, "Invalid scope value: %w", fmt.Errorf("unknown scope %s", rawScope))
|
|
}
|
|
return uint32(control.StartShardEvacuationRequest_Body_NONE)
|
|
}
|
|
|
|
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
|
pk := key.Get(cmd)
|
|
req := &control.GetShardEvacuationStatusRequest{
|
|
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
|
}
|
|
|
|
signRequest(cmd, pk, req)
|
|
|
|
cli := getClient(cmd, pk)
|
|
|
|
var resp *control.GetShardEvacuationStatusResponse
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.GetShardEvacuationStatus(client, req)
|
|
return err
|
|
})
|
|
commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err)
|
|
|
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
|
|
printStatus(cmd, resp)
|
|
}
|
|
|
|
func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
|
pk := key.Get(cmd)
|
|
req := &control.StopShardEvacuationRequest{
|
|
Body: &control.StopShardEvacuationRequest_Body{},
|
|
}
|
|
|
|
signRequest(cmd, pk, req)
|
|
|
|
cli := getClient(cmd, pk)
|
|
|
|
var resp *control.StopShardEvacuationResponse
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.StopShardEvacuation(client, req)
|
|
return err
|
|
})
|
|
commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err)
|
|
|
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
|
|
waitEvacuateCompletion(cmd, pk, cli, false, false)
|
|
|
|
cmd.Println("Evacuation stopped.")
|
|
}
|
|
|
|
func resetEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
|
pk := key.Get(cmd)
|
|
req := &control.ResetShardEvacuationStatusRequest{
|
|
Body: &control.ResetShardEvacuationStatusRequest_Body{},
|
|
}
|
|
|
|
signRequest(cmd, pk, req)
|
|
|
|
cli := getClient(cmd, pk)
|
|
|
|
var resp *control.ResetShardEvacuationStatusResponse
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.ResetShardEvacuationStatus(client, req)
|
|
return err
|
|
})
|
|
commonCmd.ExitOnErr(cmd, "Reset shards evacuation status failed, rpc error: %w", err)
|
|
|
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
|
|
cmd.Println("Shards evacuation status has been reset.")
|
|
}
|
|
|
|
func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) {
|
|
const statusPollingInterval = 1 * time.Second
|
|
const reportIntervalSeconds = 5
|
|
var resp *control.GetShardEvacuationStatusResponse
|
|
reportResponse := new(atomic.Pointer[control.GetShardEvacuationStatusResponse])
|
|
pollingCompleted := make(chan struct{})
|
|
progressReportCompleted := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(progressReportCompleted)
|
|
if !printProgress {
|
|
return
|
|
}
|
|
cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds)
|
|
for {
|
|
select {
|
|
case <-pollingCompleted:
|
|
return
|
|
case <-time.After(reportIntervalSeconds * time.Second):
|
|
r := reportResponse.Load()
|
|
if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED {
|
|
continue
|
|
}
|
|
printStatus(cmd, r)
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
req := &control.GetShardEvacuationStatusRequest{
|
|
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
|
}
|
|
signRequest(cmd, pk, req)
|
|
|
|
var err error
|
|
err = cli.ExecRaw(func(client *client.Client) error {
|
|
resp, err = control.GetShardEvacuationStatus(client, req)
|
|
return err
|
|
})
|
|
|
|
reportResponse.Store(resp)
|
|
|
|
if err != nil {
|
|
commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err)
|
|
return
|
|
}
|
|
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
|
|
break
|
|
}
|
|
|
|
time.Sleep(statusPollingInterval)
|
|
}
|
|
close(pollingCompleted)
|
|
<-progressReportCompleted
|
|
if printCompleted {
|
|
printCompletedStatusMessage(cmd, resp)
|
|
}
|
|
}
|
|
|
|
func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
|
cmd.Println("Shard evacuation has been completed.")
|
|
sb := &strings.Builder{}
|
|
appendShardIDs(sb, resp)
|
|
appendCounts(sb, resp)
|
|
appendError(sb, resp)
|
|
appendStartedAt(sb, resp)
|
|
appendDuration(sb, resp)
|
|
cmd.Println(sb.String())
|
|
}
|
|
|
|
func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
|
if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED {
|
|
cmd.Println("There is no running or completed evacuation.")
|
|
return
|
|
}
|
|
sb := &strings.Builder{}
|
|
appendShardIDs(sb, resp)
|
|
appendStatus(sb, resp)
|
|
appendCounts(sb, resp)
|
|
appendError(sb, resp)
|
|
appendStartedAt(sb, resp)
|
|
appendDuration(sb, resp)
|
|
appendEstimation(sb, resp)
|
|
cmd.Println(sb.String())
|
|
}
|
|
|
|
func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING ||
|
|
resp.GetBody().GetDuration() == nil ||
|
|
(resp.GetBody().GetTotalObjects() == 0 && resp.GetBody().GetTotalTrees() == 0) ||
|
|
(resp.GetBody().GetEvacuatedObjects()+resp.GetBody().GetFailedObjects()+resp.GetBody().GetSkippedObjects() == 0 &&
|
|
resp.GetBody().GetEvacuatedTrees()+resp.GetBody().GetFailedTrees() == 0) {
|
|
return
|
|
}
|
|
|
|
durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds())
|
|
evacuated := float64(resp.GetBody().GetEvacuatedObjects() + resp.GetBody().GetFailedObjects() + resp.GetBody().GetSkippedObjects() +
|
|
resp.GetBody().GetEvacuatedTrees() + resp.GetBody().GetFailedTrees())
|
|
avgObjEvacuationTimeSeconds := durationSeconds / evacuated
|
|
objectsLeft := float64(resp.GetBody().GetTotalObjects()+resp.GetBody().GetTotalTrees()) - evacuated
|
|
leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft
|
|
leftMinutes := int(leftSeconds / 60)
|
|
|
|
sb.WriteString(fmt.Sprintf(" Estimated time left: %d minutes.", leftMinutes))
|
|
}
|
|
|
|
func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
if resp.GetBody().GetDuration() != nil {
|
|
duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds())
|
|
hour := int(duration.Seconds() / 3600)
|
|
minute := int(duration.Seconds()/60) % 60
|
|
second := int(duration.Seconds()) % 60
|
|
sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
|
|
}
|
|
}
|
|
|
|
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
if resp.GetBody().GetStartedAt() != nil {
|
|
startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
|
|
sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
|
|
}
|
|
}
|
|
|
|
func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
if len(resp.GetBody().GetErrorMessage()) > 0 {
|
|
sb.WriteString(fmt.Sprintf(" Error: %s.", resp.GetBody().GetErrorMessage()))
|
|
}
|
|
}
|
|
|
|
func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
var status string
|
|
switch resp.GetBody().GetStatus() {
|
|
case control.GetShardEvacuationStatusResponse_Body_COMPLETED:
|
|
status = "completed"
|
|
case control.GetShardEvacuationStatusResponse_Body_RUNNING:
|
|
status = "running"
|
|
default:
|
|
status = "undefined"
|
|
}
|
|
sb.WriteString(fmt.Sprintf(" Status: %s.", status))
|
|
}
|
|
|
|
func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
sb.WriteString("Shard IDs: ")
|
|
for idx, shardID := range resp.GetBody().GetShard_ID() {
|
|
shardIDStr := shard.NewIDFromBytes(shardID).String()
|
|
if idx > 0 {
|
|
sb.WriteString(", ")
|
|
}
|
|
sb.WriteString(shardIDStr)
|
|
if idx == len(resp.GetBody().GetShard_ID())-1 {
|
|
sb.WriteString(".")
|
|
}
|
|
}
|
|
}
|
|
|
|
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
|
sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d; evacuated %d trees out of %d, failed to evacuate: %d.",
|
|
resp.GetBody().GetEvacuatedObjects(),
|
|
resp.GetBody().GetTotalObjects(),
|
|
resp.GetBody().GetFailedObjects(),
|
|
resp.GetBody().GetSkippedObjects(),
|
|
resp.GetBody().GetEvacuatedTrees(),
|
|
resp.GetBody().GetTotalTrees(),
|
|
resp.GetBody().GetFailedTrees()))
|
|
}
|
|
|
|
func initControlEvacuationShardCmd() {
|
|
evacuationShardCmd.AddCommand(startEvacuationShardCmd)
|
|
evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd)
|
|
evacuationShardCmd.AddCommand(stopEvacuationShardCmd)
|
|
evacuationShardCmd.AddCommand(resetEvacuationStatusShardCmd)
|
|
|
|
initControlStartEvacuationShardCmd()
|
|
initControlFlags(getEvacuationShardStatusCmd)
|
|
initControlFlags(stopEvacuationShardCmd)
|
|
initControlFlags(resetEvacuationStatusShardCmd)
|
|
}
|
|
|
|
func initControlStartEvacuationShardCmd() {
|
|
initControlFlags(startEvacuationShardCmd)
|
|
|
|
flags := startEvacuationShardCmd.Flags()
|
|
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
|
flags.Bool(shardAllFlag, false, "Process all shards")
|
|
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
|
|
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
|
|
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
|
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
|
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
|
|
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
|
|
|
|
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
|
}
|