diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 54388036..b9f541a1 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -286,7 +286,7 @@ func initObjectService(c *cfg) { sSearch := searchsvc.New( searchsvc.WithLogger(c.log), searchsvc.WithLocalStorageEngine(ls), - searchsvc.WithClientConstructor(coreConstructor), + searchsvc.WithClientConstructor(groupConstructor), searchsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.WithoutSuccessTracking(), diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index 25e3a4d2..d287897c 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -3,6 +3,7 @@ package searchsvc import ( "context" + "github.com/nspcc-dev/neofs-node/pkg/network" "go.uber.org/zap" ) @@ -76,7 +77,7 @@ func (exec *execCtx) processCurrentEpoch() bool { } // TODO: consider parallel execution - exec.processNode(ctx, addrs[i]) + exec.processNode(ctx, network.GroupFromAddress(addrs[i])) } } diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 75fababa..784506b9 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -117,16 +117,14 @@ func (exec *execCtx) generateTraverser(cid *cid.ID) (*placement.Traverser, bool) } } -func (exec execCtx) remoteClient(node network.Address) (searchClient, bool) { - log := exec.log.With(zap.Stringer("node", node)) - +func (exec execCtx) remoteClient(node network.AddressGroup) (searchClient, bool) { c, err := exec.svc.clientConstructor.get(node) switch { default: exec.status = statusUndefined exec.err = err - log.Debug("could not construct remote node client") + exec.log.Debug("could not construct remote node client") case err == nil: return c, true } diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go index 3340cb40..31719c5e 100644 --- a/pkg/services/object/search/prm.go +++ b/pkg/services/object/search/prm.go @@ -27,7 +27,7 @@ type IDListWriter interface { // RequestForwarder is a callback for forwarding of the // original Search requests. -type RequestForwarder func(network.Address, coreclient.Client) ([]*objectSDK.ID, error) +type RequestForwarder func(network.AddressGroup, coreclient.Client) ([]*objectSDK.ID, error) // SetCommonParameters sets common parameters of the operation. func (p *Prm) SetCommonParameters(common *util.CommonPrm) { diff --git a/pkg/services/object/search/remote.go b/pkg/services/object/search/remote.go index d4646b3d..332a17eb 100644 --- a/pkg/services/object/search/remote.go +++ b/pkg/services/object/search/remote.go @@ -7,10 +7,8 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) processNode(ctx context.Context, addr network.Address) { - log := exec.log.With(zap.Stringer("remote node", addr)) - - log.Debug("processing node...") +func (exec *execCtx) processNode(ctx context.Context, addr network.AddressGroup) { + exec.log.Debug("processing node...") client, ok := exec.remoteClient(addr) if !ok { diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 8e8c200c..8ec231b9 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -84,8 +84,8 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return res, nil } -func (c *testClientCache) get(mAddr network.Address) (searchClient, error) { - v, ok := c.clients[mAddr.HostAddr()] +func (c *testClientCache) get(mAddr network.AddressGroup) (searchClient, error) { + v, ok := c.clients[network.StringifyGroup(mAddr)] if !ok { return nil, errors.New("could not construct client") } @@ -102,7 +102,7 @@ func (s *testStorage) search(exec *execCtx) ([]*objectSDK.ID, error) { return v.ids, v.err } -func (c *testStorage) searchObjects(exec *execCtx, _ network.Address) ([]*objectSDK.ID, error) { +func (c *testStorage) searchObjects(exec *execCtx, _ network.AddressGroup) ([]*objectSDK.ID, error) { v, ok := c.items[exec.containerID().String()] if !ok { return nil, nil @@ -200,16 +200,16 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) { strconv.Itoa(60000+j), ) - var na network.Address - - err := na.FromString(a) - require.NoError(t, err) - - as[j] = na.HostAddr() - ni := netmap.NewNodeInfo() ni.SetAddress(a) + var na network.AddressGroup + + err := na.FromIterator(ni) + require.NoError(t, err) + + as[j] = network.StringifyGroup(na) + ns[j] = *ni } diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index b68fee63..cd5c2820 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -23,11 +23,11 @@ type Service struct { type Option func(*cfg) type searchClient interface { - searchObjects(*execCtx, network.Address) ([]*object.ID, error) + searchObjects(*execCtx, network.AddressGroup) ([]*object.ID, error) } type ClientConstructor interface { - Get(network.Address) (client.Client, error) + Get(network.AddressGroup) (client.Client, error) } type cfg struct { @@ -38,7 +38,7 @@ type cfg struct { } clientConstructor interface { - get(network.Address) (searchClient, error) + get(network.AddressGroup) (searchClient, error) } traverserGenerator interface { diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 81cecbac..bb26bc39 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -68,7 +68,7 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error { return w.writer.WriteIDs(list) } -func (c *clientConstructorWrapper) get(addr network.Address) (searchClient, error) { +func (c *clientConstructorWrapper) get(addr network.AddressGroup) (searchClient, error) { clt, err := c.constructor.Get(addr) return &clientWrapper{ @@ -76,7 +76,7 @@ func (c *clientConstructorWrapper) get(addr network.Address) (searchClient, erro }, err } -func (c *clientWrapper) searchObjects(exec *execCtx, addr network.Address) ([]*objectSDK.ID, error) { +func (c *clientWrapper) searchObjects(exec *execCtx, addr network.AddressGroup) ([]*objectSDK.ID, error) { if exec.prm.forwarder != nil { return exec.prm.forwarder(addr, c.client) } diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index b2d4c5d3..ce8b2bd1 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -46,7 +46,7 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(addr network.Address, c client.Client) ([]*objectSDK.ID, error) { + p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) ([]*objectSDK.ID, error) { var err error // once compose and resign forwarding request @@ -101,7 +101,7 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre } return searchResult, nil - }) + })) } body := req.GetBody() @@ -110,3 +110,32 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return p, nil } + +func groupAddressRequestForwarder(f func(network.Address, client.Client) ([]*objectSDK.ID, error)) searchsvc.RequestForwarder { + return func(addrGroup network.AddressGroup, c client.Client) ([]*objectSDK.ID, error) { + var ( + firstErr error + res []*objectSDK.ID + ) + + addrGroup.IterateAddresses(func(addr network.Address) (stop bool) { + var err error + + defer func() { + stop = err == nil + + if stop || firstErr == nil { + firstErr = err + } + + // would be nice to log otherwise + }() + + res, err = f(addr, c) + + return + }) + + return res, firstErr + } +}