diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 62e070c4..c5de3dba 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -78,7 +78,7 @@ func (exec *execCtx) processCurrentEpoch() bool { // TODO: consider parallel execution // TODO: consider optimization: if status == SPLIT we can continue until // we reach the best result - split info with linking object ID. - if exec.processNode(ctx, addrs[i]) { + if exec.processNode(ctx, addrs[i].Addresses()) { exec.log.Debug("completing the operation") return true } diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 9b48b157..1eea616a 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -25,7 +25,7 @@ type distributedTarget struct { nodeTargetInitializer func(network.AddressGroup) transformer.ObjectTarget - relay func(network.AddressGroup) error + relay func(placement.Node) error fmt *object.FormatValidator @@ -68,15 +68,15 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { return t.iteratePlacement(t.sendObject) } -func (t *distributedTarget) sendObject(addr network.AddressGroup) error { +func (t *distributedTarget) sendObject(node placement.Node) error { if t.relay != nil { - err := t.relay(addr) + err := t.relay(node) if err == nil || !errors.Is(err, errLocalAddress) { return err } } - target := t.nodeTargetInitializer(addr) + target := t.nodeTargetInitializer(node.Addresses()) if err := target.WriteHeader(t.obj); err != nil { return fmt.Errorf("could not write header: %w", err) @@ -86,7 +86,7 @@ func (t *distributedTarget) sendObject(addr network.AddressGroup) error { return nil } -func (t *distributedTarget) iteratePlacement(f func(network.AddressGroup) error) (*transformer.AccessIdentifiers, error) { +func (t *distributedTarget) iteratePlacement(f func(placement.Node) error) (*transformer.AccessIdentifiers, error) { traverser, err := placement.NewTraverser( append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., ) @@ -111,7 +111,7 @@ loop: defer wg.Done() if err := f(addr); err != nil { - svcutil.LogServiceError(t.log, "PUT", addr, err) + svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) return } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 5474e071..5637ed81 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -147,9 +147,11 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { var errLocalAddress = errors.New("can't relay to local address") func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { - var relay func(network.AddressGroup) error + var relay func(placement.Node) error if p.relay != nil { - relay = func(addr network.AddressGroup) error { + relay = func(node placement.Node) error { + addr := node.Addresses() + if network.IsLocalAddress(p.localAddrSrc, addr) { return errLocalAddress } diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index 25e3a4d2..d561f7c6 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -76,7 +76,7 @@ func (exec *execCtx) processCurrentEpoch() bool { } // TODO: consider parallel execution - exec.processNode(ctx, addrs[i]) + exec.processNode(ctx, addrs[i].Addresses()) } } diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go index 381353bb..2bf18433 100644 --- a/pkg/services/object_manager/placement/traverser.go +++ b/pkg/services/object_manager/placement/traverser.go @@ -119,10 +119,20 @@ func flatNodes(ns []netmap.Nodes) []netmap.Nodes { return []netmap.Nodes{flat} } +// Node is a descriptor of storage node with information required for intra-container communication. +type Node struct { + addresses network.AddressGroup +} + +// Addresses returns group of network addresses. +func (x Node) Addresses() network.AddressGroup { + return x.addresses +} + // Next returns next unprocessed address of the object placement. // // Returns nil if no nodes left or traversal operation succeeded. -func (t *Traverser) Next() []network.AddressGroup { +func (t *Traverser) Next() []Node { t.mtx.Lock() defer t.mtx.Unlock() @@ -139,10 +149,10 @@ func (t *Traverser) Next() []network.AddressGroup { count = len(t.vectors[0]) } - addrs := make([]network.AddressGroup, count) + nodes := make([]Node, count) for i := 0; i < count; i++ { - err := addrs[i].FromIterator(t.vectors[0][i]) + err := nodes[i].addresses.FromIterator(t.vectors[0][i]) if err != nil { // TODO: log error return nil @@ -151,7 +161,7 @@ func (t *Traverser) Next() []network.AddressGroup { t.vectors[0] = t.vectors[0][count:] - return addrs + return nodes } func (t *Traverser) skipEmptyVectors() { diff --git a/pkg/services/object_manager/placement/traverser_test.go b/pkg/services/object_manager/placement/traverser_test.go index 59f83382..d567383a 100644 --- a/pkg/services/object_manager/placement/traverser_test.go +++ b/pkg/services/object_manager/placement/traverser_test.go @@ -95,7 +95,7 @@ func TestTraverserObjectScenarios(t *testing.T) { require.Len(t, addrs, len(nodes[i])) for j, n := range nodes[i] { - assertSameAddress(t, n.NodeInfo, addrs[j]) + assertSameAddress(t, n.NodeInfo, addrs[j].Addresses()) } } @@ -129,7 +129,7 @@ func TestTraverserObjectScenarios(t *testing.T) { err = n.FromIterator(nodes[1][0]) require.NoError(t, err) - require.Equal(t, []network.AddressGroup{n}, tr.Next()) + require.Equal(t, []Node{{addresses: n}}, tr.Next()) }) t.Run("put scenario", func(t *testing.T) { @@ -152,7 +152,7 @@ func TestTraverserObjectScenarios(t *testing.T) { require.Len(t, addrs, replicas[curVector]) for j := range addrs { - assertSameAddress(t, nodes[curVector][i+j].NodeInfo, addrs[j]) + assertSameAddress(t, nodes[curVector][i+j].NodeInfo, addrs[j].Addresses()) } }