Compare commits

...

3 commits

Author SHA1 Message Date
efe1143ce2 [#1439] object: Sort nodes by priority metrics to compute GET request
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-28 12:40:57 +03:00
ca816de8e9 [#1439] node: Reduce usage of netmapAPI.NodeInfo
Remove outdated code from `netmap` service.

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-28 12:40:57 +03:00
bc8d79ddf9
[#1447] services/tree: Move relaying code to a separate function
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-24 10:01:03 +03:00
13 changed files with 480 additions and 108 deletions

View file

@ -58,6 +58,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source" tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@ -109,6 +110,7 @@ type applicationConfiguration struct {
ObjectCfg struct { ObjectCfg struct {
tombstoneLifetime uint64 tombstoneLifetime uint64
priorityMetrics []placement.Metric
} }
EngineCfg struct { EngineCfg struct {
@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
// Object // Object
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c) a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
var pm []placement.Metric
for _, raw := range objectconfig.Get(c).Priority() {
m, err := placement.ParseMetric(raw)
if err != nil {
return err
}
pm = append(pm, m)
}
a.ObjectCfg.priorityMetrics = pm
// Storage Engine // Storage Engine
@ -1179,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
return pool return pool
} }
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
var res netmapV2.NodeInfo var res netmap.NodeInfo
ni, ok := c.cfgNetmap.state.getNodeInfo() ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok { if ok {
ni.WriteToV2(&res) res = ni
} else { } else {
c.cfgNodeInfo.localInfo.WriteToV2(&res) res = c.cfgNodeInfo.localInfo
} }
return &res
return &res, nil
} }
// setContractNodeInfo rewrites local node info from the FrostFS network map. // setContractNodeInfo rewrites local node info from the FrostFS network map.

View file

@ -10,10 +10,17 @@ type PutConfig struct {
cfg *config.Config cfg *config.Config
} }
// GetConfig is a wrapper over "get" config section which provides access
// to object get pipeline configuration of object service.
type GetConfig struct {
cfg *config.Config
}
const ( const (
subsection = "object" subsection = "object"
putSubsection = "put" putSubsection = "put"
getSubsection = "get"
// PutPoolSizeDefault is a default value of routine pool size to // PutPoolSizeDefault is a default value of routine pool size to
// process object.Put requests in object service. // process object.Put requests in object service.
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
func (g PutConfig) SkipSessionTokenIssuerVerification() bool { func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification") return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
} }
// Get returns structure that provides access to "get" subsection of
// "object" section.
func Get(c *config.Config) GetConfig {
return GetConfig{
c.Sub(subsection).Sub(getSubsection),
}
}
// Priority returns the value of "priority" config parameter.
func (g GetConfig) Priority() []string {
return config.StringSliceSafe(g.cfg, "priority")
}

View file

@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
sSearchV2 := createSearchSvcV2(sSearch, keyStorage) sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource) sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
c.ObjectCfg.priorityMetrics)
*c.cfgObject.getSvc = *sGet // need smth better *c.cfgObject.getSvc = *sGet // need smth better
@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache, coreConstructor *cache.ClientCache,
containerSource containercore.Source, containerSource containercore.Source,
priorityMetrics []placement.Metric,
) *getsvc.Service { ) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage ls := c.cfgObject.cfgLocalStorage.localStorage
@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
ls, ls,
traverseGen.WithTraverseOptions( traverseGen.WithTraverseOptions(
placement.SuccessAfter(1), placement.SuccessAfter(1),
placement.WithPriorityMetrics(priorityMetrics),
placement.WithNodeState(c),
), ),
coreConstructor, coreConstructor,
containerSource, containerSource,

View file

@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
# Storage engine section # Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_POOL_SIZE=15

View file

@ -131,6 +131,9 @@
"remote_pool_size": 100, "remote_pool_size": 100,
"local_pool_size": 200, "local_pool_size": 200,
"skip_session_token_issuer_verification": true "skip_session_token_issuer_verification": true
},
"get": {
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
} }
}, },
"storage": { "storage": {

View file

@ -114,6 +114,10 @@ object:
remote_pool_size: 100 # number of async workers for remote PUT operations remote_pool_size: 100 # number of async workers for remote PUT operations
local_pool_size: 200 # number of async workers for local PUT operations local_pool_size: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
get:
priority: # list of metrics of nodes for prioritization
- $attribute:ClusterName
- $attribute:UN-LOCODE
storage: storage:
# note: shard configuration can be omitted for relay node (see `node.relay`) # note: shard configuration can be omitted for relay node (see `node.relay`)

View file

@ -407,13 +407,17 @@ Contains object-service related parameters.
object: object:
put: put:
remote_pool_size: 100 remote_pool_size: 100
get:
priority:
- $attribute:ClusterName
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------| |-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | | `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | | `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. | | `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
# `runtime` section # `runtime` section
Contains runtime parameters. Contains runtime parameters.

View file

@ -28,7 +28,7 @@ type executorSvc struct {
type NodeState interface { type NodeState interface {
// LocalNodeInfo must return current node state // LocalNodeInfo must return current node state
// in FrostFS API v2 NodeInfo structure. // in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() (*netmap.NodeInfo, error) LocalNodeInfo() *netmapSDK.NodeInfo
// ReadCurrentNetMap reads current local network map of the storage node // ReadCurrentNetMap reads current local network map of the storage node
// into the given parameter. Returns any error encountered which prevented // into the given parameter. Returns any error encountered which prevented
@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
func (s *executorSvc) LocalNodeInfo( func (s *executorSvc) LocalNodeInfo(
_ context.Context, _ context.Context,
req *netmap.LocalNodeInfoRequest, _ *netmap.LocalNodeInfoRequest,
) (*netmap.LocalNodeInfoResponse, error) { ) (*netmap.LocalNodeInfoResponse, error) {
verV2 := req.GetMetaHeader().GetVersion() ni := s.state.LocalNodeInfo()
if verV2 == nil { var nodeInfo netmap.NodeInfo
return nil, errors.New("missing version") ni.WriteToV2(&nodeInfo)
}
var ver versionsdk.Version
if err := ver.ReadFromV2(*verV2); err != nil {
return nil, fmt.Errorf("can't read version: %w", err)
}
ni, err := s.state.LocalNodeInfo()
if err != nil {
return nil, err
}
if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
ni2 := new(netmap.NodeInfo)
ni2.SetPublicKey(ni.GetPublicKey())
ni2.SetState(ni.GetState())
ni2.SetAttributes(ni.GetAttributes())
ni.IterateAddresses(func(s string) bool {
ni2.SetAddresses(s)
return true
})
ni = ni2
}
body := new(netmap.LocalNodeInfoResponseBody) body := new(netmap.LocalNodeInfoResponseBody)
body.SetVersion(&s.version) body.SetVersion(&s.version)
body.SetNodeInfo(ni) body.SetNodeInfo(&nodeInfo)
resp := new(netmap.LocalNodeInfoResponse) resp := new(netmap.LocalNodeInfoResponse)
resp.SetBody(body) resp.SetBody(body)

View file

@ -0,0 +1,43 @@
package placement
import (
"errors"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
const (
attrPrefix = "$attribute:"
)
type Metric interface {
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
}
func ParseMetric(raw string) (Metric, error) {
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
return NewAttributeMetric(attr), nil
}
return nil, errors.New("unsupported priority metric")
}
// attributeMetric describes priority metric based on attribute.
type attributeMetric struct {
attribute string
}
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
// the value of attribute is the same. In other case return [1].
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
fromAttr := from.Attribute(am.attribute)
toAttr := to.Attribute(am.attribute)
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
return 0
}
return 1
}
func NewAttributeMetric(attr string) Metric {
return &attributeMetric{attribute: attr}
}

View file

@ -3,6 +3,7 @@ package placement
import ( import (
"errors" "errors"
"fmt" "fmt"
"slices"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -23,6 +24,11 @@ type Builder interface {
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
} }
type NodeState interface {
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() *netmap.NodeInfo
}
// Option represents placement traverser option. // Option represents placement traverser option.
type Option func(*cfg) type Option func(*cfg)
@ -50,6 +56,10 @@ type cfg struct {
policy netmap.PlacementPolicy policy netmap.PlacementPolicy
builder Builder builder Builder
metrics []Metric
nodeState NodeState
} }
const invalidOptsMsg = "invalid traverser options" const invalidOptsMsg = "invalid traverser options"
@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
} }
var rem []int var rem []int
if cfg.flatSuccess != nil { if len(cfg.metrics) > 0 && cfg.nodeState != nil {
rem = defaultCopiesVector(cfg.policy)
var unsortedVector []netmap.NodeInfo
var regularVector []netmap.NodeInfo
for i := range rem {
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
regularVector = append(regularVector, ns[i][rem[i]:]...)
}
rem = []int{-1, -1}
sortedVector, err := sortVector(cfg, unsortedVector)
if err != nil {
return nil, err
}
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
} else if cfg.flatSuccess != nil {
ns = flatNodes(ns) ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)} rem = []int{int(*cfg.flatSuccess)}
} else { } else {
@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return [][]netmap.NodeInfo{flat} return [][]netmap.NodeInfo{flat}
} }
type nodeMetrics struct {
index int
metrics []int
}
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
nm := make([]nodeMetrics, len(unsortedVector))
node := cfg.nodeState.LocalNodeInfo()
for i := range unsortedVector {
m := make([]int, len(cfg.metrics))
for j, pm := range cfg.metrics {
m[j] = pm.CalculateValue(node, &unsortedVector[i])
}
nm[i] = nodeMetrics{
index: i,
metrics: m,
}
}
slices.SortFunc(nm, func(a, b nodeMetrics) int {
return slices.Compare(a.metrics, b.metrics)
})
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
for i := range unsortedVector {
sortedVector[i] = unsortedVector[nm[i].index]
}
return sortedVector, nil
}
// Node is a descriptor of storage node with information required for intra-container communication. // Node is a descriptor of storage node with information required for intra-container communication.
type Node struct { type Node struct {
addresses network.AddressGroup addresses network.AddressGroup
@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
c.copyNumbers = v c.copyNumbers = v
} }
} }
// WithPriorityMetrics use provided priority metrics to sort nodes.
func WithPriorityMetrics(m []Metric) Option {
return func(c *cfg) {
c.metrics = m
}
}
// WithNodeState provide state of the current node.
func WithNodeState(s NodeState) Option {
return func(c *cfg) {
c.nodeState = s
}
}

View file

@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
} }
func testNode(v uint32) (n netmap.NodeInfo) { func testNode(v uint32) (n netmap.NodeInfo) {
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))) ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
n.SetNetworkEndpoints(ip)
n.SetPublicKey([]byte(ip))
return n return n
} }
@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return vc return vc
} }
func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) { func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
return placement(ss, rs, nil)
}
func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
return placement(ss, nil, ec)
}
func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
nodes := make([][]netmap.NodeInfo, 0, len(rs)) nodes := make([][]netmap.NodeInfo, 0, len(rs))
replicas := make([]netmap.ReplicaDescriptor, 0, len(rs)) replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
num := uint32(0) num := uint32(0)
@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
nodes = append(nodes, ns) nodes = append(nodes, ns)
var rd netmap.ReplicaDescriptor var rd netmap.ReplicaDescriptor
if len(rs) > 0 {
rd.SetNumberOfObjects(uint32(rs[i])) rd.SetNumberOfObjects(uint32(rs[i]))
} else {
rd.SetECDataCount(uint32(ec[i][0]))
rd.SetECParityCount(uint32(ec[i][1]))
}
replicas = append(replicas, rd) replicas = append(replicas, rd)
} }
@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
err = n.FromIterator(netmapcore.Node(nodes[1][0])) err = n.FromIterator(netmapcore.Node(nodes[1][0]))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []Node{{addresses: n}}, tr.Next()) require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
}) })
t.Run("put scenario", func(t *testing.T) { t.Run("put scenario", func(t *testing.T) {
@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
}) })
} }
} }
type nodeState struct {
node *netmap.NodeInfo
}
func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
return n.node
}
func TestTraverserPriorityMetrics(t *testing.T) {
t.Run("one rep one metric", func(t *testing.T) {
selectors := []int{4}
replicas := []int{3}
nodes, cnr := testPlacement(selectors, replicas)
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "B")
// Node_3, PK - ip4/0.0.0.0/tcp/3
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Without priority metric `ClusterName` the order will be:
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
// With priority metric `ClusterName` and current node in cluster B
// the order should be:
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
// The last node is
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("two reps two metrics", func(t *testing.T) {
selectors := []int{3, 3}
replicas := []int{2, 2}
nodes, cnr := testPlacement(selectors, replicas)
// REPLICA #1
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "A")
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
// REPLICA #2
// Node_3 ip4/0.0.0.0/tcp/3
nodes[1][0].SetAttribute("ClusterName", "B")
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
// Node_4, PK - ip4/0.0.0.0/tcp/4
nodes[1][1].SetAttribute("ClusterName", "B")
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
// Node_5, PK - ip4/0.0.0.0/tcp/5
nodes[1][2].SetAttribute("ClusterName", "B")
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
sdkNode := testNode(9)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
nodesCopy := copyVectors(nodes)
m := []Metric{
NewAttributeMetric("ClusterName"),
NewAttributeMetric("UN-LOCODE"),
}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Check that nodes in the same cluster and
// in the same location should be the first in slice.
// Nodes which are follow criteria but stay outside the replica
// should be in the next slice.
next := tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "A")
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("ec container", func(t *testing.T) {
selectors := []int{4}
ec := [][]int{{2, 1}}
nodes, cnr := testECPlacement(selectors, ec)
// Node_0, PK - ip4/0.0.0.0/tcp/0
nodes[0][0].SetAttribute("ClusterName", "A")
// Node_1, PK - ip4/0.0.0.0/tcp/1
nodes[0][1].SetAttribute("ClusterName", "A")
// Node_2, PK - ip4/0.0.0.0/tcp/2
nodes[0][2].SetAttribute("ClusterName", "B")
// Node_3, PK - ip4/0.0.0.0/tcp/3
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: &sdkNode,
}),
)
require.NoError(t, err)
// Without priority metric `ClusterName` the order will be:
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
// With priority metric `ClusterName` and current node in cluster B
// the order should be:
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
// The last node is
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
}

View file

@ -12,10 +12,24 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
) )
var errNoSuitableNode = errors.New("no node was found to execute the request") var errNoSuitableNode = errors.New("no node was found to execute the request")
func relayUnary[Req any, Resp any](ctx context.Context, s *Service, ns []netmapSDK.NodeInfo, req *Req, callback func(TreeServiceClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error)) (*Resp, error) {
var resp *Resp
var outErr error
err := s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = callback(c, ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
}
// forEachNode executes callback for each node in the container until true is returned. // forEachNode executes callback for each node in the container until true is returned.
// Returns errNoSuitableNode if there was no successful attempt to dial any node. // Returns errNoSuitableNode if there was no successful attempt to dial any node.
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error { func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {

View file

@ -122,16 +122,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
var resp *AddResponse return relayUnary(ctx, s, ns, req, (TreeServiceClient).Add)
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Add(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
@ -174,16 +165,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
var resp *AddByPathResponse return relayUnary(ctx, s, ns, req, (TreeServiceClient).AddByPath)
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.AddByPath(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
meta := protoToMeta(b.GetMeta()) meta := protoToMeta(b.GetMeta())
@ -238,16 +220,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
var resp *RemoveResponse return relayUnary(ctx, s, ns, req, (TreeServiceClient).Remove)
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Remove(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
if b.GetNodeId() == pilorama.RootID { if b.GetNodeId() == pilorama.RootID {
@ -291,16 +264,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
var resp *MoveResponse return relayUnary(ctx, s, ns, req, (TreeServiceClient).Move)
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Move(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
if b.GetNodeId() == pilorama.RootID { if b.GetNodeId() == pilorama.RootID {
@ -343,16 +307,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
var resp *GetNodeByPathResponse return relayUnary(ctx, s, ns, req, (TreeServiceClient).GetNodeByPath)
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.GetNodeByPath(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
attr := b.GetPathAttribute() attr := b.GetPathAttribute()
@ -763,16 +718,7 @@ func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeList
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
var resp *TreeListResponse return relayUnary(ctx, s, ns, req, (TreeServiceClient).TreeList)
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.TreeList(ctx, req)
return outErr == nil
})
if err != nil {
return nil, err
}
return resp, outErr
} }
ids, err := s.forest.TreeList(ctx, cid) ids, err := s.forest.TreeList(ctx, cid)