d485a5967d
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
183 lines
3.5 KiB
Go
183 lines
3.5 KiB
Go
package searchsvc
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
type Streamer struct {
|
|
*cfg
|
|
|
|
once *sync.Once
|
|
|
|
prm *Prm
|
|
|
|
traverser *placement.Traverser
|
|
|
|
ctx context.Context
|
|
|
|
ch chan []*object.ID
|
|
|
|
cache [][]*object.ID
|
|
}
|
|
|
|
func (p *Streamer) Recv() (*Response, error) {
|
|
var err error
|
|
|
|
p.once.Do(func() {
|
|
if err = p.preparePrm(p.prm); err == nil {
|
|
go p.start(p.prm)
|
|
}
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "(%T) could not start streaming", p)
|
|
}
|
|
|
|
select {
|
|
case <-p.ctx.Done():
|
|
return nil, errors.Wrapf(p.ctx.Err(), "(%T) context is done", p)
|
|
case v, ok := <-p.ch:
|
|
if !ok {
|
|
return nil, io.EOF
|
|
}
|
|
|
|
v = p.cutCached(v)
|
|
|
|
return &Response{
|
|
idList: v,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
func (p *Streamer) cutCached(ids []*object.ID) []*object.ID {
|
|
loop:
|
|
for i := 0; i < len(ids); i++ {
|
|
for j := range p.cache {
|
|
for k := range p.cache[j] {
|
|
if ids[i].Equal(p.cache[j][k]) {
|
|
ids = append(ids[:i], ids[i+1:]...)
|
|
|
|
i--
|
|
|
|
continue loop
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(ids) > 0 {
|
|
p.cache = append(p.cache, ids)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (p *Streamer) preparePrm(prm *Prm) error {
|
|
var err error
|
|
|
|
// get latest network map
|
|
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not get latest network map", p)
|
|
}
|
|
|
|
// get container to store the object
|
|
cnr, err := p.cnrSrc.Get(prm.cid)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not get container by ID", p)
|
|
}
|
|
|
|
// allocate placement traverser options
|
|
traverseOpts := make([]placement.Option, 0, 4)
|
|
|
|
// add common options
|
|
traverseOpts = append(traverseOpts,
|
|
// set processing container
|
|
placement.ForContainer(cnr),
|
|
)
|
|
|
|
// create placement builder from network map
|
|
builder := placement.NewNetworkMapBuilder(nm)
|
|
|
|
if prm.common.LocalOnly() {
|
|
// restrict success count to 1 stored copy (to local storage)
|
|
traverseOpts = append(traverseOpts, placement.SuccessAfter(1))
|
|
|
|
// use local-only placement builder
|
|
builder = util.NewLocalPlacement(builder, p.localAddrSrc)
|
|
}
|
|
|
|
// set placement builder
|
|
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
|
|
|
|
// build placement traverser
|
|
if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
|
|
return errors.Wrapf(err, "(%T) could not build placement traverser", p)
|
|
}
|
|
|
|
p.ch = make(chan []*object.ID)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Streamer) start(prm *Prm) {
|
|
defer close(p.ch)
|
|
|
|
loop:
|
|
for {
|
|
addrs := p.traverser.Next()
|
|
if len(addrs) == 0 {
|
|
break
|
|
}
|
|
|
|
wg := new(sync.WaitGroup)
|
|
|
|
for i := range addrs {
|
|
wg.Add(1)
|
|
|
|
addr := addrs[i]
|
|
|
|
if err := p.workerPool.Submit(func() {
|
|
defer wg.Done()
|
|
|
|
var streamer interface {
|
|
stream(context.Context, chan<- []*object.ID) error
|
|
}
|
|
|
|
if network.IsLocalAddress(p.localAddrSrc, addr) {
|
|
streamer = &localStream{
|
|
query: prm.query,
|
|
storage: p.localStore,
|
|
cid: prm.cid,
|
|
}
|
|
} else {
|
|
streamer = &remoteStream{
|
|
prm: prm,
|
|
keyStorage: p.keyStorage,
|
|
addr: addr,
|
|
clientCache: p.clientCache,
|
|
}
|
|
}
|
|
|
|
if err := streamer.stream(p.ctx, p.ch); err != nil {
|
|
// TODO: log error
|
|
}
|
|
}); err != nil {
|
|
wg.Done()
|
|
// TODO: log error
|
|
break loop
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
}
|