From 81f4cdbb91589ddab47ecde57992ef8de10e9ec0 Mon Sep 17 00:00:00 2001
From: Anton Nikiforov <an.nikiforov@yadro.com>
Date: Tue, 22 Oct 2024 10:06:16 +0300
Subject: [PATCH] [#1439] object: Sort nodes by priority metrics for GET
 requests

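When `object.get.priority` lists one or more metrics (currently only the
`$attribute:<name>` form is supported), the traverser used for GET gathers
the primary nodes of every placement vector, sorts them by how closely
their attributes match the local node's, and keeps the remaining nodes as
a fallback vector in their original order. As a result, requests are sent
first to "closer" nodes (same ClusterName, same UN-LOCODE, ...).
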
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
---
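Reviewer note (not part of the commit): the function below is a minimal,
hypothetical sketch of how the options introduced in this patch compose with
the existing traverser API; the container, builder and node state are assumed
to be supplied by the caller, mirroring createGetService in cmd/frostfs-node.

package example

import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)

// newGetTraverser builds a traverser that prefers nodes sharing the listed
// attributes with the local node when serving GET requests.
func newGetTraverser(cnr container.Container, b placement.Builder, s placement.NodeState) (*placement.Traverser, error) {
	metrics := []placement.Metric{
		placement.NewAttributeMetric("ClusterName"),
		placement.NewAttributeMetric("UN-LOCODE"),
	}
	return placement.NewTraverser(
		placement.ForContainer(cnr),            // container whose placement policy is traversed
		placement.UseBuilder(b),                // placement builder (netmap-based in production)
		placement.SuccessAfter(1),              // a single successful read satisfies GET
		placement.WithPriorityMetrics(metrics), // sort primary nodes by these metrics
		placement.WithNodeState(s),             // local NodeInfo is the sorting reference
	)
}
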
 cmd/frostfs-node/config.go                    |  11 +
 cmd/frostfs-node/config/object/config.go      |  20 ++
 cmd/frostfs-node/object.go                    |   6 +-
 config/example/node.env                       |   1 +
 config/example/node.json                      |   3 +
 config/example/node.yaml                      |   4 +
 docs/storage-node-configuration.md            |  14 +-
 .../object_manager/placement/metrics.go       |  43 +++
 .../object_manager/placement/traverser.go     |  70 ++++-
 .../placement/traverser_test.go               | 288 +++++++++++++++++-
 10 files changed, 449 insertions(+), 11 deletions(-)
 create mode 100644 pkg/services/object_manager/placement/metrics.go

diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index b2dcafbd7..800c49127 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -58,6 +58,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
 	objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
 	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
 	tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@@ -109,6 +110,7 @@ type applicationConfiguration struct {
 
 	ObjectCfg struct {
 		tombstoneLifetime uint64
+		priorityMetrics   []placement.Metric
 	}
 
 	EngineCfg struct {
@@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	// Object
 
 	a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
+	var pm []placement.Metric
+	for _, raw := range objectconfig.Get(c).Priority() {
+		m, err := placement.ParseMetric(raw)
+		if err != nil {
+			return err
+		}
+		pm = append(pm, m)
+	}
+	a.ObjectCfg.priorityMetrics = pm
 
 	// Storage Engine
 
diff --git a/cmd/frostfs-node/config/object/config.go b/cmd/frostfs-node/config/object/config.go
index 876dc3ef1..6ff1fe2ab 100644
--- a/cmd/frostfs-node/config/object/config.go
+++ b/cmd/frostfs-node/config/object/config.go
@@ -10,10 +10,17 @@ type PutConfig struct {
 	cfg *config.Config
 }
 
+// GetConfig is a wrapper over "get" config section which provides access
+// to object get pipeline configuration of object service.
+type GetConfig struct {
+	cfg *config.Config
+}
+
 const (
 	subsection = "object"
 
 	putSubsection = "put"
+	getSubsection = "get"
 
 	// PutPoolSizeDefault is a default value of routine pool size to
 	// process object.Put requests in object service.
@@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
 func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
 	return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
 }
+
+// Get returns structure that provides access to "get" subsection of
+// "object" section.
+func Get(c *config.Config) GetConfig {
+	return GetConfig{
+		c.Sub(subsection).Sub(getSubsection),
+	}
+}
+
+// Priority returns the value of "priority" config parameter.
+func (g GetConfig) Priority() []string {
+	return config.StringSliceSafe(g.cfg, "priority")
+}
diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go
index c484c5d8c..c6bde2cff 100644
--- a/cmd/frostfs-node/object.go
+++ b/cmd/frostfs-node/object.go
@@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
 
 	sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
 
-	sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
+	sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
+		c.ObjectCfg.priorityMetrics)
 
 	*c.cfgObject.getSvc = *sGet // need smth better
 
@@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
 func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
 	coreConstructor *cache.ClientCache,
 	containerSource containercore.Source,
+	priorityMetrics []placement.Metric,
 ) *getsvc.Service {
 	ls := c.cfgObject.cfgLocalStorage.localStorage
 
@@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
 		ls,
 		traverseGen.WithTraverseOptions(
 			placement.SuccessAfter(1),
+			placement.WithPriorityMetrics(priorityMetrics),
+			placement.WithNodeState(c),
 		),
 		coreConstructor,
 		containerSource,
diff --git a/config/example/node.env b/config/example/node.env
index 580d343fb..3979eb18f 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
+FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
 
 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
diff --git a/config/example/node.json b/config/example/node.json
index 3470d2d12..1ea28de6c 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -131,6 +131,9 @@
       "remote_pool_size": 100,
       "local_pool_size": 200,
       "skip_session_token_issuer_verification": true
+    },
+    "get": {
+      "priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
     }
   },
   "storage": {
diff --git a/config/example/node.yaml b/config/example/node.yaml
index 2a963fc0f..4a418dfcb 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -114,6 +114,10 @@ object:
     remote_pool_size: 100  # number of async workers for remote PUT operations
     local_pool_size: 200  # number of async workers for local PUT operations
     skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
+  get:
+    priority:  # list of node metrics used for prioritization
+      - $attribute:ClusterName
+      - $attribute:UN-LOCODE
 
 storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md
index 2b94400df..363520481 100644
--- a/docs/storage-node-configuration.md
+++ b/docs/storage-node-configuration.md
@@ -407,13 +407,17 @@ Contains object-service related parameters.
 object:
   put:
     remote_pool_size: 100
+  get:
+    priority:
+      - $attribute:ClusterName
 ```
 
-| Parameter                   | Type  | Default value | Description                                                                                    |
-|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
-| `delete.tombstone_lifetime` | `int` | `5`           | Tombstone lifetime for removed objects in epochs.                                              |
-| `put.remote_pool_size`      | `int` | `10`          | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
-| `put.local_pool_size`       | `int` | `10`          | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services.  |
+| Parameter                   | Type       | Default value | Description                                                                                          |
+|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
+| `delete.tombstone_lifetime` | `int`      | `5`           | Tombstone lifetime for removed objects in epochs.                                                    |
+| `put.remote_pool_size`      | `int`      | `10`          | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services.       |
+| `put.local_pool_size`       | `int`      | `10`          | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services.        |
+| `get.priority`              | `[]string` |               | List of node metrics used for prioritization. Used when computing the response to GET and SEARCH requests. |
 
 # `runtime` section
 Contains runtime parameters.
diff --git a/pkg/services/object_manager/placement/metrics.go b/pkg/services/object_manager/placement/metrics.go
new file mode 100644
index 000000000..45e6df339
--- /dev/null
+++ b/pkg/services/object_manager/placement/metrics.go
@@ -0,0 +1,43 @@
+package placement
+
+import (
+	"errors"
+	"strings"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+)
+
+const (
+	attrPrefix = "$attribute:"
+)
+
+type Metric interface {
+	CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
+}
+
+func ParseMetric(raw string) (Metric, error) {
+	if attr, found := strings.CutPrefix(raw, attrPrefix); found {
+		return NewAttributeMetric(attr), nil
+	}
+	return nil, errors.New("unsupported priority metric")
+}
+
+// attributeMetric describes a priority metric based on a node attribute.
+type attributeMetric struct {
+	attribute string
+}
+
+// CalculateValue returns 0 if both from and to contain the attribute
+// attributeMetric.attribute with the same value; otherwise it returns 1.
+func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
+	fromAttr := from.Attribute(am.attribute)
+	toAttr := to.Attribute(am.attribute)
+	if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
+		return 0
+	}
+	return 1
+}
+
+func NewAttributeMetric(attr string) Metric {
+	return &attributeMetric{attribute: attr}
+}
diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go
index 4e790628f..6440f187d 100644
--- a/pkg/services/object_manager/placement/traverser.go
+++ b/pkg/services/object_manager/placement/traverser.go
@@ -3,6 +3,7 @@ package placement
 import (
 	"errors"
 	"fmt"
+	"slices"
 	"sync"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@@ -23,6 +24,11 @@ type Builder interface {
 	BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
 }
 
+type NodeState interface {
+	// LocalNodeInfo returns the current node state as a FrostFS API v2 NodeInfo structure.
+	LocalNodeInfo() *netmap.NodeInfo
+}
+
 // Option represents placement traverser option.
 type Option func(*cfg)
 
@@ -50,6 +56,10 @@ type cfg struct {
 	policy    netmap.PlacementPolicy
 
 	builder Builder
+
+	metrics []Metric
+
+	nodeState NodeState
 }
 
 const invalidOptsMsg = "invalid traverser options"
@@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
 	}
 
 	var rem []int
-	if cfg.flatSuccess != nil {
+	if len(cfg.metrics) > 0 && cfg.nodeState != nil {
+		rem = defaultCopiesVector(cfg.policy)
+		var unsortedVector []netmap.NodeInfo
+		var regularVector []netmap.NodeInfo
+		for i := range rem {
+			unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
+			regularVector = append(regularVector, ns[i][rem[i]:]...)
+		}
+		rem = []int{-1, -1}
+
+		sortedVector, err := sortVector(cfg, unsortedVector)
+		if err != nil {
+			return nil, err
+		}
+		ns = [][]netmap.NodeInfo{sortedVector, regularVector}
+	} else if cfg.flatSuccess != nil {
 		ns = flatNodes(ns)
 		rem = []int{int(*cfg.flatSuccess)}
 	} else {
@@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
 	return [][]netmap.NodeInfo{flat}
 }
 
+type nodeMetrics struct {
+	index   int
+	metrics []int
+}
+
+func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
+	nm := make([]nodeMetrics, len(unsortedVector))
+	node := cfg.nodeState.LocalNodeInfo()
+
+	for i := range unsortedVector {
+		m := make([]int, len(cfg.metrics))
+		for j, pm := range cfg.metrics {
+			m[j] = pm.CalculateValue(node, &unsortedVector[i])
+		}
+		nm[i] = nodeMetrics{
+			index:   i,
+			metrics: m,
+		}
+	}
+	slices.SortFunc(nm, func(a, b nodeMetrics) int {
+		return slices.Compare(a.metrics, b.metrics)
+	})
+	sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
+	for i := range unsortedVector {
+		sortedVector[i] = unsortedVector[nm[i].index]
+	}
+	return sortedVector, nil
+}
+
 // Node is a descriptor of storage node with information required for intra-container communication.
 type Node struct {
 	addresses network.AddressGroup
@@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
 		c.copyNumbers = v
 	}
 }
+
+// WithPriorityMetrics uses the provided priority metrics to sort nodes.
+func WithPriorityMetrics(m []Metric) Option {
+	return func(c *cfg) {
+		c.metrics = m
+	}
+}
+
+// WithNodeState provides the state of the current node.
+func WithNodeState(s NodeState) Option {
+	return func(c *cfg) {
+		c.nodeState = s
+	}
+}
diff --git a/pkg/services/object_manager/placement/traverser_test.go b/pkg/services/object_manager/placement/traverser_test.go
index b3b57677d..38f62aa07 100644
--- a/pkg/services/object_manager/placement/traverser_test.go
+++ b/pkg/services/object_manager/placement/traverser_test.go
@@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
 }
 
 func testNode(v uint32) (n netmap.NodeInfo) {
-	n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
+	ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
+	n.SetNetworkEndpoints(ip)
+	n.SetPublicKey([]byte(ip))
 
 	return n
 }
@@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
 	return vc
 }
 
-func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
+func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
+	return placement(ss, rs, nil)
+}
+
+func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
+	return placement(ss, nil, ec)
+}
+
+func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
 	nodes := make([][]netmap.NodeInfo, 0, len(rs))
 	replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
 	num := uint32(0)
@@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
 		nodes = append(nodes, ns)
 
 		var rd netmap.ReplicaDescriptor
-		rd.SetNumberOfObjects(uint32(rs[i]))
+		if len(rs) > 0 {
+			rd.SetNumberOfObjects(uint32(rs[i]))
+		} else {
+			rd.SetECDataCount(uint32(ec[i][0]))
+			rd.SetECParityCount(uint32(ec[i][1]))
+		}
 
 		replicas = append(replicas, rd)
 	}
@@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
 		err = n.FromIterator(netmapcore.Node(nodes[1][0]))
 		require.NoError(t, err)
 
-		require.Equal(t, []Node{{addresses: n}}, tr.Next())
+		require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
 	})
 
 	t.Run("put scenario", func(t *testing.T) {
@@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
 		})
 	}
 }
+
+type nodeState struct {
+	node *netmap.NodeInfo
+}
+
+func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
+	return n.node
+}
+
+func TestTraverserPriorityMetrics(t *testing.T) {
+	t.Run("one rep one metric", func(t *testing.T) {
+		selectors := []int{4}
+		replicas := []int{3}
+
+		nodes, cnr := testPlacement(selectors, replicas)
+
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "B")
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[0][3].SetAttribute("ClusterName", "B")
+
+		sdkNode := testNode(5)
+		sdkNode.SetAttribute("ClusterName", "B")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{NewAttributeMetric("ClusterName")}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Without priority metric `ClusterName` the order will be:
+		// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
+		// With priority metric `ClusterName` and current node in cluster B
+		// the order should be:
+		// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
+		next := tr.Next()
+		require.NotNil(t, next)
+		require.Equal(t, 3, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
+
+		next = tr.Next()
+		// Only Node_3 is left in the traversal.
+		require.Equal(t, 1, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+
+	t.Run("two reps two metrics", func(t *testing.T) {
+		selectors := []int{3, 3}
+		replicas := []int{2, 2}
+
+		nodes, cnr := testPlacement(selectors, replicas)
+
+		// REPLICA #1
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
+
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
+
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "A")
+		nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
+
+		// REPLICA #2
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[1][0].SetAttribute("ClusterName", "B")
+		nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
+
+		// Node_4, PK - ip4/0.0.0.0/tcp/4
+		nodes[1][1].SetAttribute("ClusterName", "B")
+		nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
+
+		// Node_5, PK - ip4/0.0.0.0/tcp/5
+		nodes[1][2].SetAttribute("ClusterName", "B")
+		nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
+
+		sdkNode := testNode(9)
+		sdkNode.SetAttribute("ClusterName", "B")
+		sdkNode.SetAttribute("UN-LOCODE", "RU DME")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{
+			NewAttributeMetric("ClusterName"),
+			NewAttributeMetric("UN-LOCODE"),
+		}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Check that nodes in the same cluster and
+		// in the same location come first in the slice.
+		// Nodes that match the criteria but lie outside the replica count
+		// should be in the next slice.
+
+		next := tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+
+		sdkNode.SetAttribute("ClusterName", "B")
+		sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
+
+		nodesCopy = copyVectors(nodes)
+
+		tr, err = NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		next = tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+
+		sdkNode.SetAttribute("ClusterName", "A")
+		sdkNode.SetAttribute("UN-LOCODE", "RU LED")
+
+		nodesCopy = copyVectors(nodes)
+
+		tr, err = NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		next = tr.Next()
+		require.Equal(t, 4, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
+
+		next = tr.Next()
+		require.Equal(t, 2, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+
+	t.Run("ec container", func(t *testing.T) {
+		selectors := []int{4}
+		ec := [][]int{{2, 1}}
+
+		nodes, cnr := testECPlacement(selectors, ec)
+
+		// Node_0, PK - ip4/0.0.0.0/tcp/0
+		nodes[0][0].SetAttribute("ClusterName", "A")
+		// Node_1, PK - ip4/0.0.0.0/tcp/1
+		nodes[0][1].SetAttribute("ClusterName", "A")
+		// Node_2, PK - ip4/0.0.0.0/tcp/2
+		nodes[0][2].SetAttribute("ClusterName", "B")
+		// Node_3, PK - ip4/0.0.0.0/tcp/3
+		nodes[0][3].SetAttribute("ClusterName", "B")
+
+		sdkNode := testNode(5)
+		sdkNode.SetAttribute("ClusterName", "B")
+
+		nodesCopy := copyVectors(nodes)
+
+		m := []Metric{NewAttributeMetric("ClusterName")}
+
+		tr, err := NewTraverser(
+			ForContainer(cnr),
+			UseBuilder(&testBuilder{
+				vectors: nodesCopy,
+			}),
+			WithoutSuccessTracking(),
+			WithPriorityMetrics(m),
+			WithNodeState(&nodeState{
+				node: &sdkNode,
+			}),
+		)
+		require.NoError(t, err)
+
+		// Without priority metric `ClusterName` the order will be:
+		// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
+		// With priority metric `ClusterName` and current node in cluster B
+		// the order should be:
+		// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
+		next := tr.Next()
+		require.NotNil(t, next)
+		require.Equal(t, 3, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
+
+		next = tr.Next()
+		// Only Node_3 is left in the traversal.
+		require.Equal(t, 1, len(next))
+		require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
+
+		next = tr.Next()
+		require.Nil(t, next)
+	})
+}