forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
4 changed files with 354 additions and 14 deletions
|
@ -11,10 +11,11 @@ import (
|
||||||
const ignoreErrorsFlag = "no-errors"
|
const ignoreErrorsFlag = "no-errors"
|
||||||
|
|
||||||
var evacuateShardCmd = &cobra.Command{
|
var evacuateShardCmd = &cobra.Command{
|
||||||
Use: "evacuate",
|
Use: "evacuate",
|
||||||
Short: "Evacuate objects from shard",
|
Short: "Evacuate objects from shard",
|
||||||
Long: "Evacuate objects from shard to other shards",
|
Long: "Evacuate objects from shard to other shards",
|
||||||
Run: evacuateShard,
|
Run: evacuateShard,
|
||||||
|
Deprecated: "use frostfs-cli control shards evacuation start",
|
||||||
}
|
}
|
||||||
|
|
||||||
func evacuateShard(cmd *cobra.Command, _ []string) {
|
func evacuateShard(cmd *cobra.Command, _ []string) {
|
||||||
|
|
296
cmd/frostfs-cli/modules/control/evacuation.go
Normal file
296
cmd/frostfs-cli/modules/control/evacuation.go
Normal file
|
@ -0,0 +1,296 @@
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
awaitFlag = "await"
|
||||||
|
noProgressFlag = "no-progress"
|
||||||
|
)
|
||||||
|
|
||||||
|
var evacuationShardCmd = &cobra.Command{
|
||||||
|
Use: "evacuation",
|
||||||
|
Short: "Objects evacuation from shard",
|
||||||
|
Long: "Objects evacuation from shard to other shards",
|
||||||
|
}
|
||||||
|
|
||||||
|
var startEvacuationShardCmd = &cobra.Command{
|
||||||
|
Use: "start",
|
||||||
|
Short: "Start evacuate objects from shard",
|
||||||
|
Long: "Start evacuate objects from shard to other shards",
|
||||||
|
Run: startEvacuateShard,
|
||||||
|
}
|
||||||
|
|
||||||
|
var getEvacuationShardStatusCmd = &cobra.Command{
|
||||||
|
Use: "status",
|
||||||
|
Short: "Get evacuate objects from shard status",
|
||||||
|
Long: "Get evacuate objects from shard to other shards status",
|
||||||
|
Run: getEvacuateShardStatus,
|
||||||
|
}
|
||||||
|
|
||||||
|
var stopEvacuationShardCmd = &cobra.Command{
|
||||||
|
Use: "stop",
|
||||||
|
Short: "Stop running evacuate process",
|
||||||
|
Long: "Stop running evacuate process from shard to other shards",
|
||||||
|
Run: stopEvacuateShardStatus,
|
||||||
|
}
|
||||||
|
|
||||||
|
func startEvacuateShard(cmd *cobra.Command, _ []string) {
|
||||||
|
pk := key.Get(cmd)
|
||||||
|
|
||||||
|
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||||
|
|
||||||
|
req := &control.StartShardEvacuationRequest{
|
||||||
|
Body: &control.StartShardEvacuationRequest_Body{
|
||||||
|
Shard_ID: getShardIDList(cmd),
|
||||||
|
IgnoreErrors: ignoreErrors,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
cli := getClient(cmd, pk)
|
||||||
|
|
||||||
|
var resp *control.StartShardEvacuationResponse
|
||||||
|
var err error
|
||||||
|
err = cli.ExecRaw(func(client *client.Client) error {
|
||||||
|
resp, err = control.StartEvacuateShard(client, req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err)
|
||||||
|
|
||||||
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||||
|
|
||||||
|
cmd.Println("Shard evacuation has been successfully started.")
|
||||||
|
|
||||||
|
if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion {
|
||||||
|
noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
|
||||||
|
waitEvacuateCompletion(cmd, pk, cli, !noProgress, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
||||||
|
pk := key.Get(cmd)
|
||||||
|
req := &control.GetShardEvacuationStatusRequest{
|
||||||
|
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
||||||
|
}
|
||||||
|
|
||||||
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
cli := getClient(cmd, pk)
|
||||||
|
|
||||||
|
var resp *control.GetShardEvacuationStatusResponse
|
||||||
|
var err error
|
||||||
|
err = cli.ExecRaw(func(client *client.Client) error {
|
||||||
|
resp, err = control.GetEvacuateShardStatus(client, req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err)
|
||||||
|
|
||||||
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||||
|
|
||||||
|
printStatus(cmd, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
||||||
|
pk := key.Get(cmd)
|
||||||
|
req := &control.StopShardEvacuationRequest{
|
||||||
|
Body: &control.StopShardEvacuationRequest_Body{},
|
||||||
|
}
|
||||||
|
|
||||||
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
cli := getClient(cmd, pk)
|
||||||
|
|
||||||
|
var resp *control.StopShardEvacuationResponse
|
||||||
|
var err error
|
||||||
|
err = cli.ExecRaw(func(client *client.Client) error {
|
||||||
|
resp, err = control.StopEvacuateShard(client, req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err)
|
||||||
|
|
||||||
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||||
|
|
||||||
|
waitEvacuateCompletion(cmd, pk, cli, false, false)
|
||||||
|
|
||||||
|
cmd.Println("Evacuation stopped.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) {
|
||||||
|
const statusPollingInterval = 1 * time.Second
|
||||||
|
const reportIntervalSeconds = 5
|
||||||
|
var resp *control.GetShardEvacuationStatusResponse
|
||||||
|
reportResponse := atomic.NewPointer(resp)
|
||||||
|
pollingCompleted := make(chan struct{})
|
||||||
|
progressReportCompleted := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(progressReportCompleted)
|
||||||
|
if !printProgress {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-pollingCompleted:
|
||||||
|
return
|
||||||
|
case <-time.After(reportIntervalSeconds * time.Second):
|
||||||
|
r := reportResponse.Load()
|
||||||
|
if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
printStatus(cmd, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
req := &control.GetShardEvacuationStatusRequest{
|
||||||
|
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
||||||
|
}
|
||||||
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
var err error
|
||||||
|
err = cli.ExecRaw(func(client *client.Client) error {
|
||||||
|
resp, err = control.GetEvacuateShardStatus(client, req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
reportResponse.Store(resp)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(statusPollingInterval)
|
||||||
|
}
|
||||||
|
close(pollingCompleted)
|
||||||
|
<-progressReportCompleted
|
||||||
|
if printCompleted {
|
||||||
|
printCompletedStatusMessage(cmd, resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
cmd.Println("Shard evacuation has been completed.")
|
||||||
|
sb := &strings.Builder{}
|
||||||
|
appendShardIDs(sb, resp)
|
||||||
|
appendCounts(sb, resp)
|
||||||
|
appendError(sb, resp)
|
||||||
|
appendStartedAt(sb, resp)
|
||||||
|
appendDuration(sb, resp)
|
||||||
|
cmd.Println(sb.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED {
|
||||||
|
cmd.Println("There is no running or completed evacuation.")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sb := &strings.Builder{}
|
||||||
|
appendShardIDs(sb, resp)
|
||||||
|
appendStatus(sb, resp)
|
||||||
|
appendCounts(sb, resp)
|
||||||
|
appendError(sb, resp)
|
||||||
|
appendStartedAt(sb, resp)
|
||||||
|
appendDuration(sb, resp)
|
||||||
|
cmd.Println(sb.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
if resp.GetBody().GetDuration() != nil {
|
||||||
|
duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds())
|
||||||
|
hour := int(duration.Seconds() / 3600)
|
||||||
|
minute := int(duration.Seconds()/60) % 60
|
||||||
|
second := int(duration.Seconds()) % 60
|
||||||
|
sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
if resp.GetBody().GetStartedAt() != nil {
|
||||||
|
startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
|
||||||
|
sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
if len(resp.Body.GetErrorMessage()) > 0 {
|
||||||
|
sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
var status string
|
||||||
|
switch resp.GetBody().GetStatus() {
|
||||||
|
case control.GetShardEvacuationStatusResponse_Body_COMPLETED:
|
||||||
|
status = "completed"
|
||||||
|
case control.GetShardEvacuationStatusResponse_Body_RUNNING:
|
||||||
|
status = "running"
|
||||||
|
default:
|
||||||
|
status = "undefined"
|
||||||
|
}
|
||||||
|
sb.WriteString(fmt.Sprintf(" Status: %s.", status))
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
sb.WriteString("Shard IDs: ")
|
||||||
|
for idx, shardID := range resp.GetBody().GetShard_ID() {
|
||||||
|
shardIDStr := shard.NewIDFromBytes(shardID).String()
|
||||||
|
if idx > 0 {
|
||||||
|
sb.WriteString(", ")
|
||||||
|
}
|
||||||
|
sb.WriteString(shardIDStr)
|
||||||
|
if idx == len(resp.GetBody().GetShard_ID())-1 {
|
||||||
|
sb.WriteString(".")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
|
sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
|
||||||
|
resp.GetBody().GetEvacuated(),
|
||||||
|
resp.Body.GetTotal(),
|
||||||
|
resp.Body.GetFailed()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func initControlEvacuationShardCmd() {
|
||||||
|
evacuationShardCmd.AddCommand(startEvacuationShardCmd)
|
||||||
|
evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd)
|
||||||
|
evacuationShardCmd.AddCommand(stopEvacuationShardCmd)
|
||||||
|
|
||||||
|
initControlStartEvacuationShardCmd()
|
||||||
|
initControlFlags(getEvacuationShardStatusCmd)
|
||||||
|
initControlFlags(stopEvacuationShardCmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initControlStartEvacuationShardCmd() {
|
||||||
|
initControlFlags(startEvacuationShardCmd)
|
||||||
|
|
||||||
|
flags := startEvacuationShardCmd.Flags()
|
||||||
|
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||||
|
flags.Bool(shardAllFlag, false, "Process all shards")
|
||||||
|
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
|
||||||
|
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
||||||
|
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
||||||
|
|
||||||
|
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||||
|
}
|
|
@ -13,13 +13,14 @@ var shardsCmd = &cobra.Command{
|
||||||
func initControlShardsCmd() {
|
func initControlShardsCmd() {
|
||||||
shardsCmd.AddCommand(listShardsCmd)
|
shardsCmd.AddCommand(listShardsCmd)
|
||||||
shardsCmd.AddCommand(setShardModeCmd)
|
shardsCmd.AddCommand(setShardModeCmd)
|
||||||
shardsCmd.AddCommand(evacuateShardCmd)
|
shardsCmd.AddCommand(evacuationShardCmd)
|
||||||
shardsCmd.AddCommand(flushCacheCmd)
|
shardsCmd.AddCommand(flushCacheCmd)
|
||||||
shardsCmd.AddCommand(doctorCmd)
|
shardsCmd.AddCommand(doctorCmd)
|
||||||
|
|
||||||
initControlShardsListCmd()
|
initControlShardsListCmd()
|
||||||
initControlSetShardModeCmd()
|
initControlSetShardModeCmd()
|
||||||
initControlEvacuateShardCmd()
|
initControlEvacuateShardCmd()
|
||||||
|
initControlEvacuationShardCmd()
|
||||||
initControlFlushCacheCmd()
|
initControlFlushCacheCmd()
|
||||||
initControlDoctorCmd()
|
initControlDoctorCmd()
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,15 +8,18 @@ import (
|
||||||
const serviceName = "control.ControlService"
|
const serviceName = "control.ControlService"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
rpcHealthCheck = "HealthCheck"
|
rpcHealthCheck = "HealthCheck"
|
||||||
rpcSetNetmapStatus = "SetNetmapStatus"
|
rpcSetNetmapStatus = "SetNetmapStatus"
|
||||||
rpcDropObjects = "DropObjects"
|
rpcDropObjects = "DropObjects"
|
||||||
rpcListShards = "ListShards"
|
rpcListShards = "ListShards"
|
||||||
rpcSetShardMode = "SetShardMode"
|
rpcSetShardMode = "SetShardMode"
|
||||||
rpcSynchronizeTree = "SynchronizeTree"
|
rpcSynchronizeTree = "SynchronizeTree"
|
||||||
rpcEvacuateShard = "EvacuateShard"
|
rpcEvacuateShard = "EvacuateShard"
|
||||||
rpcFlushCache = "FlushCache"
|
rpcStartEvacuateShard = "StartEvacuateShard"
|
||||||
rpcDoctor = "Doctor"
|
rpcGetEvacuateShardStatus = "GetEvacuateShardStatus"
|
||||||
|
rpcStopEvacuateShardStatus = "StopEvacuateShard"
|
||||||
|
rpcFlushCache = "FlushCache"
|
||||||
|
rpcDoctor = "Doctor"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||||
|
@ -141,6 +144,45 @@ func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client
|
||||||
return wResp.message, nil
|
return wResp.message, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StartEvacuateShard executes ControlService.StartEvacuateShard RPC.
|
||||||
|
func StartEvacuateShard(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) {
|
||||||
|
wResp := newResponseWrapper[StartShardEvacuationResponse]()
|
||||||
|
wReq := &requestWrapper{m: req}
|
||||||
|
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartEvacuateShard), wReq, wResp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return wResp.message, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEvacuateShardStatus executes ControlService.GetEvacuateShardStatus RPC.
|
||||||
|
func GetEvacuateShardStatus(cli *client.Client, req *GetShardEvacuationStatusRequest, opts ...client.CallOption) (*GetShardEvacuationStatusResponse, error) {
|
||||||
|
wResp := newResponseWrapper[GetShardEvacuationStatusResponse]()
|
||||||
|
wReq := &requestWrapper{m: req}
|
||||||
|
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetEvacuateShardStatus), wReq, wResp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return wResp.message, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopEvacuateShard executes ControlService.StopEvacuateShard RPC.
|
||||||
|
func StopEvacuateShard(cli *client.Client, req *StopShardEvacuationRequest, opts ...client.CallOption) (*StopShardEvacuationResponse, error) {
|
||||||
|
wResp := newResponseWrapper[StopShardEvacuationResponse]()
|
||||||
|
wReq := &requestWrapper{m: req}
|
||||||
|
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStopEvacuateShardStatus), wReq, wResp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return wResp.message, nil
|
||||||
|
}
|
||||||
|
|
||||||
// FlushCache executes ControlService.FlushCache RPC.
|
// FlushCache executes ControlService.FlushCache RPC.
|
||||||
func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallOption) (*FlushCacheResponse, error) {
|
func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallOption) (*FlushCacheResponse, error) {
|
||||||
wResp := newResponseWrapper[FlushCacheResponse]()
|
wResp := newResponseWrapper[FlushCacheResponse]()
|
||||||
|
|
Loading…
Reference in a new issue