From d0e48c949b1cb32b402db856274844f1b39a45db Mon Sep 17 00:00:00 2001
From: Leonard Lyubich <leonard@nspcc.ru>
Date: Tue, 22 Jun 2021 15:19:30 +0300
Subject: [PATCH] [#607] object/search: Make client constructor to work with
 group address

Make Object Search service to work with `AddressGroup` instead of `Address`
in order to support multiple addresses of the storage node.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
---
 cmd/neofs-node/object.go                  |  2 +-
 pkg/services/object/search/container.go   |  3 ++-
 pkg/services/object/search/exec.go        |  6 ++---
 pkg/services/object/search/prm.go         |  2 +-
 pkg/services/object/search/remote.go      |  6 ++---
 pkg/services/object/search/search_test.go | 20 +++++++-------
 pkg/services/object/search/service.go     |  6 ++---
 pkg/services/object/search/util.go        |  4 +--
 pkg/services/object/search/v2/util.go     | 33 +++++++++++++++++++++--
 9 files changed, 54 insertions(+), 28 deletions(-)

diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go
index 54388036b..b9f541a1c 100644
--- a/cmd/neofs-node/object.go
+++ b/cmd/neofs-node/object.go
@@ -286,7 +286,7 @@ func initObjectService(c *cfg) {
 	sSearch := searchsvc.New(
 		searchsvc.WithLogger(c.log),
 		searchsvc.WithLocalStorageEngine(ls),
-		searchsvc.WithClientConstructor(coreConstructor),
+		searchsvc.WithClientConstructor(groupConstructor),
 		searchsvc.WithTraverserGenerator(
 			traverseGen.WithTraverseOptions(
 				placement.WithoutSuccessTracking(),
diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go
index 25e3a4d2c..d287897ce 100644
--- a/pkg/services/object/search/container.go
+++ b/pkg/services/object/search/container.go
@@ -3,6 +3,7 @@ package searchsvc
 import (
 	"context"
 
+	"github.com/nspcc-dev/neofs-node/pkg/network"
 	"go.uber.org/zap"
 )
 
@@ -76,7 +77,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
 			}
 
 			// TODO: consider parallel execution
-			exec.processNode(ctx, addrs[i])
+			exec.processNode(ctx, network.GroupFromAddress(addrs[i]))
 		}
 	}
 
diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go
index 75fababac..784506b92 100644
--- a/pkg/services/object/search/exec.go
+++ b/pkg/services/object/search/exec.go
@@ -117,16 +117,14 @@ func (exec *execCtx) generateTraverser(cid *cid.ID) (*placement.Traverser, bool)
 	}
 }
 
-func (exec execCtx) remoteClient(node network.Address) (searchClient, bool) {
-	log := exec.log.With(zap.Stringer("node", node))
-
+func (exec execCtx) remoteClient(node network.AddressGroup) (searchClient, bool) {
 	c, err := exec.svc.clientConstructor.get(node)
 	switch {
 	default:
 		exec.status = statusUndefined
 		exec.err = err
 
-		log.Debug("could not construct remote node client")
+		exec.log.Debug("could not construct remote node client")
 	case err == nil:
 		return c, true
 	}
diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go
index 3340cb40a..31719c5ec 100644
--- a/pkg/services/object/search/prm.go
+++ b/pkg/services/object/search/prm.go
@@ -27,7 +27,7 @@ type IDListWriter interface {
 
 // RequestForwarder is a callback for forwarding of the
 // original Search requests.
-type RequestForwarder func(network.Address, coreclient.Client) ([]*objectSDK.ID, error)
+type RequestForwarder func(network.AddressGroup, coreclient.Client) ([]*objectSDK.ID, error)
 
 // SetCommonParameters sets common parameters of the operation.
 func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
diff --git a/pkg/services/object/search/remote.go b/pkg/services/object/search/remote.go
index d4646b3d7..332a17eb2 100644
--- a/pkg/services/object/search/remote.go
+++ b/pkg/services/object/search/remote.go
@@ -7,10 +7,8 @@ import (
 	"go.uber.org/zap"
 )
 
-func (exec *execCtx) processNode(ctx context.Context, addr network.Address) {
-	log := exec.log.With(zap.Stringer("remote node", addr))
-
-	log.Debug("processing node...")
+func (exec *execCtx) processNode(ctx context.Context, addr network.AddressGroup) {
+	exec.log.Debug("processing node...")
 
 	client, ok := exec.remoteClient(addr)
 	if !ok {
diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go
index 8e8c200c5..8ec231b9e 100644
--- a/pkg/services/object/search/search_test.go
+++ b/pkg/services/object/search/search_test.go
@@ -84,8 +84,8 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
 	return res, nil
 }
 
-func (c *testClientCache) get(mAddr network.Address) (searchClient, error) {
-	v, ok := c.clients[mAddr.HostAddr()]
+func (c *testClientCache) get(mAddr network.AddressGroup) (searchClient, error) {
+	v, ok := c.clients[network.StringifyGroup(mAddr)]
 	if !ok {
 		return nil, errors.New("could not construct client")
 	}
@@ -102,7 +102,7 @@ func (s *testStorage) search(exec *execCtx) ([]*objectSDK.ID, error) {
 	return v.ids, v.err
 }
 
-func (c *testStorage) searchObjects(exec *execCtx, _ network.Address) ([]*objectSDK.ID, error) {
+func (c *testStorage) searchObjects(exec *execCtx, _ network.AddressGroup) ([]*objectSDK.ID, error) {
 	v, ok := c.items[exec.containerID().String()]
 	if !ok {
 		return nil, nil
@@ -200,16 +200,16 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) {
 				strconv.Itoa(60000+j),
 			)
 
-			var na network.Address
-
-			err := na.FromString(a)
-			require.NoError(t, err)
-
-			as[j] = na.HostAddr()
-
 			ni := netmap.NewNodeInfo()
 			ni.SetAddress(a)
 
+			var na network.AddressGroup
+
+			err := na.FromIterator(ni)
+			require.NoError(t, err)
+
+			as[j] = network.StringifyGroup(na)
+
 			ns[j] = *ni
 		}
 
diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go
index b68fee63e..cd5c28207 100644
--- a/pkg/services/object/search/service.go
+++ b/pkg/services/object/search/service.go
@@ -23,11 +23,11 @@ type Service struct {
 type Option func(*cfg)
 
 type searchClient interface {
-	searchObjects(*execCtx, network.Address) ([]*object.ID, error)
+	searchObjects(*execCtx, network.AddressGroup) ([]*object.ID, error)
 }
 
 type ClientConstructor interface {
-	Get(network.Address) (client.Client, error)
+	Get(network.AddressGroup) (client.Client, error)
 }
 
 type cfg struct {
@@ -38,7 +38,7 @@ type cfg struct {
 	}
 
 	clientConstructor interface {
-		get(network.Address) (searchClient, error)
+		get(network.AddressGroup) (searchClient, error)
 	}
 
 	traverserGenerator interface {
diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go
index 81cecbacf..bb26bc392 100644
--- a/pkg/services/object/search/util.go
+++ b/pkg/services/object/search/util.go
@@ -68,7 +68,7 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error {
 	return w.writer.WriteIDs(list)
 }
 
-func (c *clientConstructorWrapper) get(addr network.Address) (searchClient, error) {
+func (c *clientConstructorWrapper) get(addr network.AddressGroup) (searchClient, error) {
 	clt, err := c.constructor.Get(addr)
 
 	return &clientWrapper{
@@ -76,7 +76,7 @@ func (c *clientConstructorWrapper) get(addr network.Address) (searchClient, erro
 	}, err
 }
 
-func (c *clientWrapper) searchObjects(exec *execCtx, addr network.Address) ([]*objectSDK.ID, error) {
+func (c *clientWrapper) searchObjects(exec *execCtx, addr network.AddressGroup) ([]*objectSDK.ID, error) {
 	if exec.prm.forwarder != nil {
 		return exec.prm.forwarder(addr, c.client)
 	}
diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go
index b2d4c5d33..ce8b2bd1a 100644
--- a/pkg/services/object/search/v2/util.go
+++ b/pkg/services/object/search/v2/util.go
@@ -46,7 +46,7 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
 	if !commonPrm.LocalOnly() {
 		var onceResign sync.Once
 
-		p.SetRequestForwarder(func(addr network.Address, c client.Client) ([]*objectSDK.ID, error) {
+		p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) ([]*objectSDK.ID, error) {
 			var err error
 
 			// once compose and resign forwarding request
@@ -101,7 +101,7 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
 			}
 
 			return searchResult, nil
-		})
+		}))
 	}
 
 	body := req.GetBody()
@@ -110,3 +110,32 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
 
 	return p, nil
 }
+
+func groupAddressRequestForwarder(f func(network.Address, client.Client) ([]*objectSDK.ID, error)) searchsvc.RequestForwarder {
+	return func(addrGroup network.AddressGroup, c client.Client) ([]*objectSDK.ID, error) {
+		var (
+			firstErr error
+			res      []*objectSDK.ID
+		)
+
+		addrGroup.IterateAddresses(func(addr network.Address) (stop bool) {
+			var err error
+
+			defer func() {
+				stop = err == nil
+
+				if stop || firstErr == nil {
+					firstErr = err
+				}
+
+				// would be nice to log otherwise
+			}()
+
+			res, err = f(addr, c)
+
+			return
+		})
+
+		return res, firstErr
+	}
+}