forked from TrueCloudLab/frostfs-node
[#401] control: Serve DropObjects RPC
Re-compile protobuf definition of Control service. Implement required messages on DropObjects RPC request and response messages. Implement `DropObjects` method on Control service server of the node. Use `StorageEngine.Delete` method as a deleted object handler on server. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
f2337baedc
commit
aa24702ebe
6 changed files with 221 additions and 0 deletions
|
@ -5,7 +5,9 @@ import (
|
|||
"encoding/hex"
|
||||
"net"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -44,6 +46,13 @@ func initControlService(c *cfg) {
|
|||
controlSvc.WithHealthChecker(c),
|
||||
controlSvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
||||
controlSvc.WithNodeState(c),
|
||||
controlSvc.WithDeletedObjectHandler(func(addrList []*object.Address) error {
|
||||
prm := new(engine.DeletePrm).WithAddresses(addrList...)
|
||||
|
||||
_, err := c.cfgObject.cfgLocalStorage.localStorage.Delete(prm)
|
||||
|
||||
return err
|
||||
}),
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
61
pkg/services/control/server/gc.go
Normal file
61
pkg/services/control/server/gc.go
Normal file
|
@ -0,0 +1,61 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// DeletedObjectHandler is a handler of objects to be removed.
|
||||
type DeletedObjectHandler func([]*object.Address) error
|
||||
|
||||
// DropObjects marks objects to be removed from the local node.
|
||||
//
|
||||
// Objects are marked via garbage collector's callback.
|
||||
//
|
||||
// If some address is not a valid object address in a binary format, an error returns.
|
||||
// If request is unsigned or signed by disallowed key, permission error returns.
|
||||
func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest) (*control.DropObjectsResponse, error) {
|
||||
// verify request
|
||||
if err := s.isValidRequest(req); err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
binAddrList := req.GetBody().GetAddressList()
|
||||
addrList := make([]*object.Address, 0, len(binAddrList))
|
||||
|
||||
for i := range binAddrList {
|
||||
a := object.NewAddress()
|
||||
|
||||
err := a.Unmarshal(binAddrList[i])
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument,
|
||||
fmt.Sprintf("invalid binary object address: %v", err),
|
||||
)
|
||||
}
|
||||
|
||||
addrList = append(addrList, a)
|
||||
}
|
||||
|
||||
err := s.delObjHandler(addrList)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// create and fill response
|
||||
resp := new(control.DropObjectsResponse)
|
||||
|
||||
body := new(control.DropObjectsResponse_Body)
|
||||
resp.SetBody(body)
|
||||
|
||||
// sign the response
|
||||
if err := SignMessage(s.key, resp); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
|
@ -47,6 +47,8 @@ type cfg struct {
|
|||
netMapSrc netmap.Source
|
||||
|
||||
nodeState NodeState
|
||||
|
||||
delObjHandler DeletedObjectHandler
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -103,3 +105,11 @@ func WithNodeState(state NodeState) Option {
|
|||
c.nodeState = state
|
||||
}
|
||||
}
|
||||
|
||||
// WithDeletedObjectHandler returns option to function
|
||||
// which is called on the objects being deleted.
|
||||
func WithDeletedObjectHandler(h DeletedObjectHandler) Option {
|
||||
return func(c *cfg) {
|
||||
c.delObjHandler = h
|
||||
}
|
||||
}
|
||||
|
|
|
@ -453,3 +453,144 @@ func (x *SetNetmapStatusResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
|||
func (x *SetNetmapStatusResponse) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SetAddressList sets list of objects to be removed in NeoFS API binary format.
|
||||
func (x *DropObjectsRequest_Body) SetAddressList(v [][]byte) {
|
||||
if x != nil {
|
||||
x.AddressList = v
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
_ = iota
|
||||
addrListReqBodyStatusFNum
|
||||
)
|
||||
|
||||
// StableMarshal reads binary representation of "Drop objects" request body
|
||||
// in protobuf binary format.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *DropObjectsRequest_Body) StableMarshal(buf []byte) ([]byte, error) {
|
||||
if x == nil {
|
||||
return []byte{}, nil
|
||||
}
|
||||
|
||||
if sz := x.StableSize(); len(buf) < sz {
|
||||
buf = make([]byte, sz)
|
||||
}
|
||||
|
||||
_, err := proto.RepeatedBytesMarshal(addrListReqBodyStatusFNum, buf, x.AddressList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// StableSize returns binary size of "Drop objects" response body
|
||||
// in protobuf binary format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *DropObjectsRequest_Body) StableSize() int {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
size := 0
|
||||
|
||||
size += proto.RepeatedBytesSize(addrListReqBodyStatusFNum, x.AddressList)
|
||||
|
||||
return size
|
||||
}
|
||||
|
||||
// SetBody sets body of the set "Drop objects" request.
|
||||
func (x *DropObjectsRequest) SetBody(v *DropObjectsRequest_Body) {
|
||||
if x != nil {
|
||||
x.Body = v
|
||||
}
|
||||
}
|
||||
|
||||
// SetSignature sets signature of the "Drop objects" request body.
|
||||
func (x *DropObjectsRequest) SetSignature(body *Signature) {
|
||||
if x != nil {
|
||||
x.Signature = body
|
||||
}
|
||||
}
|
||||
|
||||
// ReadSignedData reads signed data of "Drop objects" request to buf.
|
||||
//
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *DropObjectsRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf)
|
||||
}
|
||||
|
||||
// SignedDataSize returns binary size of the signed data of "Drop objects" request.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *DropObjectsRequest) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// StableMarshal reads binary representation of "Drop objects" response body
|
||||
// in protobuf binary format.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *DropObjectsResponse_Body) StableMarshal(buf []byte) ([]byte, error) {
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// StableSize returns binary size of "Drop objects" response body
|
||||
// in protobuf binary format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *DropObjectsResponse_Body) StableSize() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// SetBody sets set body of the "Drop objects" response.
|
||||
func (x *DropObjectsResponse) SetBody(v *DropObjectsResponse_Body) {
|
||||
if x != nil {
|
||||
x.Body = v
|
||||
}
|
||||
}
|
||||
|
||||
// SetSignature sets signature of the "Drop objects" response body.
|
||||
func (x *DropObjectsResponse) SetSignature(v *Signature) {
|
||||
if x != nil {
|
||||
x.Signature = v
|
||||
}
|
||||
}
|
||||
|
||||
// ReadSignedData reads signed data of "Drop objects" response to buf.
|
||||
//
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *DropObjectsResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf)
|
||||
}
|
||||
|
||||
// SignedDataSize returns binary size of the signed data of "Drop objects" response.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *DropObjectsResponse) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
|
BIN
pkg/services/control/service.pb.go
generated
BIN
pkg/services/control/service.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/types.pb.go
generated
BIN
pkg/services/control/types.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue