From 652f64ae85203c18528c8fa323583046f4dac651 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 25 Aug 2020 18:57:57 +0300 Subject: [PATCH] [#13] services/object: Implement GetRange handlers Signed-off-by: Leonard Lyubich --- pkg/services/object/executor.go | 31 +++++++++++++++++++++++++-- pkg/services/object/sign.go | 38 +++++++++++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/pkg/services/object/executor.go b/pkg/services/object/executor.go index 84bdc8b89..6cfefd56c 100644 --- a/pkg/services/object/executor.go +++ b/pkg/services/object/executor.go @@ -59,6 +59,12 @@ type putStreamer struct { metaHdr *session.ResponseMetaHeader } +type rangeStreamer struct { + bodyStreamer GetRangeObjectBodyStreamer + + metaHdr *session.ResponseMetaHeader +} + // NewExecutionService wraps ServiceExecutor and returns Object Service interface. // // Passed meta header is attached to all responses. @@ -156,8 +162,29 @@ func (*executorSvc) Delete(context.Context, *object.DeleteRequest) (*object.Dele panic("implement me") } -func (*executorSvc) GetRange(context.Context, *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - panic("implement me") +func (s *rangeStreamer) Recv() (*object.GetRangeResponse, error) { + body, err := s.bodyStreamer.Recv() + if err != nil { + return nil, errors.Wrap(err, "could not receive response body") + } + + resp := new(object.GetRangeResponse) + resp.SetBody(body) + resp.SetMetaHeader(s.metaHdr) + + return resp, nil +} + +func (s *executorSvc) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { + bodyStream, err := s.exec.GetRange(ctx, req.GetBody()) + if err != nil { + return nil, errors.Wrap(err, "could not execute GetRange request") + } + + return &rangeStreamer{ + bodyStreamer: bodyStream, + metaHdr: s.metaHeader, + }, nil } func (*executorSvc) GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index 03a8c4f98..e0cd28e88 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -16,6 +16,7 @@ type signService struct { searchSigService *util.UnarySignService getSigService *util.UnarySignService putService object.Service + rangeSigService *util.UnarySignService } type searchStreamSigner struct { @@ -36,6 +37,12 @@ type putStreamSigner struct { stream object.PutObjectStreamer } +type getRangeStreamSigner struct { + key *ecdsa.PrivateKey + + stream object.GetRangeObjectStreamer +} + func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service { return &signService{ key: key, @@ -52,6 +59,12 @@ func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service { }, ), putService: svc, + rangeSigService: util.NewUnarySignService( + nil, // private key is not needed because service returns stream + func(ctx context.Context, req interface{}) (interface{}, error) { + return svc.GetRange(ctx, req.(*object.GetRangeRequest)) + }, + ), } } @@ -146,8 +159,29 @@ func (s *signService) Delete(context.Context, *object.DeleteRequest) (*object.De panic("implement me") } -func (s *signService) GetRange(context.Context, *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - panic("implement me") +func (s *getRangeStreamSigner) Recv() (*object.GetRangeResponse, error) { + r, err := s.stream.Recv() + if err != nil { + return nil, errors.Wrap(err, "could not receive response") + } + + if err := signature.SignServiceMessage(s.key, r); err != nil { + return nil, errors.Wrap(err, "could not sign response") + } + + return r, nil +} + +func (s *signService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { + resp, err := s.rangeSigService.HandleServerStreamRequest(ctx, req) + if err != nil { + return nil, err + } + + return &getRangeStreamSigner{ + key: s.key, + stream: resp.(object.GetRangeObjectStreamer), + }, nil } func (s *signService) GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {