From 8791c4f0a56efd388c3ec9e89f838f94e0784097 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 28 Sep 2020 16:10:50 +0300 Subject: [PATCH] [#53] object/range: Share object range traverser between services Replace object range chain utilities to object/util package in order to reuse it in other services. Signed-off-by: Leonard Lyubich --- pkg/services/object/range/prm.go | 2 - pkg/services/object/range/service.go | 13 ++-- pkg/services/object/range/streamer.go | 26 +++---- pkg/services/object/util/chain.go | 106 ++++++++++++++++++++++++++ 4 files changed, 126 insertions(+), 21 deletions(-) create mode 100644 pkg/services/object/util/chain.go diff --git a/pkg/services/object/range/prm.go b/pkg/services/object/range/prm.go index 730f8a5d13..80f2ded8ea 100644 --- a/pkg/services/object/range/prm.go +++ b/pkg/services/object/range/prm.go @@ -10,8 +10,6 @@ type Prm struct { addr *object.Address rng *object.Range - - traverser *rangeTraverser } func (p *Prm) OnlyLocal(v bool) *Prm { diff --git a/pkg/services/object/range/service.go b/pkg/services/object/range/service.go index 6f36d0e241..5aae6181d6 100644 --- a/pkg/services/object/range/service.go +++ b/pkg/services/object/range/service.go @@ -11,6 +11,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" "github.com/nspcc-dev/neofs-node/pkg/network" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/pkg/errors" ) @@ -82,7 +83,7 @@ func (s *Service) GetRange(ctx context.Context, prm *Prm) (*Result, error) { right = origin } - rngTraverser := newRangeTraverser(originSize, right, prm.rng) + rngTraverser := objutil.NewRangeTraverser(originSize, right, prm.rng) if err := s.fillTraverser(ctx, prm, rngTraverser); err != nil { return nil, errors.Wrapf(err, "(%T) could not fill range traverser", s) } @@ -99,17 +100,17 @@ func (s *Service) GetRange(ctx context.Context, prm *Prm) (*Result, error) { }, nil } -func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *rangeTraverser) error { +func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) error { addr := object.NewAddress() addr.SetContainerID(prm.addr.GetContainerID()) for { - next := traverser.next() - if next.rng != nil { + nextID, nextRng := traverser.Next() + if nextRng != nil { return nil } - addr.SetObjectID(next.id) + addr.SetObjectID(nextID) head, err := s.headSvc.Head(ctx, new(headsvc.Prm). WithAddress(addr). @@ -119,7 +120,7 @@ func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *rangeT return errors.Wrapf(err, "(%T) could not receive object header", s) } - traverser.pushHeader(head.Header()) + traverser.PushHeader(head.Header()) } } diff --git a/pkg/services/object/range/streamer.go b/pkg/services/object/range/streamer.go index f5ff0888b4..467c96383e 100644 --- a/pkg/services/object/range/streamer.go +++ b/pkg/services/object/range/streamer.go @@ -28,7 +28,7 @@ type streamer struct { traverser *placement.Traverser - rangeTraverser *rangeTraverser + rangeTraverser *util.RangeTraverser ch chan []byte } @@ -58,7 +58,7 @@ func (p *streamer) Recv() (*Response, error) { return nil, errors.Wrapf(p.ctx.Err(), "(%T) stream is stopped by context", p) case v, ok := <-p.ch: if !ok { - if p.rangeTraverser.next().rng.GetLength() != 0 { + if _, rng := p.rangeTraverser.Next(); rng.GetLength() != 0 { return nil, errors.Errorf("(%T) incomplete get payload range", p) } @@ -135,15 +135,15 @@ loop: default: } - nextRange := p.rangeTraverser.next() - if nextRange.rng.GetLength() == 0 { + nextID, nextRange := p.rangeTraverser.Next() + if nextRange.GetLength() == 0 { break - } else if err := p.switchToObject(nextRange.id); err != nil { + } else if err := p.switchToObject(nextID); err != nil { // TODO: log error break } - objAddr.SetObjectID(nextRange.id) + objAddr.SetObjectID(nextID) subloop: for { @@ -173,7 +173,7 @@ loop: if network.IsLocalAddress(p.localAddrSrc, addr) { rngWriter = &localRangeWriter{ addr: objAddr, - rng: nextRange.rng, + rng: nextRange, storage: p.localStore, } } else { @@ -182,7 +182,7 @@ loop: key: p.key, node: addr, addr: objAddr, - rng: nextRange.rng, + rng: nextRange, } } @@ -194,12 +194,12 @@ loop: // TODO: log error } - ln := nextRange.rng.GetLength() + ln := nextRange.GetLength() uw := uint64(written) - p.rangeTraverser.pushSuccessSize(uw) - nextRange.rng.SetLength(ln - uw) - nextRange.rng.SetOffset(nextRange.rng.GetOffset() + uw) + p.rangeTraverser.PushSuccessSize(uw) + nextRange.SetLength(ln - uw) + nextRange.SetOffset(nextRange.GetOffset() + uw) }); err != nil { wg.Done() // TODO: log error @@ -208,7 +208,7 @@ loop: wg.Wait() - if nextRange.rng.GetLength() == 0 { + if nextRange.GetLength() == 0 { p.traverser.SubmitSuccess() break subloop } diff --git a/pkg/services/object/util/chain.go b/pkg/services/object/util/chain.go new file mode 100644 index 0000000000..c7e8fe187d --- /dev/null +++ b/pkg/services/object/util/chain.go @@ -0,0 +1,106 @@ +package util + +import ( + "fmt" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" +) + +type RangeTraverser struct { + chain *rangeChain + + seekBounds *rangeBounds +} + +type rangeBounds struct { + left, right uint64 +} + +type rangeChain struct { + next, prev *rangeChain + + bounds *rangeBounds + + id *objectSDK.ID +} + +func NewRangeTraverser(originSize uint64, rightElement *object.Object, rngSeek *objectSDK.Range) *RangeTraverser { + right := &rangeChain{ + bounds: &rangeBounds{ + left: originSize - rightElement.GetPayloadSize(), + right: originSize, + }, + id: rightElement.GetID(), + } + + left := &rangeChain{ + id: rightElement.GetPreviousID(), + } + + left.next, right.prev = right, left + + return &RangeTraverser{ + chain: right, + seekBounds: &rangeBounds{ + left: rngSeek.GetOffset(), + right: rngSeek.GetOffset() + rngSeek.GetLength(), + }, + } +} + +func (c *RangeTraverser) Next() (id *objectSDK.ID, rng *objectSDK.Range) { + left := c.chain.bounds.left + seekLeft := c.seekBounds.left + + if left > seekLeft { + id = c.chain.prev.id + } else { + id = c.chain.id + rng = objectSDK.NewRange() + rng.SetOffset(seekLeft - left) + rng.SetLength(min(c.chain.bounds.right, c.seekBounds.right) - seekLeft) + } + + return +} + +func min(a, b uint64) uint64 { + if a < b { + return a + } + + return b +} + +func (c *RangeTraverser) PushHeader(obj *object.Object) { + id := obj.GetID() + if !id.Equal(c.chain.prev.id) { + panic(fmt.Sprintf("(%T) unexpected identifier in header", c)) + } + + sz := obj.GetPayloadSize() + + c.chain.prev.bounds = &rangeBounds{ + left: c.chain.bounds.left - sz, + right: c.chain.bounds.left, + } + + c.chain = c.chain.prev + + if prev := obj.GetPreviousID(); prev != nil { + c.chain.prev = &rangeChain{ + next: c.chain, + id: prev, + } + } +} + +func (c *RangeTraverser) PushSuccessSize(sz uint64) { + c.seekBounds.left += sz + + if c.seekBounds.left >= c.chain.bounds.right && c.chain.next != nil { + c.chain = c.chain.next + } +}