diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index 61ff9402..5c238cde 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -2,6 +2,8 @@ package searchsvc import ( "context" + "encoding/hex" + "sync" "github.com/nspcc-dev/neofs-node/pkg/core/client" "go.uber.org/zap" @@ -65,24 +67,53 @@ func (exec *execCtx) processCurrentEpoch() bool { break } + var wg sync.WaitGroup + var mtx sync.Mutex + for i := range addrs { - select { - case <-ctx.Done(): - exec.log.Debug("interrupt placement iteration by context", - zap.String("error", ctx.Err().Error()), - ) + wg.Add(1) + go func(i int) { + defer wg.Done() + select { + case <-ctx.Done(): + exec.log.Debug("interrupt placement iteration by context", + zap.String("error", ctx.Err().Error())) + return + default: + } - return true - default: - } + var info client.NodeInfo - // TODO: #1142 consider parallel execution - var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, addrs[i]) - client.NodeInfoFromNetmapElement(&info, addrs[i]) + exec.log.Debug("processing node...", zap.String("key", hex.EncodeToString(addrs[i].PublicKey()))) - exec.processNode(ctx, info) + c, err := exec.svc.clientConstructor.get(info) + if err != nil { + mtx.Lock() + exec.status = statusUndefined + exec.err = err + mtx.Unlock() + + exec.log.Debug("could not construct remote node client") + return + } + + ids, err := c.searchObjects(exec, info) + if err != nil { + exec.log.Debug("remote operation failed", + zap.String("error", err.Error())) + + return + } + + mtx.Lock() + exec.writeIDList(ids) + mtx.Unlock() + }(i) } + + wg.Wait() } return false diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 97b5dc27..4e7100f0 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -3,7 +3,6 @@ package searchsvc import ( "context" - "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -118,21 +117,6 @@ func (exec *execCtx) generateTraverser(cnr cid.ID) (*placement.Traverser, bool) } } -func (exec execCtx) remoteClient(info client.NodeInfo) (searchClient, bool) { - c, err := exec.svc.clientConstructor.get(info) - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug("could not construct remote node client") - case err == nil: - return c, true - } - - return nil, false -} - func (exec *execCtx) writeIDList(ids []oid.ID) { err := exec.prm.writer.WriteIDs(ids) diff --git a/pkg/services/object/search/remote.go b/pkg/services/object/search/remote.go deleted file mode 100644 index a51cec6d..00000000 --- a/pkg/services/object/search/remote.go +++ /dev/null @@ -1,29 +0,0 @@ -package searchsvc - -import ( - "context" - - "github.com/nspcc-dev/neofs-node/pkg/core/client" - "go.uber.org/zap" -) - -func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) { - exec.log.Debug("processing node...") - - client, ok := exec.remoteClient(info) - if !ok { - return - } - - ids, err := client.searchObjects(exec, info) - - if err != nil { - exec.log.Debug("remote operation failed", - zap.String("error", err.Error()), - ) - - return - } - - exec.writeIDList(ids) -} diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 227c4a2b..231c19d9 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -22,6 +22,8 @@ type Service struct { type Option func(*cfg) type searchClient interface { + // searchObjects searches objects on the specified node. + // MUST NOT modify execCtx as it can be accessed concurrently. searchObjects(*execCtx, client.NodeInfo) ([]oid.ID, error) }