forked from TrueCloudLab/frostfs-node
[#13] services/util: Support client-side stream in SignService
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d54a5d4f66
commit
b7702349dc
2 changed files with 65 additions and 28 deletions
|
@ -5,7 +5,6 @@ import (
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/util"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
@ -19,21 +18,19 @@ type signService struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type searchStreamSigner struct {
|
type searchStreamSigner struct {
|
||||||
stream *util.MessageStreamer
|
stream *util.ResponseMessageStreamer
|
||||||
}
|
}
|
||||||
|
|
||||||
type getStreamSigner struct {
|
type getStreamSigner struct {
|
||||||
stream *util.MessageStreamer
|
stream *util.ResponseMessageStreamer
|
||||||
}
|
}
|
||||||
|
|
||||||
type putStreamSigner struct {
|
type putStreamSigner struct {
|
||||||
key *ecdsa.PrivateKey
|
stream *util.RequestMessageStreamer
|
||||||
|
|
||||||
stream object.PutObjectStreamer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type getRangeStreamSigner struct {
|
type getRangeStreamSigner struct {
|
||||||
stream *util.MessageStreamer
|
stream *util.ResponseMessageStreamer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service {
|
func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service {
|
||||||
|
@ -55,7 +52,7 @@ func (s *getStreamSigner) Recv() (*object.GetResponse, error) {
|
||||||
|
|
||||||
func (s *signService) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) {
|
func (s *signService) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) {
|
||||||
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
|
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
|
||||||
func(ctx context.Context, req interface{}) (util.MessageReader, error) {
|
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
|
||||||
stream, err := s.svc.Get(ctx, req.(*object.GetRequest))
|
stream, err := s.svc.Get(ctx, req.(*object.GetRequest))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -76,10 +73,6 @@ func (s *signService) Get(ctx context.Context, req *object.GetRequest) (object.G
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *putStreamSigner) Send(req *object.PutRequest) error {
|
func (s *putStreamSigner) Send(req *object.PutRequest) error {
|
||||||
if err := signature.VerifyServiceMessage(req); err != nil {
|
|
||||||
return errors.Wrap(err, "could not verify request")
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.stream.Send(req)
|
return s.stream.Send(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,11 +82,7 @@ func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
|
||||||
return nil, errors.Wrap(err, "could not receive response")
|
return nil, errors.Wrap(err, "could not receive response")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := signature.SignServiceMessage(s.key, r); err != nil {
|
return r.(*object.PutResponse), nil
|
||||||
return nil, errors.Wrap(err, "could not sign response")
|
|
||||||
}
|
|
||||||
|
|
||||||
return r, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error) {
|
func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error) {
|
||||||
|
@ -103,8 +92,14 @@ func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &putStreamSigner{
|
return &putStreamSigner{
|
||||||
key: s.key,
|
stream: s.sigSvc.CreateRequestStreamer(
|
||||||
stream: stream,
|
func(req interface{}) error {
|
||||||
|
return stream.Send(req.(*object.PutRequest))
|
||||||
|
},
|
||||||
|
func() (interface{}, error) {
|
||||||
|
return stream.CloseAndRecv()
|
||||||
|
},
|
||||||
|
),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,7 +127,7 @@ func (s *searchStreamSigner) Recv() (*object.SearchResponse, error) {
|
||||||
|
|
||||||
func (s *signService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
|
func (s *signService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
|
||||||
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
|
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
|
||||||
func(ctx context.Context, req interface{}) (util.MessageReader, error) {
|
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
|
||||||
stream, err := s.svc.Search(ctx, req.(*object.SearchRequest))
|
stream, err := s.svc.Search(ctx, req.(*object.SearchRequest))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -176,7 +171,7 @@ func (s *getRangeStreamSigner) Recv() (*object.GetRangeResponse, error) {
|
||||||
|
|
||||||
func (s *signService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
|
func (s *signService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
|
||||||
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
|
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
|
||||||
func(ctx context.Context, req interface{}) (util.MessageReader, error) {
|
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
|
||||||
stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest))
|
stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -14,14 +14,26 @@ type SignService struct {
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerStreamHandler func(context.Context, interface{}) (MessageReader, error)
|
type ServerStreamHandler func(context.Context, interface{}) (ResponseMessageReader, error)
|
||||||
|
|
||||||
type MessageReader func() (interface{}, error)
|
type ResponseMessageReader func() (interface{}, error)
|
||||||
|
|
||||||
type MessageStreamer struct {
|
type ResponseMessageStreamer struct {
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
|
|
||||||
recv MessageReader
|
recv ResponseMessageReader
|
||||||
|
}
|
||||||
|
|
||||||
|
type RequestMessageWriter func(interface{}) error
|
||||||
|
|
||||||
|
type ClientStreamCloser func() (interface{}, error)
|
||||||
|
|
||||||
|
type RequestMessageStreamer struct {
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
|
||||||
|
send RequestMessageWriter
|
||||||
|
|
||||||
|
close ClientStreamCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
|
func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
|
||||||
|
@ -30,7 +42,37 @@ func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MessageStreamer) Recv() (interface{}, error) {
|
func (s *RequestMessageStreamer) Send(req interface{}) error {
|
||||||
|
// verify request signatures
|
||||||
|
if err := signature.VerifyServiceMessage(req); err != nil {
|
||||||
|
return errors.Wrap(err, "could not verify request")
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.send(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *RequestMessageStreamer) CloseAndRecv() (interface{}, error) {
|
||||||
|
resp, err := s.close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not close stream and receive response")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := signature.SignServiceMessage(s.key, resp); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not sign response")
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer ClientStreamCloser) *RequestMessageStreamer {
|
||||||
|
return &RequestMessageStreamer{
|
||||||
|
key: s.key,
|
||||||
|
send: sender,
|
||||||
|
close: closer,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ResponseMessageStreamer) Recv() (interface{}, error) {
|
||||||
m, err := s.recv()
|
m, err := s.recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "could not receive response message for signing")
|
return nil, errors.Wrap(err, "could not receive response message for signing")
|
||||||
|
@ -43,7 +85,7 @@ func (s *MessageStreamer) Recv() (interface{}, error) {
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) HandleServerStreamRequest(ctx context.Context, req interface{}, handler ServerStreamHandler) (*MessageStreamer, error) {
|
func (s *SignService) HandleServerStreamRequest(ctx context.Context, req interface{}, handler ServerStreamHandler) (*ResponseMessageStreamer, error) {
|
||||||
// verify request signatures
|
// verify request signatures
|
||||||
if err := signature.VerifyServiceMessage(req); err != nil {
|
if err := signature.VerifyServiceMessage(req); err != nil {
|
||||||
return nil, errors.Wrap(err, "could not verify request")
|
return nil, errors.Wrap(err, "could not verify request")
|
||||||
|
@ -54,7 +96,7 @@ func (s *SignService) HandleServerStreamRequest(ctx context.Context, req interfa
|
||||||
return nil, errors.Wrap(err, "could not create message reader")
|
return nil, errors.Wrap(err, "could not create message reader")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &MessageStreamer{
|
return &ResponseMessageStreamer{
|
||||||
key: s.key,
|
key: s.key,
|
||||||
recv: msgRdr,
|
recv: msgRdr,
|
||||||
}, nil
|
}, nil
|
||||||
|
|
Loading…
Reference in a new issue