Add await flag to frostfs-cli control set-status command #1194

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/control_set_status_await into master 2024-06-24 11:12:26 +00:00
11 changed files with 1837 additions and 1160 deletions

View File

@ -50,6 +50,9 @@ const (
TracingFlag = "trace"
TracingFlagUsage = "Generate trace ID and print it."
AwaitFlag = "await"
AwaitFlagUsage = "Wait for the operation to complete"
)
// Init adds common flags to the command:

View File

@ -1,7 +1,10 @@
package control
import (
"crypto/ecdsa"
"errors"
"fmt"
"time"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
@ -9,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"github.com/spf13/cobra"
)
@ -18,8 +22,13 @@ const (
netmapStatusOnline = "online"
netmapStatusOffline = "offline"
netmapStatusMaintenance = "maintenance"
maxSetStatusMaxWaitTime = 30 * time.Minute
setStatusWaitTimeout = 30 * time.Second
)
var errNetmapStatusAwaitFailed = errors.New("netmap status hasn't changed for 30 minutes")
var setNetmapStatusCmd = &cobra.Command{
Use: "set-status",
Short: "Set status of the storage node in FrostFS network map",
@ -43,6 +52,8 @@ func initControlSetNetmapStatusCmd() {
flags.BoolP(commonflags.ForceFlag, commonflags.ForceFlagShorthand, false,
"Force turning to local maintenance")
flags.Bool(commonflags.AwaitFlag, false, commonflags.AwaitFlagUsage)
}
func setNetmapStatus(cmd *cobra.Command, _ []string) {
@ -56,15 +67,19 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
}
}
await, _ := cmd.Flags().GetBool(commonflags.AwaitFlag)
var targetStatus control.NetmapStatus
switch st, _ := cmd.Flags().GetString(netmapStatusFlag); st {
default:
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("unsupported status %s", st))
case netmapStatusOnline:
body.SetStatus(control.NetmapStatus_ONLINE)
printIgnoreForce(control.NetmapStatus_ONLINE)
targetStatus = control.NetmapStatus_ONLINE
case netmapStatusOffline:
body.SetStatus(control.NetmapStatus_OFFLINE)
printIgnoreForce(control.NetmapStatus_OFFLINE)
targetStatus = control.NetmapStatus_OFFLINE
case netmapStatusMaintenance:
body.SetStatus(control.NetmapStatus_MAINTENANCE)
@ -72,6 +87,7 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
body.SetForceMaintenance()
common.PrintVerbose(cmd, "Local maintenance will be forced.")
}
targetStatus = control.NetmapStatus_MAINTENANCE
}
req := new(control.SetNetmapStatusRequest)
@ -92,4 +108,52 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Network status update request successfully sent.")
if await {
awaitSetNetmapStatus(cmd, pk, cli, targetStatus)
}
}
func awaitSetNetmapStatus(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *client.Client, targetStatus control.NetmapStatus) {
req := &control.GetNetmapStatusRequest{
Body: &control.GetNetmapStatusRequest_Body{},
}
signRequest(cmd, pk, req)
var epoch uint64
var status control.NetmapStatus
startTime := time.Now()
cmd.Println("Wait until epoch and netmap status change...")
for {
var resp *control.GetNetmapStatusResponse
var err error
err = cli.ExecRaw(func(client *rawclient.Client) error {
resp, err = control.GetNetmapStatus(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "failed to get current netmap status: %w", err)
if epoch == 0 {
epoch = resp.GetBody().GetEpoch()
}
status = resp.GetBody().GetStatus()
if resp.GetBody().GetEpoch() > epoch {
epoch = resp.GetBody().GetEpoch()
cmd.Printf("Epoch changed to %d\n", resp.GetBody().GetEpoch())
}
if status == targetStatus {
break
}
if time.Since(startTime) > maxSetStatusMaxWaitTime {
commonCmd.ExitOnErr(cmd, "failed to wait netmap status: %w", errNetmapStatusAwaitFailed)
return
}
time.Sleep(setStatusWaitTimeout)
cmd.Printf("Current netmap status '%s', target status '%s'\n", status.String(), targetStatus.String())
}
cmd.Printf("Netmap status changed to '%s' successfully.\n", status.String())
}

View File

@ -374,6 +374,15 @@ func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
return c.updateNetMapState(func(*nmClient.UpdatePeerPrm) {})
}
func (c *cfg) GetNetmapStatus() (control.NetmapStatus, uint64, error) {
epoch, err := c.netMapSource.Epoch()
if err != nil {
return control.NetmapStatus_STATUS_UNDEFINED, 0, fmt.Errorf("failed to get current epoch: %w", err)
}
st := c.NetmapStatus()
return st, epoch, nil
}
func (c *cfg) ForceMaintenance() error {
return c.setMaintenanceStatus(true)
}

View File

@ -10,6 +10,7 @@ const serviceName = "control.ControlService"
const (
rpcHealthCheck = "HealthCheck"
rpcSetNetmapStatus = "SetNetmapStatus"
rpcGetNetmapStatus = "GetNetmapStatus"
rpcDropObjects = "DropObjects"
rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode"
@ -70,6 +71,26 @@ func SetNetmapStatus(
return wResp.message, nil
}
// GetNetmapStatus executes ControlService.GetNetmapStatus RPC.
func GetNetmapStatus(
cli *client.Client,
req *GetNetmapStatusRequest,
opts ...client.CallOption,
) (*GetNetmapStatusResponse, error) {
wResp := newResponseWrapper[GetNetmapStatusResponse]()
wReq := &requestWrapper{
m: req,
}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetNetmapStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// DropObjects executes ControlService.DropObjects RPC.
func DropObjects(
cli *client.Client,

View File

@ -242,6 +242,17 @@ func (a *auditService) SetNetmapStatus(ctx context.Context, req *ctl.SetNetmapSt
return res, err
}
// GetNetmapStatus implements control.ControlServiceServer.
func (a *auditService) GetNetmapStatus(ctx context.Context, req *ctl.GetNetmapStatusRequest) (*ctl.GetNetmapStatusResponse, error) {
res, err := a.next.GetNetmapStatus(ctx, req)
if !a.enabled.Load() {
return res, err
}
audit.LogRequestWithKey(a.log, ctl.ControlService_GetNetmapStatus_FullMethodName, req.GetSignature().GetKey(),
nil, err == nil)
return res, err
}
// SetShardMode implements control.ControlServiceServer.
func (a *auditService) SetShardMode(ctx context.Context, req *ctl.SetShardModeRequest) (*ctl.SetShardModeResponse, error) {
res, err := a.next.SetShardMode(ctx, req)

View File

@ -0,0 +1,35 @@
package control
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// GetNetmapStatus gets node status in FrostFS network.
func (s *Server) GetNetmapStatus(_ context.Context, req *control.GetNetmapStatusRequest) (*control.GetNetmapStatusResponse, error) {
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
st, epoch, err := s.nodeState.GetNetmapStatus()
if err != nil {
return nil, err
}
resp := &control.GetNetmapStatusResponse{
Body: &control.GetNetmapStatusResponse_Body{
Status: st,
Epoch: epoch,
},
}
if err := ctrlmessage.Sign(s.key, resp); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

View File

@ -50,6 +50,8 @@ type NodeState interface {
// ForceMaintenance works like SetNetmapStatus(control.NetmapStatus_MAINTENANCE)
// but starts local maintenance regardless of the network settings.
ForceMaintenance() error
GetNetmapStatus() (control.NetmapStatus, uint64, error)
}
// LocalOverrideStorageDecorator interface provides methods to decorate LocalOverrideEngine

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,9 @@ service ControlService {
// Sets status of the storage node in FrostFS network map.
rpc SetNetmapStatus(SetNetmapStatusRequest) returns (SetNetmapStatusResponse);
// Gets status of the storage node in FrostFS network map.
rpc GetNetmapStatus(GetNetmapStatusRequest) returns (GetNetmapStatusResponse);
// Mark objects to be removed from node's local object storage.
rpc DropObjects(DropObjectsRequest) returns (DropObjectsResponse);
@ -156,6 +159,34 @@ message SetNetmapStatusResponse {
Signature signature = 2;
}
// Get netmap status request.
message GetNetmapStatusRequest {
message Body {}
// Body of set netmap status request message.
Body body = 1;
// Body signature.
Signature signature = 2;
}
// Get netmap status response.
message GetNetmapStatusResponse {
message Body {
// Storage node status in FrostFS network map.
NetmapStatus status = 1;
// Network map epoch.
uint64 epoch = 2;
}
// Body of get netmap status response message.
Body body = 1;
// Body signature.
Signature signature = 2;
}
// Request to drop the objects.
message DropObjectsRequest {
// Request body structure.

View File

@ -334,6 +334,171 @@ func (x *SetNetmapStatusResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusRequest_Body) StableSize() (size int) {
if x == nil {
return 0
}
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusRequest_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusRequest) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *GetNetmapStatusRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *GetNetmapStatusRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *GetNetmapStatusRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusResponse_Body) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.EnumSize(1, int32(x.Status))
size += proto.UInt64Size(2, x.Epoch)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusResponse_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.EnumMarshal(1, buf[offset:], int32(x.Status))
offset += proto.UInt64Marshal(2, buf[offset:], x.Epoch)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusResponse) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *GetNetmapStatusResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *GetNetmapStatusResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *GetNetmapStatusResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.

View File

@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7
const (
ControlService_HealthCheck_FullMethodName = "/control.ControlService/HealthCheck"
ControlService_SetNetmapStatus_FullMethodName = "/control.ControlService/SetNetmapStatus"
ControlService_GetNetmapStatus_FullMethodName = "/control.ControlService/GetNetmapStatus"
ControlService_DropObjects_FullMethodName = "/control.ControlService/DropObjects"
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
@ -50,6 +51,8 @@ type ControlServiceClient interface {
HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
// Sets status of the storage node in FrostFS network map.
SetNetmapStatus(ctx context.Context, in *SetNetmapStatusRequest, opts ...grpc.CallOption) (*SetNetmapStatusResponse, error)
// Gets status of the storage node in FrostFS network map.
GetNetmapStatus(ctx context.Context, in *GetNetmapStatusRequest, opts ...grpc.CallOption) (*GetNetmapStatusResponse, error)
// Mark objects to be removed from node's local object storage.
DropObjects(ctx context.Context, in *DropObjectsRequest, opts ...grpc.CallOption) (*DropObjectsResponse, error)
// Returns list that contains information about all shards of a node.
@ -122,6 +125,15 @@ func (c *controlServiceClient) SetNetmapStatus(ctx context.Context, in *SetNetma
return out, nil
}
func (c *controlServiceClient) GetNetmapStatus(ctx context.Context, in *GetNetmapStatusRequest, opts ...grpc.CallOption) (*GetNetmapStatusResponse, error) {
out := new(GetNetmapStatusResponse)
err := c.cc.Invoke(ctx, ControlService_GetNetmapStatus_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) DropObjects(ctx context.Context, in *DropObjectsRequest, opts ...grpc.CallOption) (*DropObjectsResponse, error) {
out := new(DropObjectsResponse)
err := c.cc.Invoke(ctx, ControlService_DropObjects_FullMethodName, in, out, opts...)
@ -301,6 +313,8 @@ type ControlServiceServer interface {
HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
// Sets status of the storage node in FrostFS network map.
SetNetmapStatus(context.Context, *SetNetmapStatusRequest) (*SetNetmapStatusResponse, error)
// Gets status of the storage node in FrostFS network map.
GetNetmapStatus(context.Context, *GetNetmapStatusRequest) (*GetNetmapStatusResponse, error)
// Mark objects to be removed from node's local object storage.
DropObjects(context.Context, *DropObjectsRequest) (*DropObjectsResponse, error)
// Returns list that contains information about all shards of a node.
@ -357,6 +371,9 @@ func (UnimplementedControlServiceServer) HealthCheck(context.Context, *HealthChe
func (UnimplementedControlServiceServer) SetNetmapStatus(context.Context, *SetNetmapStatusRequest) (*SetNetmapStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetNetmapStatus not implemented")
}
func (UnimplementedControlServiceServer) GetNetmapStatus(context.Context, *GetNetmapStatusRequest) (*GetNetmapStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetNetmapStatus not implemented")
}
func (UnimplementedControlServiceServer) DropObjects(context.Context, *DropObjectsRequest) (*DropObjectsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DropObjects not implemented")
}
@ -462,6 +479,24 @@ func _ControlService_SetNetmapStatus_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
func _ControlService_GetNetmapStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetNetmapStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).GetNetmapStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_GetNetmapStatus_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).GetNetmapStatus(ctx, req.(*GetNetmapStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_DropObjects_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DropObjectsRequest)
if err := dec(in); err != nil {
@ -819,6 +854,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "SetNetmapStatus",
Handler: _ControlService_SetNetmapStatus_Handler,
},
{
MethodName: "GetNetmapStatus",
Handler: _ControlService_GetNetmapStatus_Handler,
},
{
MethodName: "DropObjects",
Handler: _ControlService_DropObjects_Handler,