object: Sort nodes by priority metrics to compute GET requests #1439
|
@ -58,6 +58,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
||||
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
|
@ -109,6 +110,7 @@ type applicationConfiguration struct {
|
|||
|
||||
ObjectCfg struct {
|
||||
tombstoneLifetime uint64
|
||||
priorityMetrics []placement.Metric
|
||||
}
|
||||
|
||||
EngineCfg struct {
|
||||
|
@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|||
// Object
|
||||
|
||||
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
||||
var pm []placement.Metric
|
||||
for _, raw := range objectconfig.Get(c).Priority() {
|
||||
m, err := placement.ParseMetric(raw)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pm = append(pm, m)
|
||||
}
|
||||
a.ObjectCfg.priorityMetrics = pm
|
||||
|
||||
// Storage Engine
|
||||
|
||||
|
@ -1179,17 +1190,15 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
|||
return pool
|
||||
}
|
||||
|
||||
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
|
||||
var res netmapV2.NodeInfo
|
||||
|
||||
func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
|
||||
var res netmap.NodeInfo
|
||||
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
||||
if ok {
|
||||
ni.WriteToV2(&res)
|
||||
res = ni
|
||||
} else {
|
||||
c.cfgNodeInfo.localInfo.WriteToV2(&res)
|
||||
res = c.cfgNodeInfo.localInfo
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
return &res
|
||||
}
|
||||
|
||||
// setContractNodeInfo rewrites local node info from the FrostFS network map.
|
||||
|
|
|
@ -10,10 +10,17 @@ type PutConfig struct {
|
|||
cfg *config.Config
|
||||
}
|
||||
|
||||
// GetConfig is a wrapper over "get" config section which provides access
|
||||
// to object get pipeline configuration of object service.
|
||||
type GetConfig struct {
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
const (
|
||||
subsection = "object"
|
||||
|
||||
putSubsection = "put"
|
||||
getSubsection = "get"
|
||||
|
||||
// PutPoolSizeDefault is a default value of routine pool size to
|
||||
// process object.Put requests in object service.
|
||||
|
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
|
|||
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
|
||||
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
|
||||
}
|
||||
|
||||
// Get returns structure that provides access to "get" subsection of
|
||||
// "object" section.
|
||||
func Get(c *config.Config) GetConfig {
|
||||
return GetConfig{
|
||||
c.Sub(subsection).Sub(getSubsection),
|
||||
}
|
||||
}
|
||||
|
||||
// Priority returns the value of "priority" config parameter.
|
||||
func (g GetConfig) Priority() []string {
|
||||
return config.StringSliceSafe(g.cfg, "priority")
|
||||
}
|
||||
|
|
|
@ -178,7 +178,8 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
|
||||
c.ObjectCfg.priorityMetrics)
|
||||
|
||||
*c.cfgObject.getSvc = *sGet // need smth better
|
||||
|
||||
|
@ -389,6 +390,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
|||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||
coreConstructor *cache.ClientCache,
|
||||
containerSource containercore.Source,
|
||||
priorityMetrics []placement.Metric,
|
||||
) *getsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
|
@ -398,6 +400,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
|||
ls,
|
||||
traverseGen.WithTraverseOptions(
|
||||
placement.SuccessAfter(1),
|
||||
placement.WithPriorityMetrics(priorityMetrics),
|
||||
placement.WithNodeState(c),
|
||||
),
|
||||
coreConstructor,
|
||||
containerSource,
|
||||
|
|
|
@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
|||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
|
||||
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
||||
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
|
||||
|
||||
# Storage engine section
|
||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||
|
|
|
@ -131,6 +131,9 @@
|
|||
"remote_pool_size": 100,
|
||||
"local_pool_size": 200,
|
||||
"skip_session_token_issuer_verification": true
|
||||
},
|
||||
"get": {
|
||||
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
|
||||
}
|
||||
},
|
||||
"storage": {
|
||||
|
|
|
@ -114,6 +114,10 @@ object:
|
|||
remote_pool_size: 100 # number of async workers for remote PUT operations
|
||||
local_pool_size: 200 # number of async workers for local PUT operations
|
||||
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
|
||||
get:
|
||||
priority: # list of metrics of nodes for prioritization
|
||||
- $attribute:ClusterName
|
||||
- $attribute:UN-LOCODE
|
||||
|
||||
storage:
|
||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||
|
|
|
@ -407,13 +407,17 @@ Contains object-service related parameters.
|
|||
object:
|
||||
put:
|
||||
remote_pool_size: 100
|
||||
get:
|
||||
priority:
|
||||
- $attribute:ClusterName
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|
||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
|
||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
|
||||
|
||||
# `runtime` section
|
||||
Contains runtime parameters.
|
||||
|
|
|
@ -28,7 +28,7 @@ type executorSvc struct {
|
|||
type NodeState interface {
|
||||
// LocalNodeInfo must return current node state
|
||||
// in FrostFS API v2 NodeInfo structure.
|
||||
LocalNodeInfo() (*netmap.NodeInfo, error)
|
||||
LocalNodeInfo() *netmapSDK.NodeInfo
|
||||
|
||||
// ReadCurrentNetMap reads current local network map of the storage node
|
||||
// into the given parameter. Returns any error encountered which prevented
|
||||
|
@ -64,39 +64,15 @@ func NewExecutionService(s NodeState, v versionsdk.Version, netInfo NetworkInfo,
|
|||
|
||||
func (s *executorSvc) LocalNodeInfo(
|
||||
_ context.Context,
|
||||
req *netmap.LocalNodeInfoRequest,
|
||||
_ *netmap.LocalNodeInfoRequest,
|
||||
) (*netmap.LocalNodeInfoResponse, error) {
|
||||
verV2 := req.GetMetaHeader().GetVersion()
|
||||
if verV2 == nil {
|
||||
return nil, errors.New("missing version")
|
||||
}
|
||||
|
||||
var ver versionsdk.Version
|
||||
if err := ver.ReadFromV2(*verV2); err != nil {
|
||||
return nil, fmt.Errorf("can't read version: %w", err)
|
||||
}
|
||||
|
||||
ni, err := s.state.LocalNodeInfo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if addrNum := ni.NumberOfAddresses(); addrNum > 0 && ver.Minor() <= 7 {
|
||||
ni2 := new(netmap.NodeInfo)
|
||||
ni2.SetPublicKey(ni.GetPublicKey())
|
||||
ni2.SetState(ni.GetState())
|
||||
ni2.SetAttributes(ni.GetAttributes())
|
||||
ni.IterateAddresses(func(s string) bool {
|
||||
ni2.SetAddresses(s)
|
||||
return true
|
||||
})
|
||||
|
||||
ni = ni2
|
||||
}
|
||||
ni := s.state.LocalNodeInfo()
|
||||
var nodeInfo netmap.NodeInfo
|
||||
ni.WriteToV2(&nodeInfo)
|
||||
|
||||
body := new(netmap.LocalNodeInfoResponseBody)
|
||||
body.SetVersion(&s.version)
|
||||
body.SetNodeInfo(ni)
|
||||
body.SetNodeInfo(&nodeInfo)
|
||||
|
||||
resp := new(netmap.LocalNodeInfoResponse)
|
||||
resp.SetBody(body)
|
||||
|
|
43
pkg/services/object_manager/placement/metrics.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package placement
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
)
|
||||
|
||||
const (
|
||||
attrPrefix = "$attribute:"
|
||||
)
|
||||
|
||||
type Metric interface {
|
||||
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
}
|
||||
|
||||
func ParseMetric(raw string) (Metric, error) {
|
||||
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
|
||||
return NewAttributeMetric(attr), nil
|
||||
}
|
||||
return nil, errors.New("unsupported priority metric")
|
||||
}
|
||||
|
||||
// attributeMetric describes priority metric based on attribute.
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Honestly, I don't like separating Honestly, I don't like separating `Parse*` and `Validate*`. Parsing performs validation _automatically_.
It may bite us in unexpected places, like silent errors on SIGHUP. And now we need to be sure we always validate.
Can we stay with `ParseMetric() (Metric, error)`?
acid-ant
commented
That was intentionally, because we don't have an ability to exit gracefully when reading config - only panic is possible. That was intentionally, because we don't have an ability to exit gracefully when reading config - only [panic](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/8b6ec57c6147e5b784d78bc891144dd55493503d/cmd/frostfs-node/config.go#L677) is possible.
fyrchik
commented
I don't see how this is related. I don't see how this is related.
If we currently panic on invalid config -- be it.
SIGHUP doesn't panic anyway.
acid-ant
commented
Updated. Updated.
|
||||
type attributeMetric struct {
|
||||
attribute string
|
||||
}
|
||||
|
||||
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
|
||||
// the value of attribute is the same. In other case return [1].
|
||||
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int {
|
||||
fromAttr := from.Attribute(am.attribute)
|
||||
toAttr := to.Attribute(am.attribute)
|
||||
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why does an empty pair of attributes have distance 1? Why does an empty pair of attributes have distance 1?
Not that I oppose, but do we have this in the RFC?
acid-ant
commented
Yes, in RFC we have this line:
Yes, in RFC we have this line:
```
If target storage node and local storage node have the same attribute and the value of this attribute is equal, the value of priority metric is 0, in other cases - 1.
```
acid-ant
commented
Is it ok that a storage node contains an empty attribute? Is it ok that a storage node contains an empty attribute?
fyrchik
commented
It may not contain an attribute at all, that is ok. It may not contain an attribute at all, that is ok.
|
||||
return 0
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
func NewAttributeMetric(attr string) Metric {
|
||||
return &attributeMetric{attribute: attr}
|
||||
}
|
|
@ -3,6 +3,7 @@ package placement
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
|
@ -23,6 +24,11 @@ type Builder interface {
|
|||
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
|
||||
}
|
||||
|
||||
type NodeState interface {
|
||||
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
|
||||
LocalNodeInfo() *netmap.NodeInfo
|
||||
}
|
||||
|
||||
// Option represents placement traverser option.
|
||||
type Option func(*cfg)
|
||||
|
||||
|
@ -50,6 +56,10 @@ type cfg struct {
|
|||
policy netmap.PlacementPolicy
|
||||
|
||||
builder Builder
|
||||
|
||||
metrics []Metric
|
||||
|
||||
nodeState NodeState
|
||||
}
|
||||
|
||||
const invalidOptsMsg = "invalid traverser options"
|
||||
fyrchik
commented
irrelevant change irrelevant change
acid-ant
commented
Revert it. Revert it.
|
||||
|
@ -99,7 +109,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
|
|||
}
|
||||
|
||||
var rem []int
|
||||
if cfg.flatSuccess != nil {
|
||||
if len(cfg.metrics) > 0 && cfg.nodeState != nil {
|
||||
rem = defaultCopiesVector(cfg.policy)
|
||||
var unsortedVector []netmap.NodeInfo
|
||||
var regularVector []netmap.NodeInfo
|
||||
for i := range rem {
|
||||
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
|
||||
regularVector = append(regularVector, ns[i][rem[i]:]...)
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Have you checked this behaves properly with EC? Have you checked this behaves properly with EC?
acid-ant
commented
Thanks, added test for EC container. Works as expected. Thanks, added test for EC container. Works as expected.
|
||||
rem = []int{-1, -1}
|
||||
|
||||
sortedVector, err := sortVector(cfg, unsortedVector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
I believe compile-time defined I believe compile-time defined `[]int{-1, -1}` must be better :) The same is fair for the lines below
acid-ant
commented
Fixed. Fixed.
|
||||
} else if cfg.flatSuccess != nil {
|
||||
ns = flatNodes(ns)
|
||||
rem = []int{int(*cfg.flatSuccess)}
|
||||
} else {
|
||||
|
@ -157,6 +182,35 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
|||
return [][]netmap.NodeInfo{flat}
|
||||
}
|
||||
|
||||
type nodeMetrics struct {
|
||||
index int
|
||||
metrics []int
|
||||
}
|
||||
|
||||
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
|
||||
nm := make([]nodeMetrics, len(unsortedVector))
|
||||
node := cfg.nodeState.LocalNodeInfo()
|
||||
|
||||
for i := range unsortedVector {
|
||||
m := make([]int, len(cfg.metrics))
|
||||
for j, pm := range cfg.metrics {
|
||||
m[j] = pm.CalculateValue(node, &unsortedVector[i])
|
||||
}
|
||||
nm[i] = nodeMetrics{
|
||||
index: i,
|
||||
metrics: m,
|
||||
}
|
||||
}
|
||||
slices.SortFunc(nm, func(a, b nodeMetrics) int {
|
||||
return slices.Compare(a.metrics, b.metrics)
|
||||
})
|
||||
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
|
||||
for i := range unsortedVector {
|
||||
sortedVector[i] = unsortedVector[nm[i].index]
|
||||
}
|
||||
return sortedVector, nil
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
There is There is `binary.LittleEndian.AppendUint16`
acid-ant
commented
Updated. Updated.
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why do we need to append an index? Why do we need to append an index?
acid-ant
commented
We need to store somewhere the relation between metric value and node. It is impossible to use slice as a key for map. We need to store somewhere the relation between metric value and node. It is impossible to use slice as a key for map.
fyrchik
commented
Can we use a struct with 2 fields then? Can we use a struct with 2 fields then?
Don't see how `[]byte` with overloaded meaning is better.
acid-ant
commented
Updated, please review. Updated, please review.
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Could we consider to use
instead of using the suffix - honestly, I spent some time to understand these steps Could we consider to use
```go
type nodeMetrics struct
index int
metrics []byte
}
for i := range unsortedVector {
nm := nodeMetrics{index: i}
for _, m := range cfg.metrics {
nm.metrics = append(nm.metrics, m.CalculateValue(&node, &unsortedVector[i])...)
}
nms = append(nms, nm)
}
```
instead of using the suffix - honestly, I spent some time to understand these steps
acid-ant
commented
Yes, I'll rewrite this part to be more readable and simpler, thanks. Yes, I'll rewrite this part to be more readable and simpler, thanks.
acid-ant
commented
Updated. Updated.
|
||||
|
||||
// Node is a descriptor of storage node with information required for intra-container communication.
|
||||
type Node struct {
|
||||
addresses network.AddressGroup
|
||||
|
@ -322,3 +376,17 @@ func WithCopyNumbers(v []uint32) Option {
|
|||
c.copyNumbers = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithPriorityMetrics use provided priority metrics to sort nodes.
|
||||
func WithPriorityMetrics(m []Metric) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
There is only usage in tests. Is it ok? There is only usage in tests. Is it ok?
acid-ant
commented
It is used when GET service initialized. It is used when GET service initialized.
https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/1439/files#diff-c477b7caf5394d02ccd1310a83d619293d7de526
|
||||
}
|
||||
}
|
||||
|
||||
// WithNodeState provide state of the current node.
|
||||
func WithNodeState(s NodeState) Option {
|
||||
return func(c *cfg) {
|
||||
c.nodeState = s
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
|
|||||
}
|
||||||
|
||||||
func testNode(v uint32) (n netmap.NodeInfo) {
|
||||||
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
|
||||||
ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
|
||||||
n.SetNetworkEndpoints(ip)
|
||||||
n.SetPublicKey([]byte(ip))
|
||||||
|
||||||
return n
|
||||||
}
|
||||||
|
@ -40,7 +42,15 @@ func copyVectors(v [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
|||||
return vc
|
||||||
}
|
||||||
|
||||||
func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
func testPlacement(ss []int, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
fyrchik
commented
Could you explain, what is the reasoning behing this refactoring? Could you explain, what is the reasoning behing this refactoring?
Replica is a number, placement policy contains a list of replicas. I don't see why do we need a 2-dimensional list here.
acid-ant
commented
That was done for EC container. Reduce amount of changes. That was done for EC container. Reduce amount of changes.
fyrchik
commented
I don't like such overloading, it overcomplicates the code for no real reason. I don't like such overloading, it overcomplicates the code for no real reason.
So we have `rs` (replicas) 2-dimensional slice, but it is really 1-dimensional, but each slice can contain only 1 or 2 elements and if it contains 2, it is suddenly a EC policy, not a replica.
`testECPlacement` is not that hard to write and common parts can be reused.
acid-ant
commented
You are right, changes were reverted yesterday, please review. You are right, changes were reverted yesterday, please review.
|
||||||
return placement(ss, rs, nil)
|
||||||
}
|
||||||
|
||||||
func testECPlacement(ss []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
return placement(ss, nil, ec)
|
||||||
}
|
||||||
|
||||||
func placement(ss []int, rs []int, ec [][]int) ([][]netmap.NodeInfo, container.Container) {
|
||||||
nodes := make([][]netmap.NodeInfo, 0, len(rs))
|
||||||
replicas := make([]netmap.ReplicaDescriptor, 0, len(rs))
|
||||||
num := uint32(0)
|
||||||
|
@ -56,7 +66,12 @@ func testPlacement(ss, rs []int) ([][]netmap.NodeInfo, container.Container) {
|
|||||
nodes = append(nodes, ns)
|
||||||
|
||||||
var rd netmap.ReplicaDescriptor
|
||||||
rd.SetNumberOfObjects(uint32(rs[i]))
|
||||||
if len(rs) > 0 {
|
||||||
rd.SetNumberOfObjects(uint32(rs[i]))
|
||||||
} else {
|
||||||
rd.SetECDataCount(uint32(ec[i][0]))
|
||||||
rd.SetECParityCount(uint32(ec[i][1]))
|
||||||
}
|
||||||
|
||||||
replicas = append(replicas, rd)
|
||||||
}
|
||||||
|
@ -134,7 +149,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
|
|||||
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
|
||||||
require.NoError(t, err)
|
||||||
|
||||||
require.Equal(t, []Node{{addresses: n}}, tr.Next())
|
||||||
require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
|
||||||
})
|
||||||
|
||||||
t.Run("put scenario", func(t *testing.T) {
|
||||||
|
@ -275,3 +290,268 @@ func TestTraverserRemValues(t *testing.T) {
|
|||||
})
|
||||||
}
|
||||||
}
|
||||||
|
||||||
type nodeState struct {
|
||||||
node *netmap.NodeInfo
|
||||||
}
|
||||||
|
||||||
func (n *nodeState) LocalNodeInfo() *netmap.NodeInfo {
|
||||||
return n.node
|
||||||
}
|
||||||
|
||||||
func TestTraverserPriorityMetrics(t *testing.T) {
|
||||||
t.Run("one rep one metric", func(t *testing.T) {
|
||||||
selectors := []int{4}
|
||||||
replicas := []int{3}
|
||||||
|
||||||
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
||||||
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||||
// Node_3, PK - ip4/0.0.0.0/tcp/3
|
||||||
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||||
|
||||||
sdkNode := testNode(5)
|
||||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
You provide this interface each time the traverser is created. You provide this interface each time the traverser is created.
Why have you decided to use an interface instead of directly providing local node info?
acid-ant
commented
Because we are changing this value at each epoch. Also, that will be helpful when we have an ability to change the list of the storage node attributes in runtime. Because we are changing this value at each epoch. Also, that will be helpful when we have an ability to change the list of the storage node attributes in runtime.
fyrchik
commented
New traverser is created for each request, so we would rather like this value not to be changed inside the traverser. New traverser is created for each request, so we would rather like this value _not_ to be changed inside the traverser.
acid-ant
commented
For each call of For each [call](https://git.frostfs.info/TrueCloudLab/frostfs-node/commit/e17837b51289980653d707cadf9c2d277aee21c0#diff-6fd77bc0dc6ba5fa64f50dad79b0be734983db72) of `LocalNodeInfo()` we are creating copy of `NodeInfo`.
If we want to reject using of interface in traverser we need to check [here](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/8b6ec57c6147e5b784d78bc891144dd55493503d/pkg/services/object/util/placement.go#L125) that we need to set the value for local node info, it looks ugly.
fyrchik
commented
What do you mean? We have What do you mean? We have `g.cnrSrc.Get()` and `g.netMapSrc.GetNetMapByEpoch()`, I suggest having `g.nodeInfoSrc.Get()` along the lines.
acid-ant
commented
When we create When we create `traverser` there is no flag which indicates that we need to set local node info or not, because we set priority metrics via a list of options. I thought we don't need to set it always for each traverser.
|
||||||
|
||||||
nodesCopy := copyVectors(nodes)
|
||||||
|
||||||
m := []Metric{NewAttributeMetric("ClusterName")}
|
||||||
|
||||||
tr, err := NewTraverser(
|
||||||
ForContainer(cnr),
|
||||||
UseBuilder(&testBuilder{
|
||||||
fyrchik
commented
This This `2, 0, 1` seems completely random. What is the reason for this exact sequence?
acid-ant
commented
Added comments. Added comments.
|
||||||
vectors: nodesCopy,
|
||||||
}),
|
||||||
WithoutSuccessTracking(),
|
||||||
WithPriorityMetrics(m),
|
||||||
WithNodeState(&nodeState{
|
||||||
node: &sdkNode,
|
||||||
fyrchik
commented
That's what I was talking about -- we build the test based on the HRW result (after we know it) Line 336 in TrueCloudLab/frostfs-sdk-go@79f3873
We just need to check that first nodes are ordered by clustername and the rest of them are equal to the traverser result without node state. Not important, feel free to dismiss. That's what I was talking about -- we build the test based on the HRW result (after we know it)
What about sth similar that you have in the SDK?
https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/79f387317a1bbd53ae41b97bbfaad3a5fc9c8553/netmap/selector_test.go#L336
We just need to check that first nodes are ordered by clustername and the rest of them are equal to the traverser result without node state.
Not important, feel free to dismiss.
acid-ant
commented
`Next` return slice of structs, which contains only addresses and key. There are no way to check attributes.
|
||||||
}),
|
||||||
)
|
||||||
require.NoError(t, err)
|
||||||
|
||||||
// Without priority metric `ClusterName` the order will be:
|
||||||
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
|
||||||
// With priority metric `ClusterName` and current node in cluster B
|
||||||
// the order should be:
|
||||||
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
|
||||||
next := tr.Next()
|
||||||
require.NotNil(t, next)
|
||||||
require.Equal(t, 3, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
// The last node is
|
||||||
require.Equal(t, 1, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Nil(t, next)
|
||||||
})
|
||||||
|
||||||
t.Run("two reps two metrics", func(t *testing.T) {
|
||||||
selectors := []int{3, 3}
|
||||||
replicas := []int{2, 2}
|
||||||
|
||||||
nodes, cnr := testPlacement(selectors, replicas)
|
||||||
|
||||||
// REPLICA #1
|
||||||
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
||||||
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
|
||||||
|
||||||
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
nodes[0][2].SetAttribute("ClusterName", "A")
|
||||||
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
||||||
// REPLICA #2
|
||||||
// Node_3 ip4/0.0.0.0/tcp/3
|
||||||
nodes[1][0].SetAttribute("ClusterName", "B")
|
||||||
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
||||||
// Node_4, PK - ip4/0.0.0.0/tcp/4
|
||||||
nodes[1][1].SetAttribute("ClusterName", "B")
|
||||||
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
||||||
// Node_5, PK - ip4/0.0.0.0/tcp/5
|
||||||
nodes[1][2].SetAttribute("ClusterName", "B")
|
||||||
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
||||||
sdkNode := testNode(9)
|
||||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
|
||||||
|
||||||
nodesCopy := copyVectors(nodes)
|
||||||
|
||||||
m := []Metric{
|
||||||
NewAttributeMetric("ClusterName"),
|
||||||
NewAttributeMetric("UN-LOCODE"),
|
||||||
}
|
||||||
|
||||||
tr, err := NewTraverser(
|
||||||
ForContainer(cnr),
|
||||||
UseBuilder(&testBuilder{
|
||||||
vectors: nodesCopy,
|
||||||
}),
|
||||||
WithoutSuccessTracking(),
|
||||||
WithPriorityMetrics(m),
|
||||||
WithNodeState(&nodeState{
|
||||||
node: &sdkNode,
|
||||||
}),
|
||||||
)
|
||||||
require.NoError(t, err)
|
||||||
|
||||||
// Check that nodes in the same cluster and
|
||||||
// in the same location should be the first in slice.
|
||||||
// Nodes which are follow criteria but stay outside the replica
|
||||||
// should be in the next slice.
|
||||||
|
||||||
next := tr.Next()
|
||||||
require.Equal(t, 4, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Equal(t, 2, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Nil(t, next)
|
||||||
|
||||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
|
||||||
|
||||||
nodesCopy = copyVectors(nodes)
|
||||||
|
||||||
tr, err = NewTraverser(
|
||||||
ForContainer(cnr),
|
||||||
UseBuilder(&testBuilder{
|
||||||
vectors: nodesCopy,
|
||||||
}),
|
||||||
WithoutSuccessTracking(),
|
||||||
WithPriorityMetrics(m),
|
||||||
WithNodeState(&nodeState{
|
||||||
node: &sdkNode,
|
||||||
}),
|
||||||
)
|
||||||
require.NoError(t, err)
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Equal(t, 4, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Equal(t, 2, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Nil(t, next)
|
||||||
|
||||||
sdkNode.SetAttribute("ClusterName", "A")
|
||||||
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
|
||||||
|
||||||
nodesCopy = copyVectors(nodes)
|
||||||
|
||||||
tr, err = NewTraverser(
|
||||||
ForContainer(cnr),
|
||||||
UseBuilder(&testBuilder{
|
||||||
vectors: nodesCopy,
|
||||||
}),
|
||||||
WithoutSuccessTracking(),
|
||||||
WithPriorityMetrics(m),
|
||||||
WithNodeState(&nodeState{
|
||||||
node: &sdkNode,
|
||||||
}),
|
||||||
)
|
||||||
require.NoError(t, err)
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Equal(t, 4, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Equal(t, 2, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Nil(t, next)
|
||||||
})
|
||||||
|
||||||
t.Run("ec container", func(t *testing.T) {
|
||||||
selectors := []int{4}
|
||||||
ec := [][]int{{2, 1}}
|
||||||
|
||||||
nodes, cnr := testECPlacement(selectors, ec)
|
||||||
|
||||||
// Node_0, PK - ip4/0.0.0.0/tcp/0
|
||||||
nodes[0][0].SetAttribute("ClusterName", "A")
|
||||||
// Node_1, PK - ip4/0.0.0.0/tcp/1
|
||||||
nodes[0][1].SetAttribute("ClusterName", "A")
|
||||||
// Node_2, PK - ip4/0.0.0.0/tcp/2
|
||||||
nodes[0][2].SetAttribute("ClusterName", "B")
|
||||||
// Node_3, PK - ip4/0.0.0.0/tcp/3
|
||||||
nodes[0][3].SetAttribute("ClusterName", "B")
|
||||||
|
||||||
sdkNode := testNode(5)
|
||||||
sdkNode.SetAttribute("ClusterName", "B")
|
||||||
|
||||||
nodesCopy := copyVectors(nodes)
|
||||||
|
||||||
m := []Metric{NewAttributeMetric("ClusterName")}
|
||||||
|
||||||
tr, err := NewTraverser(
|
||||||
ForContainer(cnr),
|
||||||
UseBuilder(&testBuilder{
|
||||||
vectors: nodesCopy,
|
||||||
}),
|
||||||
WithoutSuccessTracking(),
|
||||||
WithPriorityMetrics(m),
|
||||||
WithNodeState(&nodeState{
|
||||||
node: &sdkNode,
|
||||||
}),
|
||||||
)
|
||||||
require.NoError(t, err)
|
||||||
|
||||||
// Without priority metric `ClusterName` the order will be:
|
||||||
// [ {Node_0 A}, {Node_1 A}, {Node_2 B}, {Node_3 B}]
|
||||||
// With priority metric `ClusterName` and current node in cluster B
|
||||||
// the order should be:
|
||||||
// [ {Node_2 B}, {Node_0 A}, {Node_1 A}, {Node_3 B}]
|
||||||
next := tr.Next()
|
||||||
require.NotNil(t, next)
|
||||||
require.Equal(t, 3, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
// The last node is
|
||||||
require.Equal(t, 1, len(next))
|
||||||
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
|
||||||
|
||||||
next = tr.Next()
|
||||||
require.Nil(t, next)
|
||||||
})
|
||||||
}
|
||||||
|
|
What does the resulting type represent?
I expect metric to be a number (int or float)
The idea was to use already existed api for sort instead of writing the custom for comparing
[]int/float
lexicographically. ForgeoDistance
metric, just convert distance touint32
and then to[]byte
. The same is for any other metrics with one thing in mind - need to compare the resulting[]byte
lexicographically.It is not that hard to implement. And we have
slices.Compare
.Thanks, fixed.