diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 6c3da104d..54388036b 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -303,7 +303,7 @@ func initObjectService(c *cfg) { sGet := getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), - getsvc.WithClientConstructor(coreConstructor), + getsvc.WithClientConstructor(groupConstructor), getsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.SuccessAfter(1), diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 62e070c40..f56e72069 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -3,6 +3,7 @@ package getsvc import ( "context" + "github.com/nspcc-dev/neofs-node/pkg/network" "go.uber.org/zap" ) @@ -78,7 +79,7 @@ func (exec *execCtx) processCurrentEpoch() bool { // TODO: consider parallel execution // TODO: consider optimization: if status == SPLIT we can continue until // we reach the best result - split info with linking object ID. - if exec.processNode(ctx, addrs[i]) { + if exec.processNode(ctx, network.GroupFromAddress(addrs[i])) { exec.log.Debug("completing the operation") return true } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 6848aa42f..d92a29d2c 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -270,9 +270,7 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { } } -func (exec execCtx) remoteClient(node network.Address) (getClient, bool) { - log := exec.log.With(zap.Stringer("node", node)) - +func (exec execCtx) remoteClient(node network.AddressGroup) (getClient, bool) { c, err := exec.svc.clientCache.get(node) switch { @@ -280,7 +278,7 @@ func (exec execCtx) remoteClient(node network.Address) (getClient, bool) { exec.status = statusUndefined exec.err = err - log.Debug("could not construct remote node client") + exec.log.Debug("could not construct remote node client") case err == nil: return c, true } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index e7ad1577d..94c2a3885 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -82,8 +82,8 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return vs, nil } -func (c *testClientCache) get(mAddr network.Address) (getClient, error) { - v, ok := c.clients[mAddr.HostAddr()] +func (c *testClientCache) get(mAddr network.AddressGroup) (getClient, error) { + v, ok := c.clients[network.StringifyGroup(mAddr)] if !ok { return nil, errors.New("could not construct client") } @@ -100,7 +100,7 @@ func newTestClient() *testClient { } } -func (c *testClient) getObject(exec *execCtx, _ network.Address) (*objectSDK.Object, error) { +func (c *testClient) getObject(exec *execCtx, _ network.AddressGroup) (*objectSDK.Object, error) { v, ok := c.results[exec.address().String()] if !ok { return nil, object.ErrNotFound @@ -406,16 +406,16 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) { strconv.Itoa(60000+j), ) - var na network.Address - - err := na.FromString(a) - require.NoError(t, err) - - as[j] = na.HostAddr() - ni := netmap.NewNodeInfo() ni.SetAddress(a) + var na network.AddressGroup + + err := na.FromIterator(ni) + require.NoError(t, err) + + as[j] = network.StringifyGroup(na) + ns[j] = *ni } diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index e5c81db11..2caa48bf4 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -34,7 +34,7 @@ type RangeHashPrm struct { salt []byte } -type RequestForwarder func(network.Address, coreclient.Client) (*objectSDK.Object, error) +type RequestForwarder func(network.AddressGroup, coreclient.Client) (*objectSDK.Object, error) // HeadPrm groups parameters of Head service call. type HeadPrm struct { diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index fe700d785..269771059 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -10,10 +10,8 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) processNode(ctx context.Context, addr network.Address) bool { - log := exec.log.With(zap.Stringer("remote node", addr)) - - log.Debug("processing node...") +func (exec *execCtx) processNode(ctx context.Context, addr network.AddressGroup) bool { + exec.log.Debug("processing node...") client, ok := exec.remoteClient(addr) if !ok { @@ -29,7 +27,7 @@ func (exec *execCtx) processNode(ctx context.Context, addr network.Address) bool exec.status = statusUndefined exec.err = object.ErrNotFound - log.Debug("remote call failed", + exec.log.Debug("remote call failed", zap.String("error", err.Error()), ) case err == nil: diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 76e07333e..8d23db48d 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -22,7 +22,7 @@ type Service struct { type Option func(*cfg) type getClient interface { - getObject(*execCtx, network.Address) (*objectSDK.Object, error) + getObject(*execCtx, network.AddressGroup) (*objectSDK.Object, error) } type cfg struct { @@ -35,7 +35,7 @@ type cfg struct { } clientCache interface { - get(network.Address) (getClient, error) + get(network.AddressGroup) (getClient, error) } traverserGenerator interface { @@ -93,7 +93,7 @@ func WithLocalStorageEngine(e *engine.StorageEngine) Option { } type ClientConstructor interface { - Get(network.Address) (client.Client, error) + Get(network.AddressGroup) (client.Client, error) } // WithClientConstructor returns option to set constructor of remote node clients. diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 098f94ca3..90ea40a59 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -73,7 +73,7 @@ func (s *SimpleObjectWriter) Object() *object.Object { return s.obj.Object() } -func (c *clientCacheWrapper) get(addr network.Address) (getClient, error) { +func (c *clientCacheWrapper) get(addr network.AddressGroup) (getClient, error) { clt, err := c.cache.Get(addr) return &clientWrapper{ @@ -81,7 +81,7 @@ func (c *clientCacheWrapper) get(addr network.Address) (getClient, error) { }, err } -func (c *clientWrapper) getObject(exec *execCtx, addr network.Address) (*objectSDK.Object, error) { +func (c *clientWrapper) getObject(exec *execCtx, addr network.AddressGroup) (*objectSDK.Object, error) { if !exec.assembling && exec.prm.forwarder != nil { return exec.prm.forwarder(addr, c.client) } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 080a8eeab..6ecfb1c08 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -55,7 +55,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { + p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { var err error // once compose and resign forwarding request @@ -144,7 +144,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre // convert the object return objectSDK.NewFromV2(obj), nil - }) + })) } return p, nil @@ -177,7 +177,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { + p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { var err error // once compose and resign forwarding request @@ -242,7 +242,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get obj.SetPayload(payload) return obj.Object(), nil - }) + })) } return p, nil @@ -340,7 +340,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { + p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { var err error // once compose and resign forwarding request @@ -439,7 +439,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp // convert the object return raw.Object().SDK(), nil - }) + })) } return p, nil @@ -507,3 +507,32 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart { return sh } + +func groupAddressRequestForwarder(f func(network.Address, client.Client) (*objectSDK.Object, error)) getsvc.RequestForwarder { + return func(addrGroup network.AddressGroup, c client.Client) (*objectSDK.Object, error) { + var ( + firstErr error + res *objectSDK.Object + ) + + addrGroup.IterateAddresses(func(addr network.Address) (stop bool) { + var err error + + defer func() { + stop = err == nil + + if stop || firstErr == nil { + firstErr = err + } + + // would be nice to log otherwise + }() + + res, err = f(addr, c) + + return + }) + + return res, firstErr + } +}