diff --git a/pkg/services/object/executor.go b/pkg/services/object/executor.go index 5d11d4d209..84bdc8b894 100644 --- a/pkg/services/object/executor.go +++ b/pkg/services/object/executor.go @@ -27,7 +27,7 @@ type GetRangeObjectBodyStreamer interface { type ServiceExecutor interface { Get(context.Context, *object.GetRequestBody) (GetObjectBodyStreamer, error) - Put(context.Context) (object.PutObjectStreamer, error) + Put(context.Context) (PutObjectBodyStreamer, error) Head(context.Context, *object.HeadRequestBody) (*object.HeadResponseBody, error) Search(context.Context, *object.SearchRequestBody) (SearchObjectBodyStreamer, error) Delete(context.Context, *object.DeleteRequestBody) (*object.DeleteResponseBody, error) @@ -53,6 +53,12 @@ type getStreamer struct { metaHdr *session.ResponseMetaHeader } +type putStreamer struct { + bodyStreamer PutObjectBodyStreamer + + metaHdr *session.ResponseMetaHeader +} + // NewExecutionService wraps ServiceExecutor and returns Object Service interface. // // Passed meta header is attached to all responses. @@ -79,7 +85,7 @@ func (s *getStreamer) Recv() (*object.GetResponse, error) { func (s *executorSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) { bodyStream, err := s.exec.Get(ctx, req.GetBody()) if err != nil { - return nil, errors.Wrap(err, "could not execute Balance request") + return nil, errors.Wrap(err, "could not execute Get request") } return &getStreamer{ @@ -88,8 +94,33 @@ func (s *executorSvc) Get(ctx context.Context, req *object.GetRequest) (object.G }, nil } -func (*executorSvc) Put(context.Context) (object.PutObjectStreamer, error) { - panic("implement me") +func (s *putStreamer) Send(req *object.PutRequest) error { + return s.bodyStreamer.Send(req.GetBody()) +} + +func (s *putStreamer) CloseAndRecv() (*object.PutResponse, error) { + body, err := s.bodyStreamer.CloseAndRecv() + if err != nil { + return nil, errors.Wrap(err, "could not receive response body") + } + + resp := new(object.PutResponse) + resp.SetBody(body) + resp.SetMetaHeader(s.metaHdr) + + return resp, nil +} + +func (s *executorSvc) Put(ctx context.Context) (object.PutObjectStreamer, error) { + bodyStream, err := s.exec.Put(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not execute Put request") + } + + return &putStreamer{ + bodyStreamer: bodyStream, + metaHdr: s.metaHeader, + }, nil } func (*executorSvc) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) { @@ -112,7 +143,7 @@ func (s *searchStreamer) Recv() (*object.SearchResponse, error) { func (s *executorSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { bodyStream, err := s.exec.Search(ctx, req.GetBody()) if err != nil { - return nil, errors.Wrap(err, "could not execute Balance request") + return nil, errors.Wrap(err, "could not execute Search request") } return &searchStreamer{ diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index b2f5cdfbff..03a8c4f984 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -15,6 +15,7 @@ type signService struct { searchSigService *util.UnarySignService getSigService *util.UnarySignService + putService object.Service } type searchStreamSigner struct { @@ -29,6 +30,12 @@ type getStreamSigner struct { stream object.GetObjectStreamer } +type putStreamSigner struct { + key *ecdsa.PrivateKey + + stream object.PutObjectStreamer +} + func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service { return &signService{ key: key, @@ -44,6 +51,7 @@ func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service { return svc.Get(ctx, req.(*object.GetRequest)) }, ), + putService: svc, } } @@ -72,8 +80,37 @@ func (s *signService) Get(ctx context.Context, req *object.GetRequest) (object.G }, nil } -func (s *signService) Put(context.Context) (object.PutObjectStreamer, error) { - panic("implement me") +func (s *putStreamSigner) Send(req *object.PutRequest) error { + if err := signature.VerifyServiceMessage(req); err != nil { + return errors.Wrap(err, "could not verify request") + } + + return s.stream.Send(req) +} + +func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) { + r, err := s.stream.CloseAndRecv() + if err != nil { + return nil, errors.Wrap(err, "could not receive response") + } + + if err := signature.SignServiceMessage(s.key, r); err != nil { + return nil, errors.Wrap(err, "could not sign response") + } + + return r, nil +} + +func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error) { + stream, err := s.putService.Put(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not create Put object streamer") + } + + return &putStreamSigner{ + key: s.key, + stream: stream, + }, nil } func (s *signService) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) {