[] object/range: Share object range traverser between services

Replace object range chain utilities to object/util package in order to
reuse it in other services.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-09-28 16:10:50 +03:00 committed by Alex Vanin
parent 759605410a
commit 8791c4f0a5
4 changed files with 126 additions and 21 deletions
pkg/services/object

View file

@ -10,8 +10,6 @@ type Prm struct {
addr *object.Address
rng *object.Range
traverser *rangeTraverser
}
func (p *Prm) OnlyLocal(v bool) *Prm {

View file

@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/network"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/pkg/errors"
)
@ -82,7 +83,7 @@ func (s *Service) GetRange(ctx context.Context, prm *Prm) (*Result, error) {
right = origin
}
rngTraverser := newRangeTraverser(originSize, right, prm.rng)
rngTraverser := objutil.NewRangeTraverser(originSize, right, prm.rng)
if err := s.fillTraverser(ctx, prm, rngTraverser); err != nil {
return nil, errors.Wrapf(err, "(%T) could not fill range traverser", s)
}
@ -99,17 +100,17 @@ func (s *Service) GetRange(ctx context.Context, prm *Prm) (*Result, error) {
}, nil
}
func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *rangeTraverser) error {
func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) error {
addr := object.NewAddress()
addr.SetContainerID(prm.addr.GetContainerID())
for {
next := traverser.next()
if next.rng != nil {
nextID, nextRng := traverser.Next()
if nextRng != nil {
return nil
}
addr.SetObjectID(next.id)
addr.SetObjectID(nextID)
head, err := s.headSvc.Head(ctx, new(headsvc.Prm).
WithAddress(addr).
@ -119,7 +120,7 @@ func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *rangeT
return errors.Wrapf(err, "(%T) could not receive object header", s)
}
traverser.pushHeader(head.Header())
traverser.PushHeader(head.Header())
}
}

View file

@ -28,7 +28,7 @@ type streamer struct {
traverser *placement.Traverser
rangeTraverser *rangeTraverser
rangeTraverser *util.RangeTraverser
ch chan []byte
}
@ -58,7 +58,7 @@ func (p *streamer) Recv() (*Response, error) {
return nil, errors.Wrapf(p.ctx.Err(), "(%T) stream is stopped by context", p)
case v, ok := <-p.ch:
if !ok {
if p.rangeTraverser.next().rng.GetLength() != 0 {
if _, rng := p.rangeTraverser.Next(); rng.GetLength() != 0 {
return nil, errors.Errorf("(%T) incomplete get payload range", p)
}
@ -135,15 +135,15 @@ loop:
default:
}
nextRange := p.rangeTraverser.next()
if nextRange.rng.GetLength() == 0 {
nextID, nextRange := p.rangeTraverser.Next()
if nextRange.GetLength() == 0 {
break
} else if err := p.switchToObject(nextRange.id); err != nil {
} else if err := p.switchToObject(nextID); err != nil {
// TODO: log error
break
}
objAddr.SetObjectID(nextRange.id)
objAddr.SetObjectID(nextID)
subloop:
for {
@ -173,7 +173,7 @@ loop:
if network.IsLocalAddress(p.localAddrSrc, addr) {
rngWriter = &localRangeWriter{
addr: objAddr,
rng: nextRange.rng,
rng: nextRange,
storage: p.localStore,
}
} else {
@ -182,7 +182,7 @@ loop:
key: p.key,
node: addr,
addr: objAddr,
rng: nextRange.rng,
rng: nextRange,
}
}
@ -194,12 +194,12 @@ loop:
// TODO: log error
}
ln := nextRange.rng.GetLength()
ln := nextRange.GetLength()
uw := uint64(written)
p.rangeTraverser.pushSuccessSize(uw)
nextRange.rng.SetLength(ln - uw)
nextRange.rng.SetOffset(nextRange.rng.GetOffset() + uw)
p.rangeTraverser.PushSuccessSize(uw)
nextRange.SetLength(ln - uw)
nextRange.SetOffset(nextRange.GetOffset() + uw)
}); err != nil {
wg.Done()
// TODO: log error
@ -208,7 +208,7 @@ loop:
wg.Wait()
if nextRange.rng.GetLength() == 0 {
if nextRange.GetLength() == 0 {
p.traverser.SubmitSuccess()
break subloop
}

View file

@ -0,0 +1,106 @@
package util
import (
"fmt"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
)
type RangeTraverser struct {
chain *rangeChain
seekBounds *rangeBounds
}
type rangeBounds struct {
left, right uint64
}
type rangeChain struct {
next, prev *rangeChain
bounds *rangeBounds
id *objectSDK.ID
}
func NewRangeTraverser(originSize uint64, rightElement *object.Object, rngSeek *objectSDK.Range) *RangeTraverser {
right := &rangeChain{
bounds: &rangeBounds{
left: originSize - rightElement.GetPayloadSize(),
right: originSize,
},
id: rightElement.GetID(),
}
left := &rangeChain{
id: rightElement.GetPreviousID(),
}
left.next, right.prev = right, left
return &RangeTraverser{
chain: right,
seekBounds: &rangeBounds{
left: rngSeek.GetOffset(),
right: rngSeek.GetOffset() + rngSeek.GetLength(),
},
}
}
func (c *RangeTraverser) Next() (id *objectSDK.ID, rng *objectSDK.Range) {
left := c.chain.bounds.left
seekLeft := c.seekBounds.left
if left > seekLeft {
id = c.chain.prev.id
} else {
id = c.chain.id
rng = objectSDK.NewRange()
rng.SetOffset(seekLeft - left)
rng.SetLength(min(c.chain.bounds.right, c.seekBounds.right) - seekLeft)
}
return
}
func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}
func (c *RangeTraverser) PushHeader(obj *object.Object) {
id := obj.GetID()
if !id.Equal(c.chain.prev.id) {
panic(fmt.Sprintf("(%T) unexpected identifier in header", c))
}
sz := obj.GetPayloadSize()
c.chain.prev.bounds = &rangeBounds{
left: c.chain.bounds.left - sz,
right: c.chain.bounds.left,
}
c.chain = c.chain.prev
if prev := obj.GetPreviousID(); prev != nil {
c.chain.prev = &rangeChain{
next: c.chain,
id: prev,
}
}
}
func (c *RangeTraverser) PushSuccessSize(sz uint64) {
c.seekBounds.left += sz
if c.seekBounds.left >= c.chain.bounds.right && c.chain.next != nil {
c.chain = c.chain.next
}
}