[#1043] control: Add ResetEvacuationStatus implementation

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
pull/1031/head
Dmitrii Stepanov 2024-03-12 18:57:38 +03:00 committed by Evgenii Stratonikov
parent 926cdeb072
commit 31e2396a5f
8 changed files with 1254 additions and 705 deletions

View File

@ -733,3 +733,13 @@ func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
return e.evacuateLimiter.CancelIfRunning()
}
func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return e.evacuateLimiter.ResetEvacuationStatus()
}

View File

@ -204,3 +204,18 @@ func (l *evacuationLimiter) CancelIfRunning() error {
l.cancel()
return nil
}
func (l *evacuationLimiter) ResetEvacuationStatus() error {
l.guard.Lock()
defer l.guard.Unlock()
if l.state.processState == EvacuateProcessStateRunning {
return logicerr.New("there is running evacuation task")
}
l.state = EvacuationState{}
l.eg = nil
l.cancel = nil
return nil
}

View File

@ -386,6 +386,8 @@ func TestEvacuateObjectsAsync(t *testing.T) {
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
require.Error(t, e.ResetEvacuationStatus(context.Background()))
close(blocker)
require.Eventually(t, func() bool {
@ -401,6 +403,16 @@ func TestEvacuateObjectsAsync(t *testing.T) {
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
require.NoError(t, eg.Wait())
require.NoError(t, e.ResetEvacuationStatus(context.Background()))
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get state after reset failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid state after reset")
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid count after reset")
require.Nil(t, st.StartedAt(), "invalid started at after reset")
require.Nil(t, st.FinishedAt(), "invalid finished at after reset")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid shard ids after reset")
require.Equal(t, "", st.ErrorMessage(), "invalid error message after reset")
}
func TestEvacuateTreesLocal(t *testing.T) {

View File

@ -102,3 +102,29 @@ func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShard
}
return resp, nil
}
func (s *Server) ResetShardEvacuationStatus(ctx context.Context, req *control.ResetShardEvacuationStatusRequest) (*control.ResetShardEvacuationStatusResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
err = s.s.ResetEvacuationStatus(ctx)
if err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.ResetShardEvacuationStatusResponse{
Body: &control.ResetShardEvacuationStatusResponse_Body{},
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -36,6 +36,9 @@ service ControlService {
// GetShardEvacuationStatus returns evacuation status.
rpc GetShardEvacuationStatus (GetShardEvacuationStatusRequest) returns (GetShardEvacuationStatusResponse);
// ResetShardEvacuationStatus resets evacuation status if there is no running evacuation process.
rpc ResetShardEvacuationStatus (ResetShardEvacuationStatusRequest) returns (ResetShardEvacuationStatusResponse);
// StopShardEvacuation stops moving all data from one shard to the others.
rpc StopShardEvacuation (StopShardEvacuationRequest) returns (StopShardEvacuationResponse);
@ -426,6 +429,22 @@ message GetShardEvacuationStatusResponse {
Signature signature = 2;
}
// ResetShardEvacuationStatus request.
message ResetShardEvacuationStatusRequest {
message Body {}
Body body = 1;
Signature signature = 2;
}
// ResetShardEvacuationStatus response.
message ResetShardEvacuationStatusResponse {
message Body {}
Body body = 1;
Signature signature = 2;
}
// StopShardEvacuation request.
message StopShardEvacuationRequest {
// Request body structure.

View File

@ -1916,6 +1916,160 @@ func (x *GetShardEvacuationStatusResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ResetShardEvacuationStatusRequest_Body) StableSize() (size int) {
if x == nil {
return 0
}
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *ResetShardEvacuationStatusRequest_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ResetShardEvacuationStatusRequest) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *ResetShardEvacuationStatusRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *ResetShardEvacuationStatusRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *ResetShardEvacuationStatusRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *ResetShardEvacuationStatusRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ResetShardEvacuationStatusResponse_Body) StableSize() (size int) {
if x == nil {
return 0
}
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *ResetShardEvacuationStatusResponse_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ResetShardEvacuationStatusResponse) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *ResetShardEvacuationStatusResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *ResetShardEvacuationStatusResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *ResetShardEvacuationStatusResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *ResetShardEvacuationStatusResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.

View File

@ -19,25 +19,26 @@ import (
const _ = grpc.SupportPackageIsVersion7
const (
ControlService_HealthCheck_FullMethodName = "/control.ControlService/HealthCheck"
ControlService_SetNetmapStatus_FullMethodName = "/control.ControlService/SetNetmapStatus"
ControlService_DropObjects_FullMethodName = "/control.ControlService/DropObjects"
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree"
ControlService_EvacuateShard_FullMethodName = "/control.ControlService/EvacuateShard"
ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation"
ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus"
ControlService_StopShardEvacuation_FullMethodName = "/control.ControlService/StopShardEvacuation"
ControlService_FlushCache_FullMethodName = "/control.ControlService/FlushCache"
ControlService_Doctor_FullMethodName = "/control.ControlService/Doctor"
ControlService_AddChainLocalOverride_FullMethodName = "/control.ControlService/AddChainLocalOverride"
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
ControlService_ListTargetsLocalOverrides_FullMethodName = "/control.ControlService/ListTargetsLocalOverrides"
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
ControlService_DetachShards_FullMethodName = "/control.ControlService/DetachShards"
ControlService_HealthCheck_FullMethodName = "/control.ControlService/HealthCheck"
ControlService_SetNetmapStatus_FullMethodName = "/control.ControlService/SetNetmapStatus"
ControlService_DropObjects_FullMethodName = "/control.ControlService/DropObjects"
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree"
ControlService_EvacuateShard_FullMethodName = "/control.ControlService/EvacuateShard"
ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation"
ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus"
ControlService_ResetShardEvacuationStatus_FullMethodName = "/control.ControlService/ResetShardEvacuationStatus"
ControlService_StopShardEvacuation_FullMethodName = "/control.ControlService/StopShardEvacuation"
ControlService_FlushCache_FullMethodName = "/control.ControlService/FlushCache"
ControlService_Doctor_FullMethodName = "/control.ControlService/Doctor"
ControlService_AddChainLocalOverride_FullMethodName = "/control.ControlService/AddChainLocalOverride"
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
ControlService_ListTargetsLocalOverrides_FullMethodName = "/control.ControlService/ListTargetsLocalOverrides"
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
ControlService_DetachShards_FullMethodName = "/control.ControlService/DetachShards"
)
// ControlServiceClient is the client API for ControlService service.
@ -63,6 +64,8 @@ type ControlServiceClient interface {
StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status.
GetShardEvacuationStatus(ctx context.Context, in *GetShardEvacuationStatusRequest, opts ...grpc.CallOption) (*GetShardEvacuationStatusResponse, error)
// ResetShardEvacuationStatus resets evacuation status if there is no running evacuation process.
ResetShardEvacuationStatus(ctx context.Context, in *ResetShardEvacuationStatusRequest, opts ...grpc.CallOption) (*ResetShardEvacuationStatusResponse, error)
// StopShardEvacuation stops moving all data from one shard to the others.
StopShardEvacuation(ctx context.Context, in *StopShardEvacuationRequest, opts ...grpc.CallOption) (*StopShardEvacuationResponse, error)
// FlushCache moves all data from one shard to the others.
@ -174,6 +177,15 @@ func (c *controlServiceClient) GetShardEvacuationStatus(ctx context.Context, in
return out, nil
}
func (c *controlServiceClient) ResetShardEvacuationStatus(ctx context.Context, in *ResetShardEvacuationStatusRequest, opts ...grpc.CallOption) (*ResetShardEvacuationStatusResponse, error) {
out := new(ResetShardEvacuationStatusResponse)
err := c.cc.Invoke(ctx, ControlService_ResetShardEvacuationStatus_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) StopShardEvacuation(ctx context.Context, in *StopShardEvacuationRequest, opts ...grpc.CallOption) (*StopShardEvacuationResponse, error) {
out := new(StopShardEvacuationResponse)
err := c.cc.Invoke(ctx, ControlService_StopShardEvacuation_FullMethodName, in, out, opts...)
@ -287,6 +299,8 @@ type ControlServiceServer interface {
StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status.
GetShardEvacuationStatus(context.Context, *GetShardEvacuationStatusRequest) (*GetShardEvacuationStatusResponse, error)
// ResetShardEvacuationStatus resets evacuation status if there is no running evacuation process.
ResetShardEvacuationStatus(context.Context, *ResetShardEvacuationStatusRequest) (*ResetShardEvacuationStatusResponse, error)
// StopShardEvacuation stops moving all data from one shard to the others.
StopShardEvacuation(context.Context, *StopShardEvacuationRequest) (*StopShardEvacuationResponse, error)
// FlushCache moves all data from one shard to the others.
@ -340,6 +354,9 @@ func (UnimplementedControlServiceServer) StartShardEvacuation(context.Context, *
func (UnimplementedControlServiceServer) GetShardEvacuationStatus(context.Context, *GetShardEvacuationStatusRequest) (*GetShardEvacuationStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetShardEvacuationStatus not implemented")
}
func (UnimplementedControlServiceServer) ResetShardEvacuationStatus(context.Context, *ResetShardEvacuationStatusRequest) (*ResetShardEvacuationStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ResetShardEvacuationStatus not implemented")
}
func (UnimplementedControlServiceServer) StopShardEvacuation(context.Context, *StopShardEvacuationRequest) (*StopShardEvacuationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StopShardEvacuation not implemented")
}
@ -544,6 +561,24 @@ func _ControlService_GetShardEvacuationStatus_Handler(srv interface{}, ctx conte
return interceptor(ctx, in, info, handler)
}
func _ControlService_ResetShardEvacuationStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ResetShardEvacuationStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).ResetShardEvacuationStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_ResetShardEvacuationStatus_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).ResetShardEvacuationStatus(ctx, req.(*ResetShardEvacuationStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_StopShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StopShardEvacuationRequest)
if err := dec(in); err != nil {
@ -767,6 +802,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetShardEvacuationStatus",
Handler: _ControlService_GetShardEvacuationStatus_Handler,
},
{
MethodName: "ResetShardEvacuationStatus",
Handler: _ControlService_ResetShardEvacuationStatus_Handler,
},
{
MethodName: "StopShardEvacuation",
Handler: _ControlService_StopShardEvacuation_Handler,