[#60] control: Add GetNetmapStatus method

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-06-20 16:03:58 +03:00
parent 9ac74efc41
commit a83eeddb1d
9 changed files with 1770 additions and 1160 deletions

View file

@ -374,6 +374,15 @@ func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
return c.updateNetMapState(func(*nmClient.UpdatePeerPrm) {})
}
func (c *cfg) GetNetmapStatus() (control.NetmapStatus, uint64, error) {
epoch, err := c.netMapSource.Epoch()
if err != nil {
return control.NetmapStatus_STATUS_UNDEFINED, 0, fmt.Errorf("failed to get current epoch: %w", err)
}
st := c.NetmapStatus()
return st, epoch, nil
}
func (c *cfg) ForceMaintenance() error {
return c.setMaintenanceStatus(true)
}

View file

@ -10,6 +10,7 @@ const serviceName = "control.ControlService"
const (
rpcHealthCheck = "HealthCheck"
rpcSetNetmapStatus = "SetNetmapStatus"
rpcGetNetmapStatus = "GetNetmapStatus"
rpcDropObjects = "DropObjects"
rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode"
@ -70,6 +71,26 @@ func SetNetmapStatus(
return wResp.message, nil
}
// GetNetmapStatus executes ControlService.GetNetmapStatus RPC.
func GetNetmapStatus(
cli *client.Client,
req *GetNetmapStatusRequest,
opts ...client.CallOption,
) (*GetNetmapStatusResponse, error) {
wResp := newResponseWrapper[GetNetmapStatusResponse]()
wReq := &requestWrapper{
m: req,
}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetNetmapStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// DropObjects executes ControlService.DropObjects RPC.
func DropObjects(
cli *client.Client,

View file

@ -242,6 +242,17 @@ func (a *auditService) SetNetmapStatus(ctx context.Context, req *ctl.SetNetmapSt
return res, err
}
// GetNetmapStatus implements control.ControlServiceServer.
func (a *auditService) GetNetmapStatus(ctx context.Context, req *ctl.GetNetmapStatusRequest) (*ctl.GetNetmapStatusResponse, error) {
res, err := a.next.GetNetmapStatus(ctx, req)
if !a.enabled.Load() {
return res, err
}
audit.LogRequestWithKey(a.log, ctl.ControlService_GetNetmapStatus_FullMethodName, req.GetSignature().GetKey(),
nil, err == nil)
return res, err
}
// SetShardMode implements control.ControlServiceServer.
func (a *auditService) SetShardMode(ctx context.Context, req *ctl.SetShardModeRequest) (*ctl.SetShardModeResponse, error) {
res, err := a.next.SetShardMode(ctx, req)

View file

@ -0,0 +1,35 @@
package control
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// GetNetmapStatus gets node status in FrostFS network.
func (s *Server) GetNetmapStatus(_ context.Context, req *control.GetNetmapStatusRequest) (*control.GetNetmapStatusResponse, error) {
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
st, epoch, err := s.nodeState.GetNetmapStatus()
if err != nil {
return nil, err
}
resp := &control.GetNetmapStatusResponse{
Body: &control.GetNetmapStatusResponse_Body{
Status: st,
Epoch: epoch,
},
}
if err := ctrlmessage.Sign(s.key, resp); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

View file

@ -50,6 +50,8 @@ type NodeState interface {
// ForceMaintenance works like SetNetmapStatus(control.NetmapStatus_MAINTENANCE)
// but starts local maintenance regardless of the network settings.
ForceMaintenance() error
GetNetmapStatus() (control.NetmapStatus, uint64, error)
}
// LocalOverrideStorageDecorator interface provides methods to decorate LocalOverrideEngine

File diff suppressed because it is too large Load diff

View file

@ -15,6 +15,9 @@ service ControlService {
// Sets status of the storage node in FrostFS network map.
rpc SetNetmapStatus(SetNetmapStatusRequest) returns (SetNetmapStatusResponse);
// Gets status of the storage node in FrostFS network map.
rpc GetNetmapStatus(GetNetmapStatusRequest) returns (GetNetmapStatusResponse);
// Mark objects to be removed from node's local object storage.
rpc DropObjects(DropObjectsRequest) returns (DropObjectsResponse);
@ -156,6 +159,34 @@ message SetNetmapStatusResponse {
Signature signature = 2;
}
// Get netmap status request.
message GetNetmapStatusRequest {
message Body {}
// Body of set netmap status request message.
Body body = 1;
// Body signature.
Signature signature = 2;
}
// Get netmap status response.
message GetNetmapStatusResponse {
message Body {
// Storage node status in FrostFS network map.
NetmapStatus status = 1;
// Network map epoch.
uint64 epoch = 2;
}
// Body of get netmap status response message.
Body body = 1;
// Body signature.
Signature signature = 2;
}
// Request to drop the objects.
message DropObjectsRequest {
// Request body structure.

View file

@ -334,6 +334,171 @@ func (x *SetNetmapStatusResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusRequest_Body) StableSize() (size int) {
if x == nil {
return 0
}
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusRequest_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusRequest) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *GetNetmapStatusRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *GetNetmapStatusRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *GetNetmapStatusRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusResponse_Body) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.EnumSize(1, int32(x.Status))
size += proto.UInt64Size(2, x.Epoch)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusResponse_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.EnumMarshal(1, buf[offset:], int32(x.Status))
offset += proto.UInt64Marshal(2, buf[offset:], x.Epoch)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetNetmapStatusResponse) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetNetmapStatusResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *GetNetmapStatusResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *GetNetmapStatusResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *GetNetmapStatusResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.

View file

@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7
const (
ControlService_HealthCheck_FullMethodName = "/control.ControlService/HealthCheck"
ControlService_SetNetmapStatus_FullMethodName = "/control.ControlService/SetNetmapStatus"
ControlService_GetNetmapStatus_FullMethodName = "/control.ControlService/GetNetmapStatus"
ControlService_DropObjects_FullMethodName = "/control.ControlService/DropObjects"
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
@ -50,6 +51,8 @@ type ControlServiceClient interface {
HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
// Sets status of the storage node in FrostFS network map.
SetNetmapStatus(ctx context.Context, in *SetNetmapStatusRequest, opts ...grpc.CallOption) (*SetNetmapStatusResponse, error)
// Gets status of the storage node in FrostFS network map.
GetNetmapStatus(ctx context.Context, in *GetNetmapStatusRequest, opts ...grpc.CallOption) (*GetNetmapStatusResponse, error)
// Mark objects to be removed from node's local object storage.
DropObjects(ctx context.Context, in *DropObjectsRequest, opts ...grpc.CallOption) (*DropObjectsResponse, error)
// Returns list that contains information about all shards of a node.
@ -122,6 +125,15 @@ func (c *controlServiceClient) SetNetmapStatus(ctx context.Context, in *SetNetma
return out, nil
}
func (c *controlServiceClient) GetNetmapStatus(ctx context.Context, in *GetNetmapStatusRequest, opts ...grpc.CallOption) (*GetNetmapStatusResponse, error) {
out := new(GetNetmapStatusResponse)
err := c.cc.Invoke(ctx, ControlService_GetNetmapStatus_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) DropObjects(ctx context.Context, in *DropObjectsRequest, opts ...grpc.CallOption) (*DropObjectsResponse, error) {
out := new(DropObjectsResponse)
err := c.cc.Invoke(ctx, ControlService_DropObjects_FullMethodName, in, out, opts...)
@ -301,6 +313,8 @@ type ControlServiceServer interface {
HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
// Sets status of the storage node in FrostFS network map.
SetNetmapStatus(context.Context, *SetNetmapStatusRequest) (*SetNetmapStatusResponse, error)
// Gets status of the storage node in FrostFS network map.
GetNetmapStatus(context.Context, *GetNetmapStatusRequest) (*GetNetmapStatusResponse, error)
// Mark objects to be removed from node's local object storage.
DropObjects(context.Context, *DropObjectsRequest) (*DropObjectsResponse, error)
// Returns list that contains information about all shards of a node.
@ -357,6 +371,9 @@ func (UnimplementedControlServiceServer) HealthCheck(context.Context, *HealthChe
func (UnimplementedControlServiceServer) SetNetmapStatus(context.Context, *SetNetmapStatusRequest) (*SetNetmapStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetNetmapStatus not implemented")
}
func (UnimplementedControlServiceServer) GetNetmapStatus(context.Context, *GetNetmapStatusRequest) (*GetNetmapStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetNetmapStatus not implemented")
}
func (UnimplementedControlServiceServer) DropObjects(context.Context, *DropObjectsRequest) (*DropObjectsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DropObjects not implemented")
}
@ -462,6 +479,24 @@ func _ControlService_SetNetmapStatus_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
func _ControlService_GetNetmapStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetNetmapStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).GetNetmapStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_GetNetmapStatus_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).GetNetmapStatus(ctx, req.(*GetNetmapStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_DropObjects_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DropObjectsRequest)
if err := dec(in); err != nil {
@ -819,6 +854,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "SetNetmapStatus",
Handler: _ControlService_SetNetmapStatus_Handler,
},
{
MethodName: "GetNetmapStatus",
Handler: _ControlService_GetNetmapStatus_Handler,
},
{
MethodName: "DropObjects",
Handler: _ControlService_DropObjects_Handler,