frostfs-node/pkg/services/object/search/streamer.go
Leonard Lyubich 9148980bd0 [#193] services/object: Support client options in all Object services
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-11-24 15:34:30 +03:00

186 lines
3.6 KiB
Go

package searchsvc
import (
"context"
"io"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type Streamer struct {
*cfg
once *sync.Once
prm *Prm
traverser *placement.Traverser
ctx context.Context
ch chan []*object.ID
cache [][]*object.ID
}
func (p *Streamer) Recv() (*Response, error) {
var err error
p.once.Do(func() {
if err = p.preparePrm(p.prm); err == nil {
go p.start(p.prm)
}
})
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not start streaming", p)
}
select {
case <-p.ctx.Done():
return nil, errors.Wrapf(p.ctx.Err(), "(%T) context is done", p)
case v, ok := <-p.ch:
if !ok {
return nil, io.EOF
}
v = p.cutCached(v)
return &Response{
idList: v,
}, nil
}
}
func (p *Streamer) cutCached(ids []*object.ID) []*object.ID {
loop:
for i := 0; i < len(ids); i++ {
for j := range p.cache {
for k := range p.cache[j] {
if ids[i].Equal(p.cache[j][k]) {
ids = append(ids[:i], ids[i+1:]...)
i--
continue loop
}
}
}
}
if len(ids) > 0 {
p.cache = append(p.cache, ids)
}
return ids
}
func (p *Streamer) preparePrm(prm *Prm) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return errors.Wrapf(err, "(%T) could not get latest network map", p)
}
// get container to store the object
cnr, err := p.cnrSrc.Get(prm.cid)
if err != nil {
return errors.Wrapf(err, "(%T) could not get container by ID", p)
}
// allocate placement traverser options
traverseOpts := make([]placement.Option, 0, 4)
// add common options
traverseOpts = append(traverseOpts,
// set processing container
placement.ForContainer(cnr),
)
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
traverseOpts = append(traverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = svcutil.NewLocalPlacement(builder, p.localAddrSrc)
}
// set placement builder
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
// build placement traverser
if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
return errors.Wrapf(err, "(%T) could not build placement traverser", p)
}
p.ch = make(chan []*object.ID)
return nil
}
func (p *Streamer) start(prm *Prm) {
defer close(p.ch)
loop:
for {
addrs := p.traverser.Next()
if len(addrs) == 0 {
break
}
wg := new(sync.WaitGroup)
for i := range addrs {
wg.Add(1)
addr := addrs[i]
if err := p.workerPool.Submit(func() {
defer wg.Done()
var streamer interface {
stream(context.Context, chan<- []*object.ID) error
}
if network.IsLocalAddress(p.localAddrSrc, addr) {
streamer = &localStream{
query: prm.query,
storage: p.localStore,
cid: prm.cid,
}
} else {
streamer = &remoteStream{
prm: prm,
keyStorage: p.keyStorage,
addr: addr,
clientCache: p.clientCache,
clientOpts: p.clientOpts,
}
}
if err := streamer.stream(p.ctx, p.ch); err != nil {
svcutil.LogServiceError(p.log, "SEARCH", addr, err)
}
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(p.log, "SEARCH", err)
break loop
}
}
wg.Wait()
}
}