[#1439] object: Sort nodes by priority metrics to compute GET request
All checks were successful
DCO action / DCO (pull_request) Successful in 1m12s
Vulncheck / Vulncheck (pull_request) Successful in 2m6s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m38s
Tests and linters / Run gofumpt (pull_request) Successful in 2m37s
Build / Build Components (pull_request) Successful in 2m49s
Tests and linters / Staticcheck (pull_request) Successful in 3m9s
Tests and linters / gopls check (pull_request) Successful in 3m12s
Tests and linters / Lint (pull_request) Successful in 3m31s
Tests and linters / Tests (pull_request) Successful in 4m51s
Tests and linters / Tests with -race (pull_request) Successful in 6m8s
All checks were successful
DCO action / DCO (pull_request) Successful in 1m12s
Vulncheck / Vulncheck (pull_request) Successful in 2m6s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m38s
Tests and linters / Run gofumpt (pull_request) Successful in 2m37s
Build / Build Components (pull_request) Successful in 2m49s
Tests and linters / Staticcheck (pull_request) Successful in 3m9s
Tests and linters / gopls check (pull_request) Successful in 3m12s
Tests and linters / Lint (pull_request) Successful in 3m31s
Tests and linters / Tests (pull_request) Successful in 4m51s
Tests and linters / Tests with -race (pull_request) Successful in 6m8s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
e17837b512
commit
df784dd93d
11 changed files with 393 additions and 10 deletions
|
@ -58,6 +58,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
||||||
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
@ -109,6 +110,7 @@ type applicationConfiguration struct {
|
||||||
|
|
||||||
ObjectCfg struct {
|
ObjectCfg struct {
|
||||||
tombstoneLifetime uint64
|
tombstoneLifetime uint64
|
||||||
|
priorityMetrics []placement.Metric
|
||||||
}
|
}
|
||||||
|
|
||||||
EngineCfg struct {
|
EngineCfg struct {
|
||||||
|
@ -232,6 +234,11 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
||||||
// Object
|
// Object
|
||||||
|
|
||||||
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
||||||
|
var pm []placement.Metric
|
||||||
|
for _, raw := range objectconfig.Get(c).Priority() {
|
||||||
|
pm = append(pm, placement.ParseMetric(raw))
|
||||||
|
}
|
||||||
|
a.ObjectCfg.priorityMetrics = pm
|
||||||
|
|
||||||
// Storage Engine
|
// Storage Engine
|
||||||
|
|
||||||
|
|
|
@ -10,10 +10,17 @@ type PutConfig struct {
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetConfig is a wrapper over "get" config section which provides access
|
||||||
|
// to object get pipeline configuration of object service.
|
||||||
|
type GetConfig struct {
|
||||||
|
cfg *config.Config
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
subsection = "object"
|
subsection = "object"
|
||||||
|
|
||||||
putSubsection = "put"
|
putSubsection = "put"
|
||||||
|
getSubsection = "get"
|
||||||
|
|
||||||
// PutPoolSizeDefault is a default value of routine pool size to
|
// PutPoolSizeDefault is a default value of routine pool size to
|
||||||
// process object.Put requests in object service.
|
// process object.Put requests in object service.
|
||||||
|
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
|
||||||
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
||||||
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns structure that provides access to "get" subsection of
|
||||||
|
// "object" section.
|
||||||
|
func Get(c *config.Config) GetConfig {
|
||||||
|
return GetConfig{
|
||||||
|
c.Sub(subsection).Sub(getSubsection),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Priority returns the value of "priority" config parameter.
|
||||||
|
func (g GetConfig) Priority() []string {
|
||||||
|
return config.StringSliceSafe(g.cfg, "priority")
|
||||||
|
}
|
||||||
|
|
|
@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||||
|
|
||||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
|
||||||
|
c.ObjectCfg.priorityMetrics)
|
||||||
|
|
||||||
*c.cfgObject.getSvc = *sGet // need smth better
|
*c.cfgObject.getSvc = *sGet // need smth better
|
||||||
|
|
||||||
|
@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
||||||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||||
coreConstructor *cache.ClientCache,
|
coreConstructor *cache.ClientCache,
|
||||||
containerSource containercore.Source,
|
containerSource containercore.Source,
|
||||||
|
priorityMetrics []placement.Metric,
|
||||||
) *getsvc.Service {
|
) *getsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
||||||
ls,
|
ls,
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
placement.SuccessAfter(1),
|
placement.SuccessAfter(1),
|
||||||
|
placement.WithPriorityMetrics(priorityMetrics),
|
||||||
|
placement.WithNodeState(c),
|
||||||
),
|
),
|
||||||
coreConstructor,
|
coreConstructor,
|
||||||
containerSource,
|
containerSource,
|
||||||
|
|
|
@ -8,9 +8,11 @@ import (
|
||||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||||
|
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||||
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,6 +32,13 @@ func validateConfig(c *config.Config) error {
|
||||||
return fmt.Errorf("invalid logger destination: %w", err)
|
return fmt.Errorf("invalid logger destination: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validate priority metrics for GET and SEARCH requests
|
||||||
|
for _, raw := range objectconfig.Get(c).Priority() {
|
||||||
|
if err := placement.ValidateMetric(raw); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// shard configuration validation
|
// shard configuration validation
|
||||||
|
|
||||||
shardNum := 0
|
shardNum := 0
|
||||||
|
|
|
@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
||||||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||||
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
||||||
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||||
|
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
|
||||||
|
|
||||||
# Storage engine section
|
# Storage engine section
|
||||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||||
|
|
|
@ -131,6 +131,9 @@
|
||||||
"remote_pool_size": 100,
|
"remote_pool_size": 100,
|
||||||
"local_pool_size": 200,
|
"local_pool_size": 200,
|
||||||
"skip_session_token_issuer_verification": true
|
"skip_session_token_issuer_verification": true
|
||||||
|
},
|
||||||
|
"get": {
|
||||||
|
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"storage": {
|
"storage": {
|
||||||
|
|
|
@ -114,6 +114,10 @@ object:
|
||||||
remote_pool_size: 100 # number of async workers for remote PUT operations
|
remote_pool_size: 100 # number of async workers for remote PUT operations
|
||||||
local_pool_size: 200 # number of async workers for local PUT operations
|
local_pool_size: 200 # number of async workers for local PUT operations
|
||||||
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
||||||
|
get:
|
||||||
|
priority: # list of metrics of nodes for prioritization
|
||||||
|
- $attribute:ClusterName
|
||||||
|
- $attribute:UN-LOCODE
|
||||||
|
|
||||||
storage:
|
storage:
|
||||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||||
|
|
|
@ -407,13 +407,17 @@ Contains object-service related parameters.
|
||||||
object:
|
object:
|
||||||
put:
|
put:
|
||||||
remote_pool_size: 100
|
remote_pool_size: 100
|
||||||
|
get:
|
||||||
|
priority:
|
||||||
|
- $attribute:ClusterName
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
|
||||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||||
|
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
|
||||||
|
|
||||||
# `runtime` section
|
# `runtime` section
|
||||||
Contains runtime parameters.
|
Contains runtime parameters.
|
||||||
|
|
50
pkg/services/object_manager/placement/metrics.go
Normal file
50
pkg/services/object_manager/placement/metrics.go
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
package placement
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
attrPrefix = "$attribute:"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Metric interface {
|
||||||
|
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
|
||||||
|
}
|
||||||
|
|
||||||
|
func ValidateMetric(raw string) error {
|
||||||
|
if strings.HasPrefix(raw, attrPrefix) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errors.New("unsupported priority metric")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseMetric(raw string) Metric {
|
||||||
|
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
|
||||||
|
return NewAttributeMetric(attr)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// attributeMetric describes priority metric based on attribute.
|
||||||
|
type attributeMetric struct {
|
||||||
|
attribute string
|
||||||
|
}
|
||||||
|
|
||||||
|
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
|
||||||
|
// the value of attribute is the same. In other case return [1].
|
||||||
|
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
|
||||||
|
fromAttr := from.Attribute(am.attribute)
|
||||||
|
toAttr := to.Attribute(am.attribute)
|
||||||
|
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAttributeMetric(attr string) Metric {
|
||||||
|
return &attributeMetric{attribute: attr}
|
||||||
|
}
|
|
@ -3,6 +3,7 @@ package placement
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
@ -23,6 +24,11 @@ type Builder interface {
|
||||||
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type NodeState interface {
|
||||||
|
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
|
||||||
|
LocalNodeInfo() *netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
// Option represents placement traverser option.
|
// Option represents placement traverser option.
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
@ -50,9 +56,15 @@ type cfg struct {
|
||||||
policy netmap.PlacementPolicy
|
policy netmap.PlacementPolicy
|
||||||
|
|
||||||
builder Builder
|
builder Builder
|
||||||
|
|
||||||
|
metrics []Metric
|
||||||
|
|
||||||
|
nodeState NodeState
|
||||||
}
|
}
|
||||||
|
|
||||||
const invalidOptsMsg = "invalid traverser options"
|
const (
|
||||||
|
invalidOptsMsg = "invalid traverser options"
|
||||||
|
)
|
||||||
|
|
||||||
var errNilBuilder = errors.New("placement builder is nil")
|
var errNilBuilder = errors.New("placement builder is nil")
|
||||||
|
|
||||||
|
@ -99,7 +111,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var rem []int
|
var rem []int
|
||||||
if cfg.flatSuccess != nil {
|
if len(cfg.metrics) > 0 {
|
||||||
|
rem = defaultCopiesVector(cfg.policy)
|
||||||
|
var unsortedVector []netmap.NodeInfo
|
||||||
|
var regularVector []netmap.NodeInfo
|
||||||
|
for i := range rem {
|
||||||
|
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
|
||||||
|
regularVector = append(regularVector, ns[i][rem[i]:]...)
|
||||||
|
}
|
||||||
|
rem = []int{-1, -1}
|
||||||
|
|
||||||
|
sortedVector, err := sortVector(cfg, unsortedVector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
|
||||||
|
} else if cfg.flatSuccess != nil {
|
||||||
ns = flatNodes(ns)
|
ns = flatNodes(ns)
|
||||||
rem = []int{int(*cfg.flatSuccess)}
|
rem = []int{int(*cfg.flatSuccess)}
|
||||||
} else {
|
} else {
|
||||||
|
@ -157,6 +184,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
||||||
return [][]netmap.NodeInfo{flat}
|
return [][]netmap.NodeInfo{flat}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nodeMetrics struct {
|
||||||
|
index int
|
||||||
|
metrics []int
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
|
||||||
|
nm := make([]nodeMetrics, len(unsortedVector))
|
||||||
|
node := cfg.nodeState.LocalNodeInfo()
|
||||||
|
|
||||||
|
for i := range unsortedVector {
|
||||||
|
m := make([]int, len(cfg.metrics))
|
||||||
|
for j, pm := range cfg.metrics {
|
||||||
|
m[j] = pm.CalculateValue(node, &unsortedVector[i])
|
||||||
|
}
|
||||||
|
nm[i] = nodeMetrics{
|
||||||
|
index: i,
|
||||||
|
metrics: m,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slices.SortFunc(nm, func(a, b nodeMetrics) int {
|
||||||
|
return slices.Compare(a.metrics, b.metrics)
|
||||||
|
})
|
||||||
|
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
|
||||||
|
for i := range unsortedVector {
|
||||||
|
sortedVector[i] = unsortedVector[nm[i].index]
|
||||||
|
}
|
||||||
|
return sortedVector, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Node is a descriptor of storage node with information required for intra-container communication.
|
// Node is a descriptor of storage node with information required for intra-container communication.
|
||||||
type Node struct {
|
type Node struct {
|
||||||
addresses network.AddressGroup
|
addresses network.AddressGroup
|
||||||
|
@ -322,3 +378,17 @@ func WithCopyNumbers(v []uint32) Option {
|
||||||
c.copyNumbers = v
|
c.copyNumbers = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithPriorityMetrics use provided priority metrics to sort nodes.
|
||||||
|
func WithPriorityMetrics(m []Metric) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.metrics = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithNodeState provide state of the current node.
|
||||||
|
func WithNodeState(s NodeState) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.nodeState = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNode(v uint32) (n netmap.NodeInfo) {
|
func testNode(v uint32) (n netmap.NodeInfo) {
|
||||||
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
|
ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
|
||||||
|
n.SetNetworkEndpoints(ip)
|
||||||
|
n.SetPublicKey([]byte(ip))
|
||||||
|
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
@ -134,7 +136,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
|
||||||
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, []Node{{addresses: n}}, tr.Next())
|
require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put scenario", func(t *testing.T) {
|
t.Run("put scenario", func(t *testing.T) {
|
||||||
|
@ -275,3 +277,212 @@ func TestTraverserRemValues(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nodeState struct {
|
||||||
|
node *netmap.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
|
||||||
|
return n.node
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTraverserPriorityMetrics(t *testing.T) {
|
||||||
|
t.Run("one rep one metric", func(t *testing.T) {
|
||||||
|
selectors := []int{4}
|
||||||
|
replicas := []int{3}
|
||||||
|
|
||||||
|
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
|
||||||
|
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
|
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
|
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
|
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
|
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
|
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||||
|
// Node_3, PK - ip4/0.0.0.0/tcp/3
|
||||||
|
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||||
|
|
||||||
|
sdkNode := testNode(5)
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
|
||||||
|
nodesCopy := copyVectors(nodes)
|
||||||
|
|
||||||
|
m := []Metric{NewAttributeMetric("ClusterName")}
|
||||||
|
|
||||||
|
tr, err := NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Without priority metric `ClusterName` the order will be:
|
||||||
|
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
|
||||||
|
// With priority metric `ClusterName` and current node in cluster B
|
||||||
|
// the order should be:
|
||||||
|
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
|
||||||
|
next := tr.Next()
|
||||||
|
require.NotNil(t, next)
|
||||||
|
require.Equal(t, 3, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
// The last node is
|
||||||
|
require.Equal(t, 1, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("two reps two metrics", func(t *testing.T) {
|
||||||
|
selectors := []int{3, 3}
|
||||||
|
replicas := []int{2, 2}
|
||||||
|
|
||||||
|
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
|
||||||
|
// REPLICA #1
|
||||||
|
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
|
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
|
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
|
||||||
|
|
||||||
|
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
|
nodes[0][2].SetAttribute("ClusterName", "A")
|
||||||
|
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
// REPLICA #2
|
||||||
|
// Node_3 ip4/0.0.0.0/tcp/3
|
||||||
|
nodes[1][0].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
// Node_4, PK - ip4/0.0.0.0/tcp/4
|
||||||
|
nodes[1][1].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
|
||||||
|
// Node_5, PK - ip4/0.0.0.0/tcp/5
|
||||||
|
nodes[1][2].SetAttribute("ClusterName", "B")
|
||||||
|
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
sdkNode := testNode(9)
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
|
||||||
|
nodesCopy := copyVectors(nodes)
|
||||||
|
|
||||||
|
m := []Metric{
|
||||||
|
NewAttributeMetric("ClusterName"),
|
||||||
|
NewAttributeMetric("UN-LOCODE"),
|
||||||
|
}
|
||||||
|
|
||||||
|
tr, err := NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Check that nodes in the same cluster and
|
||||||
|
// in the same location should be the first in slice.
|
||||||
|
// Nodes which are follow criteria but stay outside the replica
|
||||||
|
// should be in the next slice.
|
||||||
|
|
||||||
|
next := tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
|
||||||
|
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
|
||||||
|
nodesCopy = copyVectors(nodes)
|
||||||
|
|
||||||
|
tr, err = NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
|
||||||
|
sdkNode.SetAttribute("ClusterName", "A")
|
||||||
|
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
|
||||||
|
nodesCopy = copyVectors(nodes)
|
||||||
|
|
||||||
|
tr, err = NewTraverser(
|
||||||
|
ForContainer(cnr),
|
||||||
|
UseBuilder(&testBuilder{
|
||||||
|
vectors: nodesCopy,
|
||||||
|
}),
|
||||||
|
WithoutSuccessTracking(),
|
||||||
|
WithPriorityMetrics(m),
|
||||||
|
WithNodeState(&nodeState{
|
||||||
|
node: &sdkNode,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 4, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Equal(t, 2, len(next))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
|
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
|
||||||
|
next = tr.Next()
|
||||||
|
require.Nil(t, next)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue