From 18cfd8b04243646a100fe54d9766d576b9a4926c Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 25 Jan 2022 14:54:35 +0300 Subject: [PATCH] [#1086] services/control: implement `RestoreShard` RPC Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/restore.go | 19 +++ pkg/services/control/convert.go | 18 +++ pkg/services/control/rpc.go | 14 ++ pkg/services/control/server/restore.go | 37 +++++ pkg/services/control/service.go | 169 +++++++++++++++++++++ pkg/services/control/service.pb.go | Bin 77800 -> 88777 bytes pkg/services/control/service.proto | 38 +++++ pkg/services/control/service_grpc.pb.go | Bin 13424 -> 14980 bytes 8 files changed, 295 insertions(+) create mode 100644 pkg/local_object_storage/engine/restore.go create mode 100644 pkg/services/control/server/restore.go diff --git a/pkg/local_object_storage/engine/restore.go b/pkg/local_object_storage/engine/restore.go new file mode 100644 index 000000000..605a47580 --- /dev/null +++ b/pkg/local_object_storage/engine/restore.go @@ -0,0 +1,19 @@ +package engine + +import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + +// RestoreShard restores objects from dump to the shard with provided identifier. +// +// Returns an error if shard is not read-only. +func (e *StorageEngine) RestoreShard(id *shard.ID, prm *shard.RestorePrm) error { + e.mtx.RLock() + defer e.mtx.RUnlock() + + sh, ok := e.shards[id.String()] + if !ok { + return errShardNotFound + } + + _, err := sh.Restore(prm) + return err +} diff --git a/pkg/services/control/convert.go b/pkg/services/control/convert.go index c64b80b81..b8a8a616b 100644 --- a/pkg/services/control/convert.go +++ b/pkg/services/control/convert.go @@ -148,3 +148,21 @@ func (w *dumpShardResponseWrapper) FromGRPCMessage(m grpc.Message) error { w.DumpShardResponse = r return nil } + +type restoreShardResponseWrapper struct { + *RestoreShardResponse +} + +func (w *restoreShardResponseWrapper) ToGRPCMessage() grpc.Message { + return w.RestoreShardResponse +} + +func (w *restoreShardResponseWrapper) FromGRPCMessage(m grpc.Message) error { + r, ok := m.(*RestoreShardResponse) + if !ok { + return message.NewUnexpectedMessageType(m, (*RestoreShardResponse)(nil)) + } + + w.RestoreShardResponse = r + return nil +} diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index eb86a36fd..4c23f503c 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -15,6 +15,7 @@ const ( rpcListShards = "ListShards" rpcSetShardMode = "SetShardMode" rpcDumpShard = "DumpShard" + rpcRestoreShard = "RestoreShard" ) // HealthCheck executes ControlService.HealthCheck RPC. @@ -158,3 +159,16 @@ func DumpShard(cli *client.Client, req *DumpShardRequest, opts ...client.CallOpt return wResp.DumpShardResponse, nil } + +// RestoreShard executes ControlService.DumpShard RPC. +func RestoreShard(cli *client.Client, req *RestoreShardRequest, opts ...client.CallOption) (*RestoreShardResponse, error) { + wResp := &restoreShardResponseWrapper{new(RestoreShardResponse)} + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcRestoreShard), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.RestoreShardResponse, nil +} diff --git a/pkg/services/control/server/restore.go b/pkg/services/control/server/restore.go new file mode 100644 index 000000000..ec92f2fe9 --- /dev/null +++ b/pkg/services/control/server/restore.go @@ -0,0 +1,37 @@ +package control + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/services/control" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) RestoreShard(_ context.Context, req *control.RestoreShardRequest) (*control.RestoreShardResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) + + prm := new(shard.RestorePrm) + prm.WithPath(req.GetBody().GetFilepath()) + prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) + + err = s.s.RestoreShard(shardID, prm) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := new(control.RestoreShardResponse) + resp.SetBody(new(control.RestoreShardResponse_Body)) + + err = SignMessage(s.key, resp) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return resp, nil +} diff --git a/pkg/services/control/service.go b/pkg/services/control/service.go index b57d70c80..d14604914 100644 --- a/pkg/services/control/service.go +++ b/pkg/services/control/service.go @@ -1079,3 +1079,172 @@ func (x *DumpShardResponse) ReadSignedData(buf []byte) ([]byte, error) { func (x *DumpShardResponse) SignedDataSize() int { return x.GetBody().StableSize() } + +// SetShardID sets shard ID for the restore shard request. +func (x *RestoreShardRequest_Body) SetShardID(id []byte) { + x.Shard_ID = id +} + +// SetFilepath sets filepath for the restore shard request. +func (x *RestoreShardRequest_Body) SetFilepath(p string) { + x.Filepath = p +} + +// SetIgnoreErrors sets ignore errors flag for the restore shard request. +func (x *RestoreShardRequest_Body) SetIgnoreErrors(ignore bool) { + x.IgnoreErrors = ignore +} + +const ( + _ = iota + restoreShardReqBodyShardIDFNum + restoreShardReqBodyFilepathFNum + restoreShardReqBodyIgnoreErrorsFNum +) + +// StableMarshal reads binary representation of request body binary format. +// +// If buffer length is less than StableSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same binary format. +func (x *RestoreShardRequest_Body) StableMarshal(buf []byte) ([]byte, error) { + if x == nil { + return []byte{}, nil + } + + if sz := x.StableSize(); len(buf) < sz { + buf = make([]byte, sz) + } + + var ( + offset, n int + err error + ) + + n, err = proto.BytesMarshal(restoreShardReqBodyShardIDFNum, buf, x.Shard_ID) + if err != nil { + return nil, err + } + + offset += n + + n, err = proto.StringMarshal(restoreShardReqBodyFilepathFNum, buf[offset:], x.Filepath) + if err != nil { + return nil, err + } + + offset += n + + _, err = proto.BoolMarshal(restoreShardReqBodyIgnoreErrorsFNum, buf[offset:], x.IgnoreErrors) + if err != nil { + return nil, err + } + + return buf, nil +} + +// StableSize returns binary size of the request body in protobuf binary format. +// +// Structures with the same field values have the same binary size. +func (x *RestoreShardRequest_Body) StableSize() int { + if x == nil { + return 0 + } + + size := 0 + + size += proto.BytesSize(restoreShardReqBodyShardIDFNum, x.Shard_ID) + size += proto.StringSize(restoreShardReqBodyFilepathFNum, x.Filepath) + size += proto.BoolSize(restoreShardReqBodyIgnoreErrorsFNum, x.IgnoreErrors) + + return size +} + +// SetBody sets request body. +func (x *RestoreShardRequest) SetBody(v *RestoreShardRequest_Body) { + if x != nil { + x.Body = v + } +} + +// SetSignature sets body signature of the request. +func (x *RestoreShardRequest) SetSignature(v *Signature) { + if x != nil { + x.Signature = v + } +} + +// ReadSignedData reads signed data from request to buf. +// +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *RestoreShardRequest) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().StableMarshal(buf) +} + +// SignedDataSize returns size of the request signed data in bytes. +// +// Structures with the same field values have the same signed data size. +func (x *RestoreShardRequest) SignedDataSize() int { + return x.GetBody().StableSize() +} + +// StableMarshal reads binary representation of the response body in protobuf binary format. +// +// If buffer length is less than x.StableSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same binary format. +func (x *RestoreShardResponse_Body) StableMarshal(buf []byte) ([]byte, error) { + return buf, nil +} + +// StableSize returns binary size of the response body +// in protobuf binary format. +// +// Structures with the same field values have the same binary size. +func (x *RestoreShardResponse_Body) StableSize() int { + return 0 +} + +// SetBody sets response body. +func (x *RestoreShardResponse) SetBody(v *RestoreShardResponse_Body) { + if x != nil { + x.Body = v + } +} + +// SetSignature sets response body signature. +func (x *RestoreShardResponse) SetSignature(v *Signature) { + if x != nil { + x.Signature = v + } +} + +// ReadSignedData reads signed data from response to buf. +// +// If buffer length is less than SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *RestoreShardResponse) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().StableMarshal(buf) +} + +// SignedDataSize returns binary size of the signed data. +// +// Structures with the same field values have the same signed data size. +func (x *RestoreShardResponse) SignedDataSize() int { + return x.GetBody().StableSize() +} diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 5ab9bd96308139183d74f2e9a331998132b7f26a..b6d8e046c1d007db11bf93c37dabcd2058f6c1b3 100644 GIT binary patch delta 3850 zcma)8dvKFg7N7H_Nt!Q7n^GvWkTz|RwxK|hFG(LjTVEpreWAP+S|J7HRai>#Y|f|#`jw!Xxr$Eg9E}uz(_e3CFR$Vxu``QHRoX+ z9(pvOc2vX?nyXsW*7@&BSoe4U)&+KqUwJ1Um}kTL3;UsVUK;;j1k17(w9dETjRZ5S zH5tf$Sgc!7@ZwzctHo<1G(S;+_bTn!yXsEVCB|(|;XrG**ilxs920BL!sZY*z1m#e zDB+_z;T(glzq1Cb)g=m9R2#?xguD=Ao)SSqHmlZ}BN7T$1>jm~Mal9GE=Qno^%bzmanxS zDMyb}&*;IKQx88oO~Uh=gkwLr)))o2joiO7-@sK1?5k5zHb%(v(7CxqUB31u9bSJq zfIK7wcRq1*6lS-Xak-u$=8rAv!i{zvW^NCtKW?%S4(w>b!i`aAoM1rNW`QpgjT_B4 zG*0M7wcy41I?BdA`&=|_H{;U=f%HyHoMJ{kmse~7UyYgnEt*PgaJ(GE4C^$rn%^)) zM>!%;ZQ5=mE^1z8oSe#2@6f2QPK)8Pk@qa1S27*0TI_ydXxPxHCtom-+WeA2zYO-4 zSiJX)EzAwBig}Xa67_odP3a}aaL#{6O1B2utV4urdmUO6e0IBCZ(7hcki zcw8!!a!vbDtD->=GqvnjOba;g^AP=c2TuRn z-UIvB4WZAH#OC&Wr{EvwGyfmv+xMo!axtyHD488Z;R}}ca4b0U-T*vsF*7gt>1CL-n<>I>nQ_sg##Ka4kcpD7lREG!SyTlf3Y?-@(uB zvi322#P_u4Qx;m=r*gUQ`V^DG`zg94xE+G?olcWGD`?^LXlxsi+m*m(MeyC$UpET+ zCB=iH8^s$t4A^;Ns?cuUDCJju(al*x*?qHA^0+`9L9M)_)a2o}jVR*c#HXOSBRf=9 zv?2usox{=8WkgkHsoVod5$zF(hDY*GgoyU??j&wnLkVo1iDI$qs!>#{>?&c=q7@%> zm4wn}iEhOO8<6^9%|P+|)$Hyp=Yh>0DzAh)S4MM@IHfqRGM%FHtFF zvjq~}C)6g13Mq$0>1Zdb=b1DX6|Z*I*!l=^nR?=Vc=9I)I~PIjNPd%uNWUO^e55u^O7M(mXNGOilZUyD zypP3^yxe)Na$@TvDV^PAATK*@q`}N&A{*Oj;OEX8$<+lW?%8D^H(Mvv2}T;tevG6< z=8#EdyN#rt(D21+E;^QlUHME zrq`eA^Rl0~BReWnl!f2ZoWPk%cCObi%+6jGC0khH*jO{g^=+2flV%#g2UqyL{Z}&~ zCPYC;0)5Hsi8MZBNg6``V)rD`c$R0UQ>@w{Br935{)&awCDH-5Et#A4I>?4Po1RS_ zN}sS#QfPuOeZ#J&a_K@cmr`%jB=JnPun|rkSu~7CF20Q?D}KWK_8Y?}hJ<>WZB6G3 zO=lPqQRM(DXRN@Y6#}%TfBWB-OKWZ zQ!^_V$X&Q(W|f(=jTJa}n-o{jA?)&HB}3^8{$A>1CCN!Um~8}I6wkR^*RGGP9g{t~b=}Ku-PYT7-PVwe2or?8d2Ip%qJ<%{%?u()U_}(i zW577bfw=KdNCrljxq%sFP-Bpgs3H0XhUfrIjUgnA7(*lhJm0$^tiWY23k8USkqy{OIuR#tVzgkV(QYU zvTyap7`(S4gr&wQ=y*$yk$!P_1Q)y(#eJ(S*pLv%xK0`CYL3Bat3uem$%MRCn~D@NWn;aHao&peeMcdbsDF>^5PTg12_-@M8bp`0jgA z<@Ky)C1ck z99STnlc>E3;_y=cOk751P}w6+CE*ucrC48O#_pOp?ZA_+kaG6$FA|;~2q_bT7CchVpL zvXC3VOWo;E2R`wdN|t<%cOxA<;HRbnNTp@@5J#>I2FeAHO&eJzBaeU@3;A&=0|L}0 z)QlA170zVqa0ZxZXC7k@GQlNztH^1Gmn2V+Zn-%7W;=Hy)36<;N!}pcE`nrwC@>%8 zXTc)Qyg3UN#(SA`(Thq%CZ**Oy^#y)^ko*)B)DQuX-hUVa#cTM z!xEvo9ncIFw8a7KLLFySVus`#O&`dC=6JbEOXd+gImzOLO2e}Ps5cFAsoM#~>4Oe!*)NeU-;$-EeYQ;WBor=0S<<$8w>GIlrF^3;ye{LU5l+ zYx1Dj;M0OsJ~s}IqkIllPIvQRu~1(rfM%)@6FgqPODJ6U z&8=4#PG<8~kSzr=Xta>~nN|n}sdAD{6Lq-27w^$nu6o{ZnGH`{^m%yq<`}#hXFkDC zrn+lAJXZs+IgiMv?$y(Ux>^J!daW}bH5NdcwpN+N;N@w5UBW;|F;r8eggJ{8!wkt6 z(BLdL)JnbzO=&6SA68RXGzH?P~tVs6J>0@Eqh&Hn8kFw(vfaM4mXARi;K&wA6c?hWloN4yzuB+N zda+*)`st|5OKp?k1+!mmeX-9U^viTi<_(=KL-KR5&o5vH^~qrVA6(8UgwyC<8PG<( zJ`jJrINH+;-%{I5m@6R5!CCMVg==9B*=NIVbn0bIQvXV{WD$Hw$7aE13RZzl{U)g1 zk7u(tLn-`wiKEC|xI*qZ?D()=T$>>1sIM8mr?$Boc8fl!W2vr*sq-|#k2F%RsW<6H z155X+VaonNwX!QDH8FCu1#VMuGfPWe;(ISX(R6Dbr&-XVVd}?rlt<8 diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 7146047e6..c34b2cf7a 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -28,6 +28,9 @@ service ControlService { // Dump objects from the shard. rpc DumpShard (DumpShardRequest) returns (DumpShardResponse); + + // Restore objects from dump. + rpc RestoreShard (RestoreShardRequest) returns (RestoreShardResponse); } // Health check request. @@ -238,3 +241,38 @@ message DumpShardResponse { // Body signature. Signature signature = 2; } + + +// RestoreShard request. +message RestoreShardRequest { + // Request body structure. + message Body { + // ID of the shard. + bytes shard_ID = 1; + + // Path to the output. + string filepath = 2; + + // Flag indicating whether object read errors should be ignored. + bool ignore_errors = 3; + } + + // Body of restore shard request message. + Body body = 1; + + // Body signature. + Signature signature = 2; +} + +// RestoreShard response. +message RestoreShardResponse { + // Response body structure. + message Body { + } + + // Body of restore shard response message. + Body body = 1; + + // Body signature. + Signature signature = 2; +} diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index cbfdbf7fb8b68dcceb04d80df5ef5029bc9317bd..db08a63b5a10168bcab00a938dd0a3cf79f77f1f 100644 GIT binary patch delta 462 zcmey6(NenMDEs6OA}q>5sl_GvMX3t;Nm;4MCB+J9Mftf3DW$mudR&}PxykYD^6W5n zoPZ{reNkF;^AYwA&dK{Z5ejfCmYn>7hnoj#P;f?KQOe{-ej8MQ0#5(UK7tJ_M7wRF zw1gb8kwK}2r9dxfDrjiIoD6d2Y0(n4$>$^yZYN4#0IK8Sfd<*jJQAMlpe%|ib3kD_ zs=zt98dQNmope+IL#+yVWQS`cmsIE|6r~pG!Ck)jfxZ>XWPc-JRGk|QmEcjpYLX2A DM9ivY delta 36 scmZoE{gAQYDEsCJj&{z?O2Q2+n?)sx**3?^KN8-2!}KTfWId}e02aayX#fBK