From 0490107165a73affb277e2e6a65f9408ccad192f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 25 Sep 2020 16:04:55 +0300 Subject: [PATCH] [#50] services/object: Implement GetRange service Signed-off-by: Leonard Lyubich --- go.mod | 2 +- go.sum | Bin 58361 -> 58116 bytes pkg/services/object/range/chain.go | 112 +++++++++++ pkg/services/object/range/local.go | 36 ++++ pkg/services/object/range/prm.go | 39 ++++ pkg/services/object/range/remote.go | 49 +++++ pkg/services/object/range/res.go | 9 + pkg/services/object/range/service.go | 159 +++++++++++++++ pkg/services/object/range/streamer.go | 235 +++++++++++++++++++++++ pkg/services/object/range/v2/service.go | 50 +++++ pkg/services/object/range/v2/streamer.go | 27 +++ pkg/services/object/range/v2/util.go | 25 +++ 12 files changed, 742 insertions(+), 1 deletion(-) create mode 100644 pkg/services/object/range/chain.go create mode 100644 pkg/services/object/range/local.go create mode 100644 pkg/services/object/range/prm.go create mode 100644 pkg/services/object/range/remote.go create mode 100644 pkg/services/object/range/res.go create mode 100644 pkg/services/object/range/service.go create mode 100644 pkg/services/object/range/streamer.go create mode 100644 pkg/services/object/range/v2/service.go create mode 100644 pkg/services/object/range/v2/streamer.go create mode 100644 pkg/services/object/range/v2/util.go diff --git a/go.mod b/go.mod index 0105a8ba..d05dfb7b 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2 github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78 - github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200922150714-14fa89b81919 + github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200925125840-c814cc62faf4 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index df9a20cd5d4f7b3cbf18bf9533bcdaaccc7c1619..799130b0283681b24187cc7d1ecad72254a626a4 100644 GIT binary patch delta 111 zcmex)oVn!~^M>bX&PJw&My3`f2D-@>h9=3$W=3g=X(kF8hF03y`G%o6?!^Ha-fm|8 xhT2gs{^jB3Ier-tKCao?srr8U=_$F1rha~2lMAyXCLc(b6~SiI=8TMq)d2kuBl7?N delta 217 zcmZoU#{Ba*^M>bXE`}Bc7DlEf#=5DN=4s{zmS)Lj$tDUJhE|pZer6F7mX=Yfs+rWN=-hOCMkl=u*pT~A(K7Q#hr}| zO%2QqO>_-S(h@B!lPnA^4J|n_CSL7N%Es_$!X4dB7^oi8~ZW=`4 diff --git a/pkg/services/object/range/chain.go b/pkg/services/object/range/chain.go new file mode 100644 index 00000000..18e85a2e --- /dev/null +++ b/pkg/services/object/range/chain.go @@ -0,0 +1,112 @@ +package rangesvc + +import ( + "fmt" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" +) + +type rangeTraverser struct { + chain *rangeChain + + seekBounds *rangeBounds +} + +type rangeBounds struct { + left, right uint64 +} + +type objectRange struct { + rng *objectSDK.Range + + id *objectSDK.ID +} + +type rangeChain struct { + next, prev *rangeChain + + bounds *rangeBounds + + id *objectSDK.ID +} + +func newRangeTraverser(originSize uint64, rightElement *object.Object, rngSeek *objectSDK.Range) *rangeTraverser { + right := &rangeChain{ + bounds: &rangeBounds{ + left: originSize - rightElement.GetPayloadSize(), + right: originSize, + }, + id: rightElement.GetID(), + } + + left := &rangeChain{ + id: rightElement.GetPreviousID(), + } + + left.next, right.prev = right, left + + return &rangeTraverser{ + chain: right, + seekBounds: &rangeBounds{ + left: rngSeek.GetOffset(), + right: rngSeek.GetOffset() + rngSeek.GetLength(), + }, + } +} + +func (c *rangeTraverser) next() *objectRange { + left := c.chain.bounds.left + seekLeft := c.seekBounds.left + + res := new(objectRange) + + if left > seekLeft { + res.id = c.chain.prev.id + } else { + res.id = c.chain.id + res.rng = objectSDK.NewRange() + res.rng.SetOffset(seekLeft - left) + res.rng.SetLength(min(c.chain.bounds.right, c.seekBounds.right) - seekLeft) + } + + return res +} + +func min(a, b uint64) uint64 { + if a < b { + return a + } + + return b +} + +func (c *rangeTraverser) pushHeader(obj *object.Object) { + id := obj.GetID() + if !id.Equal(c.chain.prev.id) { + panic(fmt.Sprintf("(%T) unexpected identifier in header", c)) + } + + sz := obj.GetPayloadSize() + + c.chain.prev.bounds = &rangeBounds{ + left: c.chain.bounds.left - sz, + right: c.chain.bounds.left, + } + + c.chain = c.chain.prev + + c.chain.prev = &rangeChain{ + next: c.chain, + id: obj.GetPreviousID(), + } +} + +func (c *rangeTraverser) pushSuccessSize(sz uint64) { + c.seekBounds.left += sz + + if c.seekBounds.left >= c.chain.bounds.right && c.chain.next != nil { + c.chain = c.chain.next + } +} diff --git a/pkg/services/object/range/local.go b/pkg/services/object/range/local.go new file mode 100644 index 00000000..5934a9dd --- /dev/null +++ b/pkg/services/object/range/local.go @@ -0,0 +1,36 @@ +package rangesvc + +import ( + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/pkg/errors" +) + +type localRangeWriter struct { + addr *object.Address + + rng *object.Range + + storage *localstore.Storage +} + +func (l *localRangeWriter) WriteTo(w io.Writer) (int64, error) { + obj, err := l.storage.Get(l.addr) + if err != nil { + return 0, errors.Wrapf(err, "(%T) could not get object from local storage", l) + } + + payload := obj.GetPayload() + left := l.rng.GetOffset() + right := left + l.rng.GetLength() + + if ln := uint64(len(payload)); ln < right { + return 0, errors.Errorf("(%T) object range is out-of-boundaries (size %d, range [%d:%d]", l, ln, left, right) + } + + n, err := w.Write(payload[left:right]) + + return int64(n), err +} diff --git a/pkg/services/object/range/prm.go b/pkg/services/object/range/prm.go new file mode 100644 index 00000000..8d9f01ed --- /dev/null +++ b/pkg/services/object/range/prm.go @@ -0,0 +1,39 @@ +package rangesvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type Prm struct { + local bool + + addr *object.Address + + rng *object.Range + + traverser *rangeTraverser +} + +func (p *Prm) OnlyLocal(v bool) *Prm { + if p != nil { + p.local = v + } + + return p +} + +func (p *Prm) WithAddress(v *object.Address) *Prm { + if p != nil { + p.addr = v + } + + return p +} + +func (p *Prm) WithRange(v *object.Range) *Prm { + if p != nil { + p.rng = v + } + + return p +} diff --git a/pkg/services/object/range/remote.go b/pkg/services/object/range/remote.go new file mode 100644 index 00000000..821de474 --- /dev/null +++ b/pkg/services/object/range/remote.go @@ -0,0 +1,49 @@ +package rangesvc + +import ( + "context" + "crypto/ecdsa" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/pkg/errors" +) + +type remoteRangeWriter struct { + ctx context.Context + + key *ecdsa.PrivateKey + + node *network.Address + + addr *object.Address + + rng *object.Range +} + +func (r *remoteRangeWriter) WriteTo(w io.Writer) (int64, error) { + addr := r.node.NetAddr() + + c, err := client.New(r.key, + client.WithAddress(addr), + ) + if err != nil { + return 0, errors.Wrapf(err, "(%T) could not create SDK client %s", r, addr) + } + + // TODO: change ObjectPayloadRangeData to implement WriterTo + chunk, err := c.ObjectPayloadRangeData(r.ctx, new(client.RangeDataParams). + WithRange(r.rng). + WithAddress(r.addr), + client.WithTTL(1), // FIXME: use constant + ) + if err != nil { + return 0, errors.Wrapf(err, "(%T) could not read object payload range from %s", r, addr) + } + + n, err := w.Write(chunk) + + return int64(n), err +} diff --git a/pkg/services/object/range/res.go b/pkg/services/object/range/res.go new file mode 100644 index 00000000..fcdeb289 --- /dev/null +++ b/pkg/services/object/range/res.go @@ -0,0 +1,9 @@ +package rangesvc + +type Response struct { + chunk []byte +} + +func (r *Response) PayloadChunk() []byte { + return r.chunk +} diff --git a/pkg/services/object/range/service.go b/pkg/services/object/range/service.go new file mode 100644 index 00000000..f9d2b8f9 --- /dev/null +++ b/pkg/services/object/range/service.go @@ -0,0 +1,159 @@ +package rangesvc + +import ( + "context" + "crypto/ecdsa" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/network" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/pkg/errors" +) + +type Service struct { + *cfg +} + +type Option func(*cfg) + +type cfg struct { + key *ecdsa.PrivateKey + + localStore *localstore.Storage + + cnrSrc container.Source + + netMapSrc netmap.Source + + workerPool util.WorkerPool + + localAddrSrc network.LocalAddressSource + + headSvc *headsvc.Service +} + +func defaultCfg() *cfg { + return &cfg{ + workerPool: new(util.SyncWorkerPool), + } +} + +func NewService(opts ...Option) *Service { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +func (s *Service) GetRange(ctx context.Context, prm *Prm) (Streamer, error) { + headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm). + WithAddress(prm.addr). + OnlyLocal(prm.local), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive Head result", s) + } + + off, ln := prm.rng.GetOffset(), prm.rng.GetLength() + + origin := headResult.Header() + + originSize := origin.GetPayloadSize() + if originSize < off+ln { + return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s) + } + + right := headResult.RightChild() + if right == nil { + right = origin + } + + rngTraverser := newRangeTraverser(originSize, right, prm.rng) + if err := s.fillTraverser(ctx, prm, rngTraverser); err != nil { + return nil, errors.Wrapf(err, "(%T) could not fill range traverser", s) + } + + return &streamer{ + cfg: s.cfg, + once: new(sync.Once), + ctx: ctx, + prm: prm, + rangeTraverser: rngTraverser, + }, nil +} + +func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *rangeTraverser) error { + addr := object.NewAddress() + addr.SetContainerID(prm.addr.GetContainerID()) + + for { + next := traverser.next() + if next.rng != nil { + return nil + } + + addr.SetObjectID(next.id) + + head, err := s.headSvc.Head(ctx, new(headsvc.Prm). + WithAddress(addr). + OnlyLocal(prm.local), + ) + if err != nil { + return errors.Wrapf(err, "(%T) could not receive object header", s) + } + + traverser.pushHeader(head.Header()) + } +} + +func WithKey(v *ecdsa.PrivateKey) Option { + return func(c *cfg) { + c.key = v + } +} + +func WithLocalStorage(v *localstore.Storage) Option { + return func(c *cfg) { + c.localStore = v + } +} + +func WithContainerSource(v container.Source) Option { + return func(c *cfg) { + c.cnrSrc = v + } +} + +func WithNetworkMapSource(v netmap.Source) Option { + return func(c *cfg) { + c.netMapSrc = v + } +} + +func WithWorkerPool(v util.WorkerPool) Option { + return func(c *cfg) { + c.workerPool = v + } +} + +func WithLocalAddressSource(v network.LocalAddressSource) Option { + return func(c *cfg) { + c.localAddrSrc = v + } +} + +func WithHeadService(v *headsvc.Service) Option { + return func(c *cfg) { + c.headSvc = v + } +} diff --git a/pkg/services/object/range/streamer.go b/pkg/services/object/range/streamer.go new file mode 100644 index 00000000..f5ff0888 --- /dev/null +++ b/pkg/services/object/range/streamer.go @@ -0,0 +1,235 @@ +package rangesvc + +import ( + "context" + "io" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/pkg/errors" +) + +type Streamer interface { + Recv() (*Response, error) +} + +type streamer struct { + *cfg + + once *sync.Once + + ctx context.Context + + prm *Prm + + traverser *placement.Traverser + + rangeTraverser *rangeTraverser + + ch chan []byte +} + +type chunkWriter struct { + ctx context.Context + + ch chan<- []byte + + written uint64 +} + +func (p *streamer) Recv() (*Response, error) { + var err error + + p.once.Do(func() { + p.ch = make(chan []byte) + err = p.workerPool.Submit(p.start) + }) + + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not start streaming", p) + } + + select { + case <-p.ctx.Done(): + return nil, errors.Wrapf(p.ctx.Err(), "(%T) stream is stopped by context", p) + case v, ok := <-p.ch: + if !ok { + if p.rangeTraverser.next().rng.GetLength() != 0 { + return nil, errors.Errorf("(%T) incomplete get payload range", p) + } + + return nil, io.EOF + } + + return &Response{ + chunk: v, + }, nil + } +} + +func (p *streamer) switchToObject(id *object.ID) error { + var err error + + // get latest network map + nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) + if err != nil { + return errors.Wrapf(err, "(%T) could not get latest network map", p) + } + + // get container to read payload range + cnr, err := p.cnrSrc.Get(p.prm.addr.GetContainerID()) + if err != nil { + return errors.Wrapf(err, "(%T) could not get container by ID", p) + } + + // allocate placement traverser options + traverseOpts := make([]placement.Option, 0, 4) + + // add common options + traverseOpts = append(traverseOpts, + // set processing container + placement.ForContainer(cnr), + + // set success count (1st incoming full range) + placement.SuccessAfter(1), + + // set identifier of the processing object + placement.ForObject(id), + ) + + // create placement builder from network map + builder := placement.NewNetworkMapBuilder(nm) + + if p.prm.local { + // use local-only placement builder + builder = util.NewLocalPlacement(builder, p.localAddrSrc) + } + + // set placement builder + traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) + + // build placement traverser + if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { + return errors.Wrapf(err, "(%T) could not build placement traverser", p) + } + + return nil +} + +func (p *streamer) start() { + defer close(p.ch) + + objAddr := object.NewAddress() + objAddr.SetContainerID(p.prm.addr.GetContainerID()) + +loop: + for { + select { + case <-p.ctx.Done(): + // TODO: log this + break loop + default: + } + + nextRange := p.rangeTraverser.next() + if nextRange.rng.GetLength() == 0 { + break + } else if err := p.switchToObject(nextRange.id); err != nil { + // TODO: log error + break + } + + objAddr.SetObjectID(nextRange.id) + + subloop: + for { + select { + case <-p.ctx.Done(): + // TODO: log this + break loop + default: + } + + addrs := p.traverser.Next() + if len(addrs) == 0 { + break + } + + for i := range addrs { + wg := new(sync.WaitGroup) + wg.Add(1) + + addr := addrs[i] + + if err := p.workerPool.Submit(func() { + defer wg.Done() + + var rngWriter io.WriterTo + + if network.IsLocalAddress(p.localAddrSrc, addr) { + rngWriter = &localRangeWriter{ + addr: objAddr, + rng: nextRange.rng, + storage: p.localStore, + } + } else { + rngWriter = &remoteRangeWriter{ + ctx: p.ctx, + key: p.key, + node: addr, + addr: objAddr, + rng: nextRange.rng, + } + } + + written, err := rngWriter.WriteTo(&chunkWriter{ + ctx: p.ctx, + ch: p.ch, + }) + if err != nil { + // TODO: log error + } + + ln := nextRange.rng.GetLength() + uw := uint64(written) + + p.rangeTraverser.pushSuccessSize(uw) + nextRange.rng.SetLength(ln - uw) + nextRange.rng.SetOffset(nextRange.rng.GetOffset() + uw) + }); err != nil { + wg.Done() + // TODO: log error + break loop + } + + wg.Wait() + + if nextRange.rng.GetLength() == 0 { + p.traverser.SubmitSuccess() + break subloop + } + } + } + + if !p.traverser.Success() { + // TODO: log error + break loop + } + } +} + +func (w *chunkWriter) Write(p []byte) (int, error) { + select { + case <-w.ctx.Done(): + return 0, w.ctx.Err() + case w.ch <- p: + } + + w.written += uint64(len(p)) + + return len(p), nil +} diff --git a/pkg/services/object/range/v2/service.go b/pkg/services/object/range/v2/service.go new file mode 100644 index 00000000..f7516cb8 --- /dev/null +++ b/pkg/services/object/range/v2/service.go @@ -0,0 +1,50 @@ +package rangesvc + +import ( + "context" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" + "github.com/pkg/errors" +) + +// Service implements GetRange operation of Object service v2. +type Service struct { + *cfg +} + +// Option represents Service constructor option. +type Option func(*cfg) + +type cfg struct { + svc *rangesvc.Service +} + +// NewService constructs Service instance from provided options. +func NewService(opts ...Option) *Service { + c := new(cfg) + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +// GetRange calls internal service and returns v2 object payload range stream. +func (s *Service) GetRange(ctx context.Context, req *objectV2.GetRangeRequest) (objectV2.GetRangeObjectStreamer, error) { + stream, err := s.svc.GetRange(ctx, toPrm(req)) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not get object payload range data", s) + } + + return fromResponse(stream), nil +} + +func WithInternalService(v *rangesvc.Service) Option { + return func(c *cfg) { + c.svc = v + } +} diff --git a/pkg/services/object/range/v2/streamer.go b/pkg/services/object/range/v2/streamer.go new file mode 100644 index 00000000..c3c82525 --- /dev/null +++ b/pkg/services/object/range/v2/streamer.go @@ -0,0 +1,27 @@ +package rangesvc + +import ( + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" + "github.com/pkg/errors" +) + +type streamer struct { + stream rangesvc.Streamer + + body *objectV2.GetRangeResponseBody +} + +func (s *streamer) Recv() (*objectV2.GetRangeResponse, error) { + r, err := s.stream.Recv() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not read response from stream", s) + } + + s.body.SetChunk(r.PayloadChunk()) + + resp := new(objectV2.GetRangeResponse) + resp.SetBody(s.body) + + return resp, nil +} diff --git a/pkg/services/object/range/v2/util.go b/pkg/services/object/range/v2/util.go new file mode 100644 index 00000000..2f03f544 --- /dev/null +++ b/pkg/services/object/range/v2/util.go @@ -0,0 +1,25 @@ +package rangesvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" +) + +func toPrm(req *objectV2.GetRangeRequest) *rangesvc.Prm { + body := req.GetBody() + + return new(rangesvc.Prm). + WithAddress( + object.NewAddressFromV2(body.GetAddress()), + ). + WithRange(object.NewRangeFromV2(body.GetRange())). + OnlyLocal(req.GetMetaHeader().GetTTL() == 1) // FIXME: use constant +} + +func fromResponse(stream rangesvc.Streamer) objectV2.GetRangeObjectStreamer { + return &streamer{ + stream: stream, + body: new(objectV2.GetRangeResponseBody), + } +}