[#13] services/object: Implement Put handlers

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
remotes/KirillovDenis/release/v0.21.1
Leonard Lyubich 2020-08-25 18:45:14 +03:00 committed by Alex Vanin
parent d03b153e09
commit 62d0c50b15
2 changed files with 75 additions and 7 deletions

View File

@ -27,7 +27,7 @@ type GetRangeObjectBodyStreamer interface {
type ServiceExecutor interface {
Get(context.Context, *object.GetRequestBody) (GetObjectBodyStreamer, error)
Put(context.Context) (object.PutObjectStreamer, error)
Put(context.Context) (PutObjectBodyStreamer, error)
Head(context.Context, *object.HeadRequestBody) (*object.HeadResponseBody, error)
Search(context.Context, *object.SearchRequestBody) (SearchObjectBodyStreamer, error)
Delete(context.Context, *object.DeleteRequestBody) (*object.DeleteResponseBody, error)
@ -53,6 +53,12 @@ type getStreamer struct {
metaHdr *session.ResponseMetaHeader
}
type putStreamer struct {
bodyStreamer PutObjectBodyStreamer
metaHdr *session.ResponseMetaHeader
}
// NewExecutionService wraps ServiceExecutor and returns Object Service interface.
//
// Passed meta header is attached to all responses.
@ -79,7 +85,7 @@ func (s *getStreamer) Recv() (*object.GetResponse, error) {
func (s *executorSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) {
bodyStream, err := s.exec.Get(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute Balance request")
return nil, errors.Wrap(err, "could not execute Get request")
}
return &getStreamer{
@ -88,8 +94,33 @@ func (s *executorSvc) Get(ctx context.Context, req *object.GetRequest) (object.G
}, nil
}
func (*executorSvc) Put(context.Context) (object.PutObjectStreamer, error) {
panic("implement me")
func (s *putStreamer) Send(req *object.PutRequest) error {
return s.bodyStreamer.Send(req.GetBody())
}
func (s *putStreamer) CloseAndRecv() (*object.PutResponse, error) {
body, err := s.bodyStreamer.CloseAndRecv()
if err != nil {
return nil, errors.Wrap(err, "could not receive response body")
}
resp := new(object.PutResponse)
resp.SetBody(body)
resp.SetMetaHeader(s.metaHdr)
return resp, nil
}
func (s *executorSvc) Put(ctx context.Context) (object.PutObjectStreamer, error) {
bodyStream, err := s.exec.Put(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not execute Put request")
}
return &putStreamer{
bodyStreamer: bodyStream,
metaHdr: s.metaHeader,
}, nil
}
func (*executorSvc) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) {
@ -112,7 +143,7 @@ func (s *searchStreamer) Recv() (*object.SearchResponse, error) {
func (s *executorSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
bodyStream, err := s.exec.Search(ctx, req.GetBody())
if err != nil {
return nil, errors.Wrap(err, "could not execute Balance request")
return nil, errors.Wrap(err, "could not execute Search request")
}
return &searchStreamer{

View File

@ -15,6 +15,7 @@ type signService struct {
searchSigService *util.UnarySignService
getSigService *util.UnarySignService
putService object.Service
}
type searchStreamSigner struct {
@ -29,6 +30,12 @@ type getStreamSigner struct {
stream object.GetObjectStreamer
}
type putStreamSigner struct {
key *ecdsa.PrivateKey
stream object.PutObjectStreamer
}
func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service {
return &signService{
key: key,
@ -44,6 +51,7 @@ func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service {
return svc.Get(ctx, req.(*object.GetRequest))
},
),
putService: svc,
}
}
@ -72,8 +80,37 @@ func (s *signService) Get(ctx context.Context, req *object.GetRequest) (object.G
}, nil
}
func (s *signService) Put(context.Context) (object.PutObjectStreamer, error) {
panic("implement me")
func (s *putStreamSigner) Send(req *object.PutRequest) error {
if err := signature.VerifyServiceMessage(req); err != nil {
return errors.Wrap(err, "could not verify request")
}
return s.stream.Send(req)
}
func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv()
if err != nil {
return nil, errors.Wrap(err, "could not receive response")
}
if err := signature.SignServiceMessage(s.key, r); err != nil {
return nil, errors.Wrap(err, "could not sign response")
}
return r, nil
}
func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error) {
stream, err := s.putService.Put(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not create Put object streamer")
}
return &putStreamSigner{
key: s.key,
stream: stream,
}, nil
}
func (s *signService) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) {