diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go new file mode 100644 index 0000000000..b7c7b844aa --- /dev/null +++ b/pkg/services/object/get/prm.go @@ -0,0 +1,27 @@ +package getsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type Prm struct { + local, full bool + + addr *object.Address +} + +func (p *Prm) OnlyLocal(v bool) *Prm { + if p != nil { + p.local = v + } + + return p +} + +func (p *Prm) WithAddress(v *object.Address) *Prm { + if p != nil { + p.addr = v + } + + return p +} diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go new file mode 100644 index 0000000000..f72e696bc3 --- /dev/null +++ b/pkg/services/object/get/service.go @@ -0,0 +1,55 @@ +package getsvc + +import ( + "context" + + rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" + "github.com/pkg/errors" +) + +type Service struct { + *cfg +} + +type Option func(*cfg) + +type cfg struct { + rngSvc *rangesvc.Service +} + +func defaultCfg() *cfg { + return new(cfg) +} + +func NewService(opts ...Option) *Service { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +func (s *Service) Get(ctx context.Context, prm *Prm) (*Streamer, error) { + r, err := s.rngSvc.GetRange(ctx, new(rangesvc.Prm). + WithAddress(prm.addr). + FullRange(). + OnlyLocal(prm.local), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not get range", s) + } + + return &Streamer{ + rngRes: r, + }, nil +} + +func WithRangeService(v *rangesvc.Service) Option { + return func(c *cfg) { + c.rngSvc = v + } +} diff --git a/pkg/services/object/get/streamer.go b/pkg/services/object/get/streamer.go new file mode 100644 index 0000000000..28ac9507b5 --- /dev/null +++ b/pkg/services/object/get/streamer.go @@ -0,0 +1,26 @@ +package getsvc + +import ( + rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" + "github.com/pkg/errors" +) + +type Streamer struct { + headSent bool + + rngRes *rangesvc.Result +} + +func (p *Streamer) Recv() (interface{}, error) { + if !p.headSent { + p.headSent = true + return p.rngRes.Head(), nil + } + + rngResp, err := p.rngRes.Stream().Recv() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive range response", p) + } + + return rngResp.PayloadChunk(), nil +} diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go new file mode 100644 index 0000000000..d8637b5154 --- /dev/null +++ b/pkg/services/object/get/v2/service.go @@ -0,0 +1,50 @@ +package getsvc + +import ( + "context" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + "github.com/pkg/errors" +) + +// Service implements Get operation of Object service v2. +type Service struct { + *cfg +} + +// Option represents Service constructor option. +type Option func(*cfg) + +type cfg struct { + svc *getsvc.Service +} + +// NewService constructs Service instance from provided options. +func NewService(opts ...Option) *Service { + c := new(cfg) + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +// Get calls internal service and returns v2 object stream. +func (s *Service) Get(ctx context.Context, req *objectV2.GetRequest) (objectV2.GetObjectStreamer, error) { + stream, err := s.svc.Get(ctx, toPrm(req)) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not get object payload range data", s) + } + + return fromResponse(stream), nil +} + +func WithInternalService(v *getsvc.Service) Option { + return func(c *cfg) { + c.svc = v + } +} diff --git a/pkg/services/object/get/v2/streamer.go b/pkg/services/object/get/v2/streamer.go new file mode 100644 index 0000000000..2438310fed --- /dev/null +++ b/pkg/services/object/get/v2/streamer.go @@ -0,0 +1,47 @@ +package getsvc + +import ( + "fmt" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + "github.com/pkg/errors" +) + +type streamer struct { + stream *getsvc.Streamer + + body *objectV2.GetResponseBody +} + +func (s *streamer) Recv() (*objectV2.GetResponse, error) { + r, err := s.stream.Recv() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive get response", s) + } + + switch v := r.(type) { + case *object.Object: + oV2 := v.ToV2() + + partInit := new(objectV2.GetObjectPartInit) + partInit.SetHeader(oV2.GetHeader()) + partInit.SetSignature(oV2.GetSignature()) + partInit.SetObjectID(oV2.GetObjectID()) + + s.body.SetObjectPart(partInit) + case []byte: + partChunk := new(objectV2.GetObjectPartChunk) + partChunk.SetChunk(v) + + s.body.SetObjectPart(partChunk) + default: + panic(fmt.Sprintf("unexpected response type %T from %T", r, s.stream)) + } + + resp := new(objectV2.GetResponse) + resp.SetBody(s.body) + + return resp, nil +} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go new file mode 100644 index 0000000000..a81107b6e0 --- /dev/null +++ b/pkg/services/object/get/v2/util.go @@ -0,0 +1,22 @@ +package getsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" +) + +func toPrm(req *objectV2.GetRequest) *getsvc.Prm { + return new(getsvc.Prm). + WithAddress( + object.NewAddressFromV2(req.GetBody().GetAddress()), + ). + OnlyLocal(req.GetMetaHeader().GetTTL() == 1) // FIXME: use constant +} + +func fromResponse(res *getsvc.Streamer) objectV2.GetObjectStreamer { + return &streamer{ + stream: res, + body: new(objectV2.GetResponseBody), + } +}