diff --git a/pkg/local_object_storage/engine/restore.go b/pkg/local_object_storage/engine/restore.go new file mode 100644 index 000000000..605a47580 --- /dev/null +++ b/pkg/local_object_storage/engine/restore.go @@ -0,0 +1,19 @@ +package engine + +import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + +// RestoreShard restores objects from dump to the shard with provided identifier. +// +// Returns an error if shard is not read-only. +func (e *StorageEngine) RestoreShard(id *shard.ID, prm *shard.RestorePrm) error { + e.mtx.RLock() + defer e.mtx.RUnlock() + + sh, ok := e.shards[id.String()] + if !ok { + return errShardNotFound + } + + _, err := sh.Restore(prm) + return err +} diff --git a/pkg/services/control/convert.go b/pkg/services/control/convert.go index c64b80b81..b8a8a616b 100644 --- a/pkg/services/control/convert.go +++ b/pkg/services/control/convert.go @@ -148,3 +148,21 @@ func (w *dumpShardResponseWrapper) FromGRPCMessage(m grpc.Message) error { w.DumpShardResponse = r return nil } + +type restoreShardResponseWrapper struct { + *RestoreShardResponse +} + +func (w *restoreShardResponseWrapper) ToGRPCMessage() grpc.Message { + return w.RestoreShardResponse +} + +func (w *restoreShardResponseWrapper) FromGRPCMessage(m grpc.Message) error { + r, ok := m.(*RestoreShardResponse) + if !ok { + return message.NewUnexpectedMessageType(m, (*RestoreShardResponse)(nil)) + } + + w.RestoreShardResponse = r + return nil +} diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index eb86a36fd..4c23f503c 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -15,6 +15,7 @@ const ( rpcListShards = "ListShards" rpcSetShardMode = "SetShardMode" rpcDumpShard = "DumpShard" + rpcRestoreShard = "RestoreShard" ) // HealthCheck executes ControlService.HealthCheck RPC. @@ -158,3 +159,16 @@ func DumpShard(cli *client.Client, req *DumpShardRequest, opts ...client.CallOpt return wResp.DumpShardResponse, nil } + +// RestoreShard executes ControlService.DumpShard RPC. +func RestoreShard(cli *client.Client, req *RestoreShardRequest, opts ...client.CallOption) (*RestoreShardResponse, error) { + wResp := &restoreShardResponseWrapper{new(RestoreShardResponse)} + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcRestoreShard), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.RestoreShardResponse, nil +} diff --git a/pkg/services/control/server/restore.go b/pkg/services/control/server/restore.go new file mode 100644 index 000000000..ec92f2fe9 --- /dev/null +++ b/pkg/services/control/server/restore.go @@ -0,0 +1,37 @@ +package control + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/services/control" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) RestoreShard(_ context.Context, req *control.RestoreShardRequest) (*control.RestoreShardResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) + + prm := new(shard.RestorePrm) + prm.WithPath(req.GetBody().GetFilepath()) + prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) + + err = s.s.RestoreShard(shardID, prm) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := new(control.RestoreShardResponse) + resp.SetBody(new(control.RestoreShardResponse_Body)) + + err = SignMessage(s.key, resp) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return resp, nil +} diff --git a/pkg/services/control/service.go b/pkg/services/control/service.go index b57d70c80..d14604914 100644 --- a/pkg/services/control/service.go +++ b/pkg/services/control/service.go @@ -1079,3 +1079,172 @@ func (x *DumpShardResponse) ReadSignedData(buf []byte) ([]byte, error) { func (x *DumpShardResponse) SignedDataSize() int { return x.GetBody().StableSize() } + +// SetShardID sets shard ID for the restore shard request. +func (x *RestoreShardRequest_Body) SetShardID(id []byte) { + x.Shard_ID = id +} + +// SetFilepath sets filepath for the restore shard request. +func (x *RestoreShardRequest_Body) SetFilepath(p string) { + x.Filepath = p +} + +// SetIgnoreErrors sets ignore errors flag for the restore shard request. +func (x *RestoreShardRequest_Body) SetIgnoreErrors(ignore bool) { + x.IgnoreErrors = ignore +} + +const ( + _ = iota + restoreShardReqBodyShardIDFNum + restoreShardReqBodyFilepathFNum + restoreShardReqBodyIgnoreErrorsFNum +) + +// StableMarshal reads binary representation of request body binary format. +// +// If buffer length is less than StableSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same binary format. +func (x *RestoreShardRequest_Body) StableMarshal(buf []byte) ([]byte, error) { + if x == nil { + return []byte{}, nil + } + + if sz := x.StableSize(); len(buf) < sz { + buf = make([]byte, sz) + } + + var ( + offset, n int + err error + ) + + n, err = proto.BytesMarshal(restoreShardReqBodyShardIDFNum, buf, x.Shard_ID) + if err != nil { + return nil, err + } + + offset += n + + n, err = proto.StringMarshal(restoreShardReqBodyFilepathFNum, buf[offset:], x.Filepath) + if err != nil { + return nil, err + } + + offset += n + + _, err = proto.BoolMarshal(restoreShardReqBodyIgnoreErrorsFNum, buf[offset:], x.IgnoreErrors) + if err != nil { + return nil, err + } + + return buf, nil +} + +// StableSize returns binary size of the request body in protobuf binary format. +// +// Structures with the same field values have the same binary size. +func (x *RestoreShardRequest_Body) StableSize() int { + if x == nil { + return 0 + } + + size := 0 + + size += proto.BytesSize(restoreShardReqBodyShardIDFNum, x.Shard_ID) + size += proto.StringSize(restoreShardReqBodyFilepathFNum, x.Filepath) + size += proto.BoolSize(restoreShardReqBodyIgnoreErrorsFNum, x.IgnoreErrors) + + return size +} + +// SetBody sets request body. +func (x *RestoreShardRequest) SetBody(v *RestoreShardRequest_Body) { + if x != nil { + x.Body = v + } +} + +// SetSignature sets body signature of the request. +func (x *RestoreShardRequest) SetSignature(v *Signature) { + if x != nil { + x.Signature = v + } +} + +// ReadSignedData reads signed data from request to buf. +// +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *RestoreShardRequest) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().StableMarshal(buf) +} + +// SignedDataSize returns size of the request signed data in bytes. +// +// Structures with the same field values have the same signed data size. +func (x *RestoreShardRequest) SignedDataSize() int { + return x.GetBody().StableSize() +} + +// StableMarshal reads binary representation of the response body in protobuf binary format. +// +// If buffer length is less than x.StableSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same binary format. +func (x *RestoreShardResponse_Body) StableMarshal(buf []byte) ([]byte, error) { + return buf, nil +} + +// StableSize returns binary size of the response body +// in protobuf binary format. +// +// Structures with the same field values have the same binary size. +func (x *RestoreShardResponse_Body) StableSize() int { + return 0 +} + +// SetBody sets response body. +func (x *RestoreShardResponse) SetBody(v *RestoreShardResponse_Body) { + if x != nil { + x.Body = v + } +} + +// SetSignature sets response body signature. +func (x *RestoreShardResponse) SetSignature(v *Signature) { + if x != nil { + x.Signature = v + } +} + +// ReadSignedData reads signed data from response to buf. +// +// If buffer length is less than SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *RestoreShardResponse) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().StableMarshal(buf) +} + +// SignedDataSize returns binary size of the signed data. +// +// Structures with the same field values have the same signed data size. +func (x *RestoreShardResponse) SignedDataSize() int { + return x.GetBody().StableSize() +} diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 5ab9bd963..b6d8e046c 100644 Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 7146047e6..c34b2cf7a 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -28,6 +28,9 @@ service ControlService { // Dump objects from the shard. rpc DumpShard (DumpShardRequest) returns (DumpShardResponse); + + // Restore objects from dump. + rpc RestoreShard (RestoreShardRequest) returns (RestoreShardResponse); } // Health check request. @@ -238,3 +241,38 @@ message DumpShardResponse { // Body signature. Signature signature = 2; } + + +// RestoreShard request. +message RestoreShardRequest { + // Request body structure. + message Body { + // ID of the shard. + bytes shard_ID = 1; + + // Path to the output. + string filepath = 2; + + // Flag indicating whether object read errors should be ignored. + bool ignore_errors = 3; + } + + // Body of restore shard request message. + Body body = 1; + + // Body signature. + Signature signature = 2; +} + +// RestoreShard response. +message RestoreShardResponse { + // Response body structure. + message Body { + } + + // Body of restore shard response message. + Body body = 1; + + // Body signature. + Signature signature = 2; +} diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index cbfdbf7fb..db08a63b5 100644 Binary files a/pkg/services/control/service_grpc.pb.go and b/pkg/services/control/service_grpc.pb.go differ