[#11] services/container: Implement Neo:Morph executor and service

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-08-24 17:07:08 +03:00 committed by Alex Vanin
parent 9e30a87022
commit 8539f5c2cd
6 changed files with 458 additions and 3 deletions

View file

@ -0,0 +1,112 @@
package container
import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/container"
"github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/pkg/errors"
)
type ServiceExecutor interface {
Put(context.Context, *container.PutRequestBody) (*container.PutResponseBody, error)
Delete(context.Context, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
SetExtendedACL(context.Context, *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error)
GetExtendedACL(context.Context, *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error)
}
type executorSvc struct {
exec ServiceExecutor
metaHeader *session.ResponseMetaHeader
}
// NewExecutionService wraps ServiceExecutor and returns Container Service interface.
//
// Passed meta header is attached to all responses.
func NewExecutionService(exec ServiceExecutor, metaHdr *session.ResponseMetaHeader) container.Service {
return &executorSvc{
exec: exec,
metaHeader: metaHdr,
}
}
func (s *executorSvc) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
respBody, err := s.exec.Put(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute Put request")
}
resp := new(container.PutResponse)
resp.SetBody(respBody)
resp.SetMetaHeader(s.metaHeader)
return resp, nil
}
func (s *executorSvc) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
respBody, err := s.exec.Delete(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute Delete request")
}
resp := new(container.DeleteResponse)
resp.SetBody(respBody)
resp.SetMetaHeader(s.metaHeader)
return resp, nil
}
func (s *executorSvc) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
respBody, err := s.exec.Get(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute Get request")
}
resp := new(container.GetResponse)
resp.SetBody(respBody)
resp.SetMetaHeader(s.metaHeader)
return resp, nil
}
func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
respBody, err := s.exec.List(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute List request")
}
resp := new(container.ListResponse)
resp.SetBody(respBody)
resp.SetMetaHeader(s.metaHeader)
return resp, nil
}
func (s *executorSvc) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
respBody, err := s.exec.SetExtendedACL(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute SetEACL request")
}
resp := new(container.SetExtendedACLResponse)
resp.SetBody(respBody)
resp.SetMetaHeader(s.metaHeader)
return resp, nil
}
func (s *executorSvc) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
respBody, err := s.exec.GetExtendedACL(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute GetEACL request")
}
resp := new(container.GetExtendedACLResponse)
resp.SetBody(respBody)
resp.SetMetaHeader(s.metaHeader)
return resp, nil
}

View file

@ -0,0 +1,179 @@
package container
import (
"context"
"crypto/sha256"
"github.com/nspcc-dev/neofs-api-go/v2/acl"
aclGRPC "github.com/nspcc-dev/neofs-api-go/v2/acl/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/container"
container2 "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
containerMorph "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
containerSvc "github.com/nspcc-dev/neofs-node/pkg/services/container"
"github.com/pkg/errors"
)
type morphExecutor struct {
// TODO: use client wrapper
client *containerMorph.Client
}
func NewExecutor(client *containerMorph.Client) containerSvc.ServiceExecutor {
return &morphExecutor{
client: client,
}
}
func (s *morphExecutor) Put(ctx context.Context, body *container.PutRequestBody) (*container.PutResponseBody, error) {
cnr := body.GetContainer()
// marshal owner ID of the container
ownerBytes, err := cnr.GetOwnerID().StableMarshal(nil)
if err != nil {
return nil, errors.Wrap(err, "could not marshal owner ID")
}
// marshal the container
cnrBytes, err := cnr.StableMarshal(nil)
if err != nil {
return nil, errors.Wrap(err, "could not marshal the container")
}
args := containerMorph.PutArgs{}
args.SetOwnerID(ownerBytes)
args.SetContainer(cnrBytes)
args.SetSignature(body.GetSignature().GetSign())
if err := s.client.Put(args); err != nil {
return nil, errors.Wrap(err, "could not call Put method")
}
res := new(container.PutResponseBody)
// FIXME: implement and use CID calculation
cidBytes := sha256.Sum256(cnrBytes)
cid := new(refs.ContainerID)
cid.SetValue(cidBytes[:])
res.SetContainerID(cid)
return res, nil
}
func (s *morphExecutor) Delete(ctx context.Context, body *container.DeleteRequestBody) (*container.DeleteResponseBody, error) {
// marshal container identifier
cidBytes, err := body.GetContainerID().StableMarshal(nil)
if err != nil {
return nil, errors.Wrap(err, "could not marshal container ID")
}
args := containerMorph.DeleteArgs{}
args.SetCID(cidBytes)
args.SetSignature(body.GetSignature().GetSign())
if err := s.client.Delete(args); err != nil {
return nil, errors.Wrap(err, "could not call Delete method")
}
return new(container.DeleteResponseBody), nil
}
func (s *morphExecutor) Get(ctx context.Context, body *container.GetRequestBody) (*container.GetResponseBody, error) {
args := containerMorph.GetArgs{}
args.SetCID(body.GetContainerID().GetValue())
val, err := s.client.Get(args)
if err != nil {
return nil, errors.Wrap(err, "could not call Get method")
}
// FIXME: implement and use stable unmarshaler
cnrGRPC := new(container2.Container)
if err := cnrGRPC.Unmarshal(val.Container()); err != nil {
return nil, errors.Wrap(err, "could not unmarshal the container")
}
res := new(container.GetResponseBody)
res.SetContainer(container.ContainerFromGRPCMessage(cnrGRPC))
return res, nil
}
func (s *morphExecutor) List(ctx context.Context, body *container.ListRequestBody) (*container.ListResponseBody, error) {
// marshal owner ID
ownerBytes, err := body.GetOwnerID().StableMarshal(nil)
if err != nil {
return nil, errors.Wrap(err, "could not marshal owner ID")
}
args := containerMorph.ListArgs{}
args.SetOwnerID(ownerBytes)
val, err := s.client.List(args)
if err != nil {
return nil, errors.Wrap(err, "could not call List method")
}
binCidList := val.CIDList()
cidList := make([]*refs.ContainerID, 0, len(binCidList))
for i := range binCidList {
cid := new(refs.ContainerID)
cid.SetValue(binCidList[i])
cidList = append(cidList, cid)
}
res := new(container.ListResponseBody)
res.SetContainerIDs(cidList)
return res, nil
}
func (s *morphExecutor) SetExtendedACL(ctx context.Context, body *container.SetExtendedACLRequestBody) (*container.SetExtendedACLResponseBody, error) {
eacl := body.GetEACL()
cidBytes, err := eacl.GetContainerID().StableMarshal(nil)
if err != nil {
return nil, errors.Wrap(err, "could not marshal container ID")
}
eaclBytes, err := eacl.StableMarshal(nil)
if err != nil {
return nil, errors.Wrap(err, "could not marshal eACL table")
}
args := containerMorph.SetEACLArgs{}
args.SetCID(cidBytes)
args.SetEACL(eaclBytes)
args.SetSignature(body.GetSignature().GetSign())
if err := s.client.SetEACL(args); err != nil {
return nil, errors.Wrap(err, "could not call SetEACL method")
}
return new(container.SetExtendedACLResponseBody), nil
}
func (s *morphExecutor) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequestBody) (*container.GetExtendedACLResponseBody, error) {
args := containerMorph.EACLArgs{}
args.SetCID(req.GetContainerID().GetValue())
val, err := s.client.EACL(args)
if err != nil {
return nil, errors.Wrap(err, "could not call EACL method")
}
// FIXME: implement and use stable unmarshaler
eaclGRPC := new(aclGRPC.EACLTable)
if err := eaclGRPC.Unmarshal(val.EACL()); err != nil {
return nil, errors.Wrap(err, "could not unmarshal eACL table")
}
eacl := acl.TableFromGRPCMessage(eaclGRPC)
res := new(container.GetExtendedACLResponseBody)
res.SetEACL(eacl)
return res, nil
}

View file

@ -0,0 +1,113 @@
package container
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/v2/container"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
)
type signService struct {
putSigService *util.UnarySignService
getSigService *util.UnarySignService
delSigService *util.UnarySignService
listSigService *util.UnarySignService
setEACLSigService *util.UnarySignService
getEACLSigService *util.UnarySignService
}
func NewSignService(key *ecdsa.PrivateKey, svc container.Service) container.Service {
return &signService{
putSigService: util.NewUnarySignService(
key,
func(ctx context.Context, req interface{}) (interface{}, error) {
return svc.Put(ctx, req.(*container.PutRequest))
},
),
getSigService: util.NewUnarySignService(
key,
func(ctx context.Context, req interface{}) (interface{}, error) {
return svc.Get(ctx, req.(*container.GetRequest))
},
),
delSigService: util.NewUnarySignService(
key,
func(ctx context.Context, req interface{}) (interface{}, error) {
return svc.Delete(ctx, req.(*container.DeleteRequest))
},
),
listSigService: util.NewUnarySignService(
key,
func(ctx context.Context, req interface{}) (interface{}, error) {
return svc.List(ctx, req.(*container.ListRequest))
},
),
setEACLSigService: util.NewUnarySignService(
key,
func(ctx context.Context, req interface{}) (interface{}, error) {
return svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest))
},
),
getEACLSigService: util.NewUnarySignService(
key,
func(ctx context.Context, req interface{}) (interface{}, error) {
return svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest))
},
),
}
}
func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
resp, err := s.putSigService.HandleUnaryRequest(ctx, req)
if err != nil {
return nil, err
}
return resp.(*container.PutResponse), nil
}
func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
resp, err := s.delSigService.HandleUnaryRequest(ctx, req)
if err != nil {
return nil, err
}
return resp.(*container.DeleteResponse), nil
}
func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
resp, err := s.getSigService.HandleUnaryRequest(ctx, req)
if err != nil {
return nil, err
}
return resp.(*container.GetResponse), nil
}
func (s *signService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
resp, err := s.listSigService.HandleUnaryRequest(ctx, req)
if err != nil {
return nil, err
}
return resp.(*container.ListResponse), nil
}
func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
resp, err := s.setEACLSigService.HandleUnaryRequest(ctx, req)
if err != nil {
return nil, err
}
return resp.(*container.SetExtendedACLResponse), nil
}
func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
resp, err := s.getEACLSigService.HandleUnaryRequest(ctx, req)
if err != nil {
return nil, err
}
return resp.(*container.GetExtendedACLResponse), nil
}