frostfs-node/pkg/services/object/range/streamer.go
Leonard Lyubich 9148980bd0 [#193] services/object: Support client options in all Object services
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-11-24 15:34:30 +03:00

241 lines
4.7 KiB
Go

package rangesvc
import (
"context"
"io"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type Streamer interface {
Recv() (*Response, error)
}
type streamer struct {
*cfg
once *sync.Once
ctx context.Context
prm *Prm
traverser *placement.Traverser
rangeTraverser *svcutil.RangeTraverser
ch chan []byte
}
type chunkWriter struct {
ctx context.Context
ch chan<- []byte
written uint64
}
func (p *streamer) Recv() (*Response, error) {
var err error
p.once.Do(func() {
p.ch = make(chan []byte)
err = p.workerPool.Submit(p.start)
})
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not start streaming", p)
}
select {
case <-p.ctx.Done():
return nil, errors.Wrapf(p.ctx.Err(), "(%T) stream is stopped by context", p)
case v, ok := <-p.ch:
if !ok {
if _, rng := p.rangeTraverser.Next(); rng.GetLength() != 0 {
return nil, errors.Errorf("(%T) incomplete get payload range", p)
}
return nil, io.EOF
}
return &Response{
chunk: v,
}, nil
}
}
func (p *streamer) switchToObject(id *object.ID) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return errors.Wrapf(err, "(%T) could not get latest network map", p)
}
// get container to read payload range
cnr, err := p.cnrSrc.Get(p.prm.addr.ContainerID())
if err != nil {
return errors.Wrapf(err, "(%T) could not get container by ID", p)
}
// allocate placement traverser options
traverseOpts := make([]placement.Option, 0, 4)
// add common options
traverseOpts = append(traverseOpts,
// set processing container
placement.ForContainer(cnr),
// set success count (1st incoming full range)
placement.SuccessAfter(1),
// set identifier of the processing object
placement.ForObject(id),
)
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if p.prm.common.LocalOnly() {
// use local-only placement builder
builder = svcutil.NewLocalPlacement(builder, p.localAddrSrc)
}
// set placement builder
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
// build placement traverser
if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
return errors.Wrapf(err, "(%T) could not build placement traverser", p)
}
return nil
}
func (p *streamer) start() {
defer close(p.ch)
objAddr := object.NewAddress()
objAddr.SetContainerID(p.prm.addr.ContainerID())
loop:
for {
select {
case <-p.ctx.Done():
// TODO: log this
break loop
default:
}
nextID, nextRange := p.rangeTraverser.Next()
if nextRange.GetLength() == 0 {
break
} else if err := p.switchToObject(nextID); err != nil {
// TODO: log error
break
}
objAddr.SetObjectID(nextID)
subloop:
for {
select {
case <-p.ctx.Done():
// TODO: log this
break loop
default:
}
addrs := p.traverser.Next()
if len(addrs) == 0 {
break
}
for i := range addrs {
wg := new(sync.WaitGroup)
wg.Add(1)
addr := addrs[i]
if err := p.workerPool.Submit(func() {
defer wg.Done()
var rngWriter io.WriterTo
if network.IsLocalAddress(p.localAddrSrc, addr) {
rngWriter = &localRangeWriter{
addr: objAddr,
rng: nextRange,
storage: p.localStore,
}
} else {
rngWriter = &remoteRangeWriter{
ctx: p.ctx,
keyStorage: p.keyStorage,
node: addr,
token: p.prm.common.SessionToken(),
bearer: p.prm.common.BearerToken(),
addr: objAddr,
rng: nextRange,
clientCache: p.clientCache,
clientOpts: p.clientOpts,
}
}
written, err := rngWriter.WriteTo(&chunkWriter{
ctx: p.ctx,
ch: p.ch,
})
if err != nil {
svcutil.LogServiceError(p.log, "RANGE", addr, err)
}
ln := nextRange.GetLength()
uw := uint64(written)
p.rangeTraverser.PushSuccessSize(uw)
nextRange.SetLength(ln - uw)
nextRange.SetOffset(nextRange.GetOffset() + uw)
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(p.log, "RANGE", err)
break loop
}
wg.Wait()
if nextRange.GetLength() == 0 {
p.traverser.SubmitSuccess()
break subloop
}
}
}
if !p.traverser.Success() {
// TODO: log error
break loop
}
}
}
func (w *chunkWriter) Write(p []byte) (int, error) {
select {
case <-w.ctx.Done():
return 0, w.ctx.Err()
case w.ch <- p:
}
w.written += uint64(len(p))
return len(p), nil
}