[#50] services/object: Implement GetRange service

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-09-25 16:04:55 +03:00 committed by Alex Vanin
parent dd16f568c3
commit 0490107165
12 changed files with 742 additions and 1 deletions

2
go.mod
View file

@ -13,7 +13,7 @@ require (
github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2
github.com/multiformats/go-multihash v0.0.13 // indirect
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78
github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200922150714-14fa89b81919
github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200925125840-c814cc62faf4
github.com/nspcc-dev/neofs-crypto v0.3.0
github.com/nspcc-dev/tzhash v1.4.0
github.com/panjf2000/ants/v2 v2.3.0

BIN
go.sum

Binary file not shown.

View file

@ -0,0 +1,112 @@
package rangesvc
import (
"fmt"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
)
type rangeTraverser struct {
chain *rangeChain
seekBounds *rangeBounds
}
type rangeBounds struct {
left, right uint64
}
type objectRange struct {
rng *objectSDK.Range
id *objectSDK.ID
}
type rangeChain struct {
next, prev *rangeChain
bounds *rangeBounds
id *objectSDK.ID
}
func newRangeTraverser(originSize uint64, rightElement *object.Object, rngSeek *objectSDK.Range) *rangeTraverser {
right := &rangeChain{
bounds: &rangeBounds{
left: originSize - rightElement.GetPayloadSize(),
right: originSize,
},
id: rightElement.GetID(),
}
left := &rangeChain{
id: rightElement.GetPreviousID(),
}
left.next, right.prev = right, left
return &rangeTraverser{
chain: right,
seekBounds: &rangeBounds{
left: rngSeek.GetOffset(),
right: rngSeek.GetOffset() + rngSeek.GetLength(),
},
}
}
func (c *rangeTraverser) next() *objectRange {
left := c.chain.bounds.left
seekLeft := c.seekBounds.left
res := new(objectRange)
if left > seekLeft {
res.id = c.chain.prev.id
} else {
res.id = c.chain.id
res.rng = objectSDK.NewRange()
res.rng.SetOffset(seekLeft - left)
res.rng.SetLength(min(c.chain.bounds.right, c.seekBounds.right) - seekLeft)
}
return res
}
func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}
func (c *rangeTraverser) pushHeader(obj *object.Object) {
id := obj.GetID()
if !id.Equal(c.chain.prev.id) {
panic(fmt.Sprintf("(%T) unexpected identifier in header", c))
}
sz := obj.GetPayloadSize()
c.chain.prev.bounds = &rangeBounds{
left: c.chain.bounds.left - sz,
right: c.chain.bounds.left,
}
c.chain = c.chain.prev
c.chain.prev = &rangeChain{
next: c.chain,
id: obj.GetPreviousID(),
}
}
func (c *rangeTraverser) pushSuccessSize(sz uint64) {
c.seekBounds.left += sz
if c.seekBounds.left >= c.chain.bounds.right && c.chain.next != nil {
c.chain = c.chain.next
}
}

View file

@ -0,0 +1,36 @@
package rangesvc
import (
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/pkg/errors"
)
type localRangeWriter struct {
addr *object.Address
rng *object.Range
storage *localstore.Storage
}
func (l *localRangeWriter) WriteTo(w io.Writer) (int64, error) {
obj, err := l.storage.Get(l.addr)
if err != nil {
return 0, errors.Wrapf(err, "(%T) could not get object from local storage", l)
}
payload := obj.GetPayload()
left := l.rng.GetOffset()
right := left + l.rng.GetLength()
if ln := uint64(len(payload)); ln < right {
return 0, errors.Errorf("(%T) object range is out-of-boundaries (size %d, range [%d:%d]", l, ln, left, right)
}
n, err := w.Write(payload[left:right])
return int64(n), err
}

View file

@ -0,0 +1,39 @@
package rangesvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
type Prm struct {
local bool
addr *object.Address
rng *object.Range
traverser *rangeTraverser
}
func (p *Prm) OnlyLocal(v bool) *Prm {
if p != nil {
p.local = v
}
return p
}
func (p *Prm) WithAddress(v *object.Address) *Prm {
if p != nil {
p.addr = v
}
return p
}
func (p *Prm) WithRange(v *object.Range) *Prm {
if p != nil {
p.rng = v
}
return p
}

View file

@ -0,0 +1,49 @@
package rangesvc
import (
"context"
"crypto/ecdsa"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/pkg/errors"
)
type remoteRangeWriter struct {
ctx context.Context
key *ecdsa.PrivateKey
node *network.Address
addr *object.Address
rng *object.Range
}
func (r *remoteRangeWriter) WriteTo(w io.Writer) (int64, error) {
addr := r.node.NetAddr()
c, err := client.New(r.key,
client.WithAddress(addr),
)
if err != nil {
return 0, errors.Wrapf(err, "(%T) could not create SDK client %s", r, addr)
}
// TODO: change ObjectPayloadRangeData to implement WriterTo
chunk, err := c.ObjectPayloadRangeData(r.ctx, new(client.RangeDataParams).
WithRange(r.rng).
WithAddress(r.addr),
client.WithTTL(1), // FIXME: use constant
)
if err != nil {
return 0, errors.Wrapf(err, "(%T) could not read object payload range from %s", r, addr)
}
n, err := w.Write(chunk)
return int64(n), err
}

View file

@ -0,0 +1,9 @@
package rangesvc
type Response struct {
chunk []byte
}
func (r *Response) PayloadChunk() []byte {
return r.chunk
}

View file

@ -0,0 +1,159 @@
package rangesvc
import (
"context"
"crypto/ecdsa"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/network"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/pkg/errors"
)
type Service struct {
*cfg
}
type Option func(*cfg)
type cfg struct {
key *ecdsa.PrivateKey
localStore *localstore.Storage
cnrSrc container.Source
netMapSrc netmap.Source
workerPool util.WorkerPool
localAddrSrc network.LocalAddressSource
headSvc *headsvc.Service
}
func defaultCfg() *cfg {
return &cfg{
workerPool: new(util.SyncWorkerPool),
}
}
func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
func (s *Service) GetRange(ctx context.Context, prm *Prm) (Streamer, error) {
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
WithAddress(prm.addr).
OnlyLocal(prm.local),
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
}
off, ln := prm.rng.GetOffset(), prm.rng.GetLength()
origin := headResult.Header()
originSize := origin.GetPayloadSize()
if originSize < off+ln {
return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s)
}
right := headResult.RightChild()
if right == nil {
right = origin
}
rngTraverser := newRangeTraverser(originSize, right, prm.rng)
if err := s.fillTraverser(ctx, prm, rngTraverser); err != nil {
return nil, errors.Wrapf(err, "(%T) could not fill range traverser", s)
}
return &streamer{
cfg: s.cfg,
once: new(sync.Once),
ctx: ctx,
prm: prm,
rangeTraverser: rngTraverser,
}, nil
}
func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *rangeTraverser) error {
addr := object.NewAddress()
addr.SetContainerID(prm.addr.GetContainerID())
for {
next := traverser.next()
if next.rng != nil {
return nil
}
addr.SetObjectID(next.id)
head, err := s.headSvc.Head(ctx, new(headsvc.Prm).
WithAddress(addr).
OnlyLocal(prm.local),
)
if err != nil {
return errors.Wrapf(err, "(%T) could not receive object header", s)
}
traverser.pushHeader(head.Header())
}
}
func WithKey(v *ecdsa.PrivateKey) Option {
return func(c *cfg) {
c.key = v
}
}
func WithLocalStorage(v *localstore.Storage) Option {
return func(c *cfg) {
c.localStore = v
}
}
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
func WithNetworkMapSource(v netmap.Source) Option {
return func(c *cfg) {
c.netMapSrc = v
}
}
func WithWorkerPool(v util.WorkerPool) Option {
return func(c *cfg) {
c.workerPool = v
}
}
func WithLocalAddressSource(v network.LocalAddressSource) Option {
return func(c *cfg) {
c.localAddrSrc = v
}
}
func WithHeadService(v *headsvc.Service) Option {
return func(c *cfg) {
c.headSvc = v
}
}

View file

@ -0,0 +1,235 @@
package rangesvc
import (
"context"
"io"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type Streamer interface {
Recv() (*Response, error)
}
type streamer struct {
*cfg
once *sync.Once
ctx context.Context
prm *Prm
traverser *placement.Traverser
rangeTraverser *rangeTraverser
ch chan []byte
}
type chunkWriter struct {
ctx context.Context
ch chan<- []byte
written uint64
}
func (p *streamer) Recv() (*Response, error) {
var err error
p.once.Do(func() {
p.ch = make(chan []byte)
err = p.workerPool.Submit(p.start)
})
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not start streaming", p)
}
select {
case <-p.ctx.Done():
return nil, errors.Wrapf(p.ctx.Err(), "(%T) stream is stopped by context", p)
case v, ok := <-p.ch:
if !ok {
if p.rangeTraverser.next().rng.GetLength() != 0 {
return nil, errors.Errorf("(%T) incomplete get payload range", p)
}
return nil, io.EOF
}
return &Response{
chunk: v,
}, nil
}
}
func (p *streamer) switchToObject(id *object.ID) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return errors.Wrapf(err, "(%T) could not get latest network map", p)
}
// get container to read payload range
cnr, err := p.cnrSrc.Get(p.prm.addr.GetContainerID())
if err != nil {
return errors.Wrapf(err, "(%T) could not get container by ID", p)
}
// allocate placement traverser options
traverseOpts := make([]placement.Option, 0, 4)
// add common options
traverseOpts = append(traverseOpts,
// set processing container
placement.ForContainer(cnr),
// set success count (1st incoming full range)
placement.SuccessAfter(1),
// set identifier of the processing object
placement.ForObject(id),
)
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if p.prm.local {
// use local-only placement builder
builder = util.NewLocalPlacement(builder, p.localAddrSrc)
}
// set placement builder
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
// build placement traverser
if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
return errors.Wrapf(err, "(%T) could not build placement traverser", p)
}
return nil
}
func (p *streamer) start() {
defer close(p.ch)
objAddr := object.NewAddress()
objAddr.SetContainerID(p.prm.addr.GetContainerID())
loop:
for {
select {
case <-p.ctx.Done():
// TODO: log this
break loop
default:
}
nextRange := p.rangeTraverser.next()
if nextRange.rng.GetLength() == 0 {
break
} else if err := p.switchToObject(nextRange.id); err != nil {
// TODO: log error
break
}
objAddr.SetObjectID(nextRange.id)
subloop:
for {
select {
case <-p.ctx.Done():
// TODO: log this
break loop
default:
}
addrs := p.traverser.Next()
if len(addrs) == 0 {
break
}
for i := range addrs {
wg := new(sync.WaitGroup)
wg.Add(1)
addr := addrs[i]
if err := p.workerPool.Submit(func() {
defer wg.Done()
var rngWriter io.WriterTo
if network.IsLocalAddress(p.localAddrSrc, addr) {
rngWriter = &localRangeWriter{
addr: objAddr,
rng: nextRange.rng,
storage: p.localStore,
}
} else {
rngWriter = &remoteRangeWriter{
ctx: p.ctx,
key: p.key,
node: addr,
addr: objAddr,
rng: nextRange.rng,
}
}
written, err := rngWriter.WriteTo(&chunkWriter{
ctx: p.ctx,
ch: p.ch,
})
if err != nil {
// TODO: log error
}
ln := nextRange.rng.GetLength()
uw := uint64(written)
p.rangeTraverser.pushSuccessSize(uw)
nextRange.rng.SetLength(ln - uw)
nextRange.rng.SetOffset(nextRange.rng.GetOffset() + uw)
}); err != nil {
wg.Done()
// TODO: log error
break loop
}
wg.Wait()
if nextRange.rng.GetLength() == 0 {
p.traverser.SubmitSuccess()
break subloop
}
}
}
if !p.traverser.Success() {
// TODO: log error
break loop
}
}
}
func (w *chunkWriter) Write(p []byte) (int, error) {
select {
case <-w.ctx.Done():
return 0, w.ctx.Err()
case w.ch <- p:
}
w.written += uint64(len(p))
return len(p), nil
}

View file

@ -0,0 +1,50 @@
package rangesvc
import (
"context"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
"github.com/pkg/errors"
)
// Service implements GetRange operation of Object service v2.
type Service struct {
*cfg
}
// Option represents Service constructor option.
type Option func(*cfg)
type cfg struct {
svc *rangesvc.Service
}
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
c := new(cfg)
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
// GetRange calls internal service and returns v2 object payload range stream.
func (s *Service) GetRange(ctx context.Context, req *objectV2.GetRangeRequest) (objectV2.GetRangeObjectStreamer, error) {
stream, err := s.svc.GetRange(ctx, toPrm(req))
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not get object payload range data", s)
}
return fromResponse(stream), nil
}
func WithInternalService(v *rangesvc.Service) Option {
return func(c *cfg) {
c.svc = v
}
}

View file

@ -0,0 +1,27 @@
package rangesvc
import (
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
"github.com/pkg/errors"
)
type streamer struct {
stream rangesvc.Streamer
body *objectV2.GetRangeResponseBody
}
func (s *streamer) Recv() (*objectV2.GetRangeResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not read response from stream", s)
}
s.body.SetChunk(r.PayloadChunk())
resp := new(objectV2.GetRangeResponse)
resp.SetBody(s.body)
return resp, nil
}

View file

@ -0,0 +1,25 @@
package rangesvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
)
func toPrm(req *objectV2.GetRangeRequest) *rangesvc.Prm {
body := req.GetBody()
return new(rangesvc.Prm).
WithAddress(
object.NewAddressFromV2(body.GetAddress()),
).
WithRange(object.NewRangeFromV2(body.GetRange())).
OnlyLocal(req.GetMetaHeader().GetTTL() == 1) // FIXME: use constant
}
func fromResponse(stream rangesvc.Streamer) objectV2.GetRangeObjectStreamer {
return &streamer{
stream: stream,
body: new(objectV2.GetRangeResponseBody),
}
}