forked from TrueCloudLab/frostfs-node
[#1826] services/object: Parallelize object search
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
90bfe0bad9
commit
2ac42b70ce
4 changed files with 45 additions and 57 deletions
|
@ -2,6 +2,8 @@ package searchsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -65,24 +67,53 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var mtx sync.Mutex
|
||||||
|
|
||||||
for i := range addrs {
|
for i := range addrs {
|
||||||
select {
|
wg.Add(1)
|
||||||
case <-ctx.Done():
|
go func(i int) {
|
||||||
exec.log.Debug("interrupt placement iteration by context",
|
defer wg.Done()
|
||||||
zap.String("error", ctx.Err().Error()),
|
select {
|
||||||
)
|
case <-ctx.Done():
|
||||||
|
exec.log.Debug("interrupt placement iteration by context",
|
||||||
|
zap.String("error", ctx.Err().Error()))
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
return true
|
var info client.NodeInfo
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: #1142 consider parallel execution
|
client.NodeInfoFromNetmapElement(&info, addrs[i])
|
||||||
var info client.NodeInfo
|
|
||||||
|
|
||||||
client.NodeInfoFromNetmapElement(&info, addrs[i])
|
exec.log.Debug("processing node...", zap.String("key", hex.EncodeToString(addrs[i].PublicKey())))
|
||||||
|
|
||||||
exec.processNode(ctx, info)
|
c, err := exec.svc.clientConstructor.get(info)
|
||||||
|
if err != nil {
|
||||||
|
mtx.Lock()
|
||||||
|
exec.status = statusUndefined
|
||||||
|
exec.err = err
|
||||||
|
mtx.Unlock()
|
||||||
|
|
||||||
|
exec.log.Debug("could not construct remote node client")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ids, err := c.searchObjects(exec, info)
|
||||||
|
if err != nil {
|
||||||
|
exec.log.Debug("remote operation failed",
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
mtx.Lock()
|
||||||
|
exec.writeIDList(ids)
|
||||||
|
mtx.Unlock()
|
||||||
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -3,7 +3,6 @@ package searchsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
|
@ -118,21 +117,6 @@ func (exec *execCtx) generateTraverser(cnr cid.ID) (*placement.Traverser, bool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec execCtx) remoteClient(info client.NodeInfo) (searchClient, bool) {
|
|
||||||
c, err := exec.svc.clientConstructor.get(info)
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug("could not construct remote node client")
|
|
||||||
case err == nil:
|
|
||||||
return c, true
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) writeIDList(ids []oid.ID) {
|
func (exec *execCtx) writeIDList(ids []oid.ID) {
|
||||||
err := exec.prm.writer.WriteIDs(ids)
|
err := exec.prm.writer.WriteIDs(ids)
|
||||||
|
|
||||||
|
|
|
@ -1,29 +0,0 @@
|
||||||
package searchsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) {
|
|
||||||
exec.log.Debug("processing node...")
|
|
||||||
|
|
||||||
client, ok := exec.remoteClient(info)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ids, err := client.searchObjects(exec, info)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
exec.log.Debug("remote operation failed",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
exec.writeIDList(ids)
|
|
||||||
}
|
|
|
@ -22,6 +22,8 @@ type Service struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type searchClient interface {
|
type searchClient interface {
|
||||||
|
// searchObjects searches objects on the specified node.
|
||||||
|
// MUST NOT modify execCtx as it can be accessed concurrently.
|
||||||
searchObjects(*execCtx, client.NodeInfo) ([]oid.ID, error)
|
searchObjects(*execCtx, client.NodeInfo) ([]oid.ID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue