From aa24702ebe3a842803c160e66cc11ae9ad3c2da3 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sun, 21 Feb 2021 11:30:32 +0300 Subject: [PATCH] [#401] control: Serve DropObjects RPC Re-compile protobuf definition of Control service. Implement required messages on DropObjects RPC request and response messages. Implement `DropObjects` method on Control service server of the node. Use `StorageEngine.Delete` method as a deleted object handler on server. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/control.go | 9 ++ pkg/services/control/server/gc.go | 61 +++++++++++ pkg/services/control/server/server.go | 10 ++ pkg/services/control/service.go | 141 ++++++++++++++++++++++++++ pkg/services/control/service.pb.go | Bin 42882 -> 54566 bytes pkg/services/control/types.pb.go | Bin 20076 -> 20076 bytes 6 files changed, 221 insertions(+) create mode 100644 pkg/services/control/server/gc.go diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index 477000680..915c09f32 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -5,7 +5,9 @@ import ( "encoding/hex" "net" + "github.com/nspcc-dev/neofs-api-go/pkg/object" crypto "github.com/nspcc-dev/neofs-crypto" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/control" controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server" "github.com/pkg/errors" @@ -44,6 +46,13 @@ func initControlService(c *cfg) { controlSvc.WithHealthChecker(c), controlSvc.WithNetMapSource(c.cfgNetmap.wrapper), controlSvc.WithNodeState(c), + controlSvc.WithDeletedObjectHandler(func(addrList []*object.Address) error { + prm := new(engine.DeletePrm).WithAddresses(addrList...) + + _, err := c.cfgObject.cfgLocalStorage.localStorage.Delete(prm) + + return err + }), ) var ( diff --git a/pkg/services/control/server/gc.go b/pkg/services/control/server/gc.go new file mode 100644 index 000000000..97751c0a0 --- /dev/null +++ b/pkg/services/control/server/gc.go @@ -0,0 +1,61 @@ +package control + +import ( + "context" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/services/control" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// DeletedObjectHandler is a handler of objects to be removed. +type DeletedObjectHandler func([]*object.Address) error + +// DropObjects marks objects to be removed from the local node. +// +// Objects are marked via garbage collector's callback. +// +// If some address is not a valid object address in a binary format, an error returns. +// If request is unsigned or signed by disallowed key, permission error returns. +func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest) (*control.DropObjectsResponse, error) { + // verify request + if err := s.isValidRequest(req); err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + binAddrList := req.GetBody().GetAddressList() + addrList := make([]*object.Address, 0, len(binAddrList)) + + for i := range binAddrList { + a := object.NewAddress() + + err := a.Unmarshal(binAddrList[i]) + if err != nil { + return nil, status.Error(codes.InvalidArgument, + fmt.Sprintf("invalid binary object address: %v", err), + ) + } + + addrList = append(addrList, a) + } + + err := s.delObjHandler(addrList) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + // create and fill response + resp := new(control.DropObjectsResponse) + + body := new(control.DropObjectsResponse_Body) + resp.SetBody(body) + + // sign the response + if err := SignMessage(s.key, resp); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return resp, nil +} diff --git a/pkg/services/control/server/server.go b/pkg/services/control/server/server.go index fcd87859f..c6f84b1ed 100644 --- a/pkg/services/control/server/server.go +++ b/pkg/services/control/server/server.go @@ -47,6 +47,8 @@ type cfg struct { netMapSrc netmap.Source nodeState NodeState + + delObjHandler DeletedObjectHandler } func defaultCfg() *cfg { @@ -103,3 +105,11 @@ func WithNodeState(state NodeState) Option { c.nodeState = state } } + +// WithDeletedObjectHandler returns option to function +// which is called on the objects being deleted. +func WithDeletedObjectHandler(h DeletedObjectHandler) Option { + return func(c *cfg) { + c.delObjHandler = h + } +} diff --git a/pkg/services/control/service.go b/pkg/services/control/service.go index 083dc4809..66ef33b24 100644 --- a/pkg/services/control/service.go +++ b/pkg/services/control/service.go @@ -453,3 +453,144 @@ func (x *SetNetmapStatusResponse) ReadSignedData(buf []byte) ([]byte, error) { func (x *SetNetmapStatusResponse) SignedDataSize() int { return x.GetBody().StableSize() } + +// SetAddressList sets list of objects to be removed in NeoFS API binary format. +func (x *DropObjectsRequest_Body) SetAddressList(v [][]byte) { + if x != nil { + x.AddressList = v + } +} + +const ( + _ = iota + addrListReqBodyStatusFNum +) + +// StableMarshal reads binary representation of "Drop objects" request body +// in protobuf binary format. +// +// If buffer length is less than x.StableSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same binary format. +func (x *DropObjectsRequest_Body) StableMarshal(buf []byte) ([]byte, error) { + if x == nil { + return []byte{}, nil + } + + if sz := x.StableSize(); len(buf) < sz { + buf = make([]byte, sz) + } + + _, err := proto.RepeatedBytesMarshal(addrListReqBodyStatusFNum, buf, x.AddressList) + if err != nil { + return nil, err + } + + return buf, nil +} + +// StableSize returns binary size of "Drop objects" response body +// in protobuf binary format. +// +// Structures with the same field values have the same binary size. +func (x *DropObjectsRequest_Body) StableSize() int { + if x == nil { + return 0 + } + + size := 0 + + size += proto.RepeatedBytesSize(addrListReqBodyStatusFNum, x.AddressList) + + return size +} + +// SetBody sets body of the set "Drop objects" request. +func (x *DropObjectsRequest) SetBody(v *DropObjectsRequest_Body) { + if x != nil { + x.Body = v + } +} + +// SetSignature sets signature of the "Drop objects" request body. +func (x *DropObjectsRequest) SetSignature(body *Signature) { + if x != nil { + x.Signature = body + } +} + +// ReadSignedData reads signed data of "Drop objects" request to buf. +// +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *DropObjectsRequest) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().StableMarshal(buf) +} + +// SignedDataSize returns binary size of the signed data of "Drop objects" request. +// +// Structures with the same field values have the same signed data size. +func (x *DropObjectsRequest) SignedDataSize() int { + return x.GetBody().StableSize() +} + +// StableMarshal reads binary representation of "Drop objects" response body +// in protobuf binary format. +// +// If buffer length is less than x.StableSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same binary format. +func (x *DropObjectsResponse_Body) StableMarshal(buf []byte) ([]byte, error) { + return buf, nil +} + +// StableSize returns binary size of "Drop objects" response body +// in protobuf binary format. +// +// Structures with the same field values have the same binary size. +func (x *DropObjectsResponse_Body) StableSize() int { + return 0 +} + +// SetBody sets set body of the "Drop objects" response. +func (x *DropObjectsResponse) SetBody(v *DropObjectsResponse_Body) { + if x != nil { + x.Body = v + } +} + +// SetSignature sets signature of the "Drop objects" response body. +func (x *DropObjectsResponse) SetSignature(v *Signature) { + if x != nil { + x.Signature = v + } +} + +// ReadSignedData reads signed data of "Drop objects" response to buf. +// +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *DropObjectsResponse) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().StableMarshal(buf) +} + +// SignedDataSize returns binary size of the signed data of "Drop objects" response. +// +// Structures with the same field values have the same signed data size. +func (x *DropObjectsResponse) SignedDataSize() int { + return x.GetBody().StableSize() +} diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index c5758b83c0850b2d33898072e9967d2ecb1a7c70..9456b1fde8bb27de236c9f43ff636f2a7773d2ce 100644 GIT binary patch delta 4536 zcmb_gYj6|S6&}f0wq(FIeg$LfwXv}!%SO9eNwy5G4LCgI7pWU;f-Oty7q-`uR(@a} zNn0>!332%ffwT#aN$E@}O&E2iO@~nU(X>paflm9cp@EswPSaPI^fA*+OYgla#k+|h z%}o7ceeZdn@0@dYufBNGc;P!K^G_IDmp@-_wRcLPs1#8+MdpHGd4f}hBu?%hkpfDj z)}qWzNZdw3ZetYOE_yzN2*AH+dBy@cg1=<_+_JF8)5V24_A{yd_=QcR2{a#sbU8fY8kjEnuMI`)Lz*)O$o`F+bpwIrI$gN62yUnVS>phPEED={y#Bi-@o6cW* zQEzO<`lJD8+!A(~mtp>p39Z%lkzaQYmFaY!8f&PzkOIfLMfg#JA4fBDFds6Zm;Zp! z?!`RLp%mO%pNf%YKmOw?0_XV$+5Zk_a`R$Y^%soD+?|oXx;(f(4X;%f;6X_~xQ%Hz=t;#D zPq&d5kiGfuRHC8A8nbLXVl+63`Yhy4Y3SoO5d8wOK4);ovNwOUx>N>9H|+QVP~u%*6Qc zIM*u4kMH1`+qZE2!{h$&3^yQ$$NUQ6H>;Y=Ue8lKPxa3zD9y;(z}v&pgnisUCasGb z?;RyR?IRI+JVBvK_KC2p$j&m3GQ4G4S@dP-C%V~ItZkitHw_y^0!&cX0?pMOx_!uF z``A85MOg&D?zcMDni`jGFV}=Oxb2+d5$(xf@1?WMhFXILnV|$8x5|n$G#uQ?%(s3t z(Vtqf2laEf)3R6>S!qLwZr|E4WZ)rJ=!dg|@6_UTc2gc{&H_DY!r-DfF?eEcZ0TNszBW^b-$S68cJS=!r%zt6Tg;AWQKTD>K%edbwsz2`$kuL^&J^4( zCgWr>BC_`s;H+<{(PGBM?VI51dWh~kEWmrY5po)8%e`+J@#}Gtc&J?>pd-odi*uGoNDkLxnB@8={f=|dPZ@x z2-RmibC#P&&5b=>YW}KLJ%cLF7SYsIaF7WJb=0V8n#4fY{r{623WRD-On4y52?x}K zz|7Urj@FlrITjxQGEpEHGpLi-jmK!foK5OaB%LgAho9p2%+77_wJ*hiE()-<@!@q; zUC?I|4{@xpbXdf5-DVsJ6w`v-+`a-$9gS#eHKF}U3qnui;dY+|*LG%LOHU>#D-)aU zX~ui4#rS8p6^^!aydSJ735ypjB1(N$^hu?;EM1n1nwLl_gAjPAU3vK7U?zUG^CT17KeU*v zU(ZwRxI{G?HI4yX8L2>{--cIro0vDXisZepZ>R*HIa5^P46EpK-N$m&h9Qn@gAng4{{weAJPb_Pu63fX>b`lYImzub>gm+gXyk? zMq(nL^#JG_DO3?=f$;%~@FQB2`CwH{);Q!yVXc`Vs;%iJIHPNiFPAnNt9N`r9F*|` zEkixI$V3yyW1~J?+_ntg-CBh+qm>3O_OsDlMw8oWne>OzTxE^zkZ&7wC-TyXe9X(H zYPXjQ$m2>_9<9|KvFycRF<03ZsJK6oU&)a_^3L@a8G*7L@6|!{FWg)Vrmn_i+5kW(! zNZ`eX_-A3)()XP{_6UxIit>3^GBrF1dyQ@{=A*f2i`a}FFRsrN;CRGlsKXnP^@jT6 z$_`_l9bc2nFr~N*Zd_7)c!kJDv}h5`5Bd zZ>pZebSG@30TlD3-H9$QB+E5|38l=vP}9UGe;hJiO^0_T)5YABSVRsSYCI|a`1MqI zozAw}4j(g~v-r3YVfrgKk;-zV)nVbTYLa?Efe-TQ(N+IAJn=B_XY4FdK znB$rZp>kZB5sZQZcW1hc0*{W_D){$`MnS;kSu3vXtu>QGsa=7b3N!kzxW2swD|Jc4 zM}ZWLX^2Fl0!6bT`7-I4?Ip-xp4~~^Gp>i)YtiU?<)|_dReIUAu%=Cb|&a>NabM7w8U#0)mi^kQ%Kc#nr z%SX0g&6ntv!G!nb-Zb*gI4PSntBG6;OduAFyhzD9hqvc`Y2;m>AYVRw9;rwFY;eau zI9g-!(NBJ#KfEV#fp>P`I+Pb zp*iLvkWZ9rzbmhw&Wo+W$weqUq5rCVh^{WDb0CCx+b0X;0GF_0!EMX#9r$`av{HHTZn=i-1-$f^#Qw@%tA&n#^K<`sQppuAa}+ z&~W$sDP6YPU*4zNkDVLTiXVAprcC3zDxgf;>CXwZdgt$+SsNSt)=vz$aUn-%uKGfm K*4&D35B?V^x#8;o delta 1339 zcmaJ>YfM{Z7|uC(+8&@#ngWA?ZQTMX6x!3v=_!uVt=n|n%3uyPvRQ_<1Ot;~7a`v8 z0&Ih9(x*T-N{PlOgH0gEc$q&OF=m?RhRebq1I`P^A8zr7KQz(k_nmgx{NtaV?|q)< zy*zKfeDBO=y?Tv(V?X0uY}ss)SKN;?__#WP1H2y31%ff37GaLVN?;SNsq_$~C^por zqf(=n;hoFEuR;WW`Aac>Fc9<2qtQt@E!y%_SQzEx3k^+KsP#k;+O5NHx@^1<_yb94 z9n*$FEN|tI@LJ@l9lb1k^;A?&3!%HD2+Kh=ntU9LzG8~YO}_06$1XAUkjSg@Ko z-)e};5AGge@%Od}dW=PMY}hLbaI=iVw~e`YuQ85e+t<-*E|Tk;K44MwFclWW&Z@Bd zh+64ruP7$xH!-@4>M|6E>97TpEyMEOwtX8-PKVWG@0FbN*RGuHhm>=~n$6pLT_}iH zV0+CBTRV@9C;428W+d&%@9-e+>3of#WD4zQ9^8VN*Y~2o%K-nNj(G~g@e($h$6~*k z5LWm*R3sypLf;K~VID9*>ej>A<;T!KKBVLNJL)HoXkyQpad4oJBH=DKDY)VP1ZLbG zwB}GBX$Mg}+ZWEsj20z!(0@{bKcdFfp*%DU(XK^U=&OY8MKdQHqKf7dA>2Ho%+TCf zh<}Fcq-d#unik!3FD}KYQ?#1!^Qlmph)}|#tk7d8#6!dP5@M9nqKgnS!y#pVX{{p) zrn?rlkxF7ver)6b%Q`IhCSD`|aC()&K2DDvC}cz|jTPg}sGISS)BX5y)W%A62u+lb zyT`#9v$I|rR$wGng;x?~${jH#)L0|&Pl-+V0(QoMKheNA;f-%YD{?RxmvEkZzl`r< zMJr5a9)LLJCbF)v5NUL2>lH66)#y$6mZ?`eQI3Nq*d;_;4kCVyf-Zs zDQd!&i90e+8c0r5YsjKVF=FKVq!G6#yqQA83I!SyRz|?{iFPv4T4KMNESIPSTplyx z;yI6oY?H?6;j$+^Jo$4dpbWPehbJreObVHd7BnU}EKNFCLBz|c667W8N&S~nrHCd2 ztRN|%n9|`!vX&Rp(+l2=G#4kPT=;#m6f09ZNg6F6ru}dFCUr!{Pzn2veoUZ${$USXfQmxhVt`4^3~$< z+;wc74dUwC=_hAq@d`TUXiQ%fmHgFQ0fVx4(+|X+`LFTm#UP341E50xLL43PEyf|Y kykDb|U7wu2PtI8?W#tb)vt`Mbu0%BQ!u8J?EZ=zaUy={kRsaA1 diff --git a/pkg/services/control/types.pb.go b/pkg/services/control/types.pb.go index efdc3b34ef4fc896e13852f8af79128969b93631..528de009a7aa39ef0f954b66d4c8ba52d0844625 100644 GIT binary patch delta 14 VcmaDehw;rE#t9*erW-@W{Qxmd1(*N; delta 14 VcmaDehw;rE#t9*e#v4P${QxmT1(pB+