[#xx] object: Sort nodes by priority metrics to compute GET/SEARCH requests

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-10-18 16:51:49 +03:00
parent c0a2f20eee
commit 448215d936
11 changed files with 395 additions and 11 deletions

View file

@ -58,6 +58,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
@ -109,6 +110,7 @@ type applicationConfiguration struct {
ObjectCfg struct {
tombstoneLifetime uint64
priorityMetrics []placement.Metric
}
EngineCfg struct {
@ -232,6 +234,11 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
// Object
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
var pm []placement.Metric
for _, raw := range objectconfig.Get(c).Priority() {
pm = append(pm, placement.ParseMetric(raw))
}
a.ObjectCfg.priorityMetrics = pm
// Storage Engine

View file

@ -10,10 +10,17 @@ type PutConfig struct {
cfg *config.Config
}
// GetConfig is a wrapper over "get" config section which provides access
// to object get pipeline configuration of object service.
type GetConfig struct {
cfg *config.Config
}
const (
subsection = "object"
putSubsection = "put"
getSubsection = "get"
// PutPoolSizeDefault is a default value of routine pool size to
// process object.Put requests in object service.
@ -56,3 +63,16 @@ func (g PutConfig) PoolSizeLocal() int {
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
}
// Get returns structure that provides access to "get" subsection of
// "object" section.
func Get(c *config.Config) GetConfig {
return GetConfig{
c.Sub(subsection).Sub(getSubsection),
}
}
// Priority returns the value of "priority" config parameter.
func (g GetConfig) Priority() []string {
return config.StringSliceSafe(g.cfg, "priority")
}

View file

@ -174,11 +174,13 @@ func initObjectService(c *cfg) {
sPutV2 := createPutSvcV2(sPut, keyStorage)
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
c.ObjectCfg.priorityMetrics)
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource,
c.ObjectCfg.priorityMetrics)
*c.cfgObject.getSvc = *sGet // need smth better
@ -366,7 +368,10 @@ func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Servic
return patchsvc.NewService(sPut.Config, sGet)
}
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, containerSource containercore.Source) *searchsvc.Service {
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache,
containerSource containercore.Source,
priorityMetrics []placement.Metric,
) *searchsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
return searchsvc.New(
@ -374,6 +379,8 @@ func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Trav
coreConstructor,
traverseGen.WithTraverseOptions(
placement.WithoutSuccessTracking(),
placement.WithPriorityMetrics(priorityMetrics),
placement.WithNodeState(c),
),
c.netMapSource,
keyStorage,
@ -389,6 +396,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache,
containerSource containercore.Source,
priorityMetrics []placement.Metric,
) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
@ -398,6 +406,8 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
ls,
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
placement.WithPriorityMetrics(priorityMetrics),
placement.WithNodeState(c),
),
coreConstructor,
containerSource,

View file

@ -8,9 +8,11 @@ import (
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
@ -30,6 +32,13 @@ func validateConfig(c *config.Config) error {
return fmt.Errorf("invalid logger destination: %w", err)
}
// validate priority metrics for GET and SEARCH requests
for _, raw := range objectconfig.Get(c).Priority() {
if err := placement.ValidateMetric(raw); err != nil {
return err
}
}
// shard configuration validation
shardNum := 0

View file

@ -87,6 +87,7 @@ FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
# Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15

View file

@ -131,6 +131,9 @@
"remote_pool_size": 100,
"local_pool_size": 200,
"skip_session_token_issuer_verification": true
},
"get": {
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
}
},
"storage": {

View file

@ -114,6 +114,10 @@ object:
remote_pool_size: 100 # number of async workers for remote PUT operations
local_pool_size: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
get:
priority: # list of metrics of nodes for prioritization
- $attribute:ClusterName
- $attribute:UN-LOCODE
storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)

View file

@ -407,13 +407,17 @@ Contains object-service related parameters.
object:
put:
remote_pool_size: 100
get:
priority:
- $attribute:ClusterName
```
| Parameter | Type | Default value | Description |
|-----------------------------|-------|---------------|------------------------------------------------------------------------------------------------|
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
# `runtime` section
Contains runtime parameters.

View file

@ -0,0 +1,51 @@
package placement
import (
"errors"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
const (
attrPrefix = "$attribute:"
)
type Metric interface {
CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) []byte
}
func ValidateMetric(raw string) error {
if strings.HasPrefix(raw, attrPrefix) {
return nil
}
return errors.New("unsupported priority metric")
}
func ParseMetric(raw string) Metric {
if attr, found := strings.CutPrefix(raw, attrPrefix); found {
return NewAttributeMetric(attr)
}
return nil
}
// attributeMetric describes priority metric based on attribute.
type attributeMetric struct {
attribute string
}
// CalculateValue return [0] if from and to contains attribute attributeMetric.attribute and
// the value of attribute is the same. In other case return [1].
func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) []byte {
fromAttr := from.Attribute(am.attribute)
toAttr := to.Attribute(am.attribute)
if len(fromAttr) > 0 && len(toAttr) > 0 && fromAttr == toAttr {
return []byte{0}
}
return []byte{1}
}
func NewAttributeMetric(raw string) Metric {
attr, _ := strings.CutPrefix(raw, attrPrefix)
return &attributeMetric{attribute: attr}
}

View file

@ -1,10 +1,14 @@
package placement
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"slices"
"sync"
netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -23,6 +27,12 @@ type Builder interface {
BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)
}
// NodeState encapsulates information about current node state.
type NodeState interface {
// LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure.
LocalNodeInfo() (*netmapAPI.NodeInfo, error)
}
// Option represents placement traverser option.
type Option func(*cfg)
@ -50,6 +60,10 @@ type cfg struct {
policy netmap.PlacementPolicy
builder Builder
metrics []Metric
nodeState NodeState
}
const invalidOptsMsg = "invalid traverser options"
@ -99,7 +113,26 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
}
var rem []int
if cfg.flatSuccess != nil {
if len(cfg.metrics) > 0 {
rem = defaultCopiesVector(cfg.policy)
var unsortedVector []netmap.NodeInfo
var regularVector []netmap.NodeInfo
for i := range rem {
unsortedVector = append(unsortedVector, ns[i][:rem[i]]...)
regularVector = append(regularVector, ns[i][rem[i]:]...)
}
rem = make([]int, 2)
rem[0] = -1
rem[1] = -1
sortedVector, err := sortVector(cfg, unsortedVector)
if err != nil {
return nil, err
}
ns = make([][]netmap.NodeInfo, 2)
ns[0] = sortedVector
ns[1] = regularVector
} else if cfg.flatSuccess != nil {
ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)}
} else {
@ -157,6 +190,37 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
return [][]netmap.NodeInfo{flat}
}
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
metrics := make([][]byte, len(unsortedVector))
var node netmap.NodeInfo
nodeV2, err := cfg.nodeState.LocalNodeInfo()
if err != nil {
return nil, err
}
err = node.ReadFromV2(*nodeV2)
if err != nil {
return nil, err
}
b := make([]byte, 2)
for i := range unsortedVector {
for _, m := range cfg.metrics {
metrics[i] = append(metrics[i], m.CalculateValue(&node, &unsortedVector[i])...)
}
binary.LittleEndian.PutUint16(b, uint16(i))
metrics[i] = append(metrics[i], b...)
}
count := len(metrics[0]) - 2
slices.SortFunc(metrics, func(a, b []byte) int {
return bytes.Compare(a[:count], b[:count])
})
sortedVector := make([]netmap.NodeInfo, len(unsortedVector))
for i := range unsortedVector {
index := binary.LittleEndian.Uint16(metrics[i][count:])
sortedVector[i] = unsortedVector[index]
}
return sortedVector, nil
}
// Node is a descriptor of storage node with information required for intra-container communication.
type Node struct {
addresses network.AddressGroup
@ -322,3 +386,17 @@ func WithCopyNumbers(v []uint32) Option {
c.copyNumbers = v
}
}
// WithPriorityMetrics use provided priority metrics to sort nodes.
func WithPriorityMetrics(m []Metric) Option {
return func(c *cfg) {
c.metrics = m
}
}
// WithNodeState provide state of the current node.
func WithNodeState(s NodeState) Option {
return func(c *cfg) {
c.nodeState = s
}
}

View file

@ -4,6 +4,7 @@ import (
"strconv"
"testing"
netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
@ -22,7 +23,9 @@ func (b testBuilder) BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([]
}
func testNode(v uint32) (n netmap.NodeInfo) {
n.SetNetworkEndpoints("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v)))
ip := "/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))
n.SetNetworkEndpoints(ip)
n.SetPublicKey([]byte(ip))
return n
}
@ -134,7 +137,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
err = n.FromIterator(netmapcore.Node(nodes[1][0]))
require.NoError(t, err)
require.Equal(t, []Node{{addresses: n}}, tr.Next())
require.Equal(t, []Node{{addresses: n, key: []byte("/ip4/0.0.0.0/tcp/5")}}, tr.Next())
})
t.Run("put scenario", func(t *testing.T) {
@ -275,3 +278,197 @@ func TestTraverserRemValues(t *testing.T) {
})
}
}
type nodeState struct {
node *netmapAPI.NodeInfo
}
func (n *nodeState) LocalNodeInfo() (*netmapAPI.NodeInfo, error) {
return n.node, nil
}
func TestTraverserPriorityMetrics(t *testing.T) {
t.Run("one rep one metric", func(t *testing.T) {
selectors := []int{4}
replicas := []int{3}
nodes, cnr := testPlacement(selectors, replicas)
nodes[0][0].SetAttribute("ClusterName", "A")
nodes[0][1].SetAttribute("ClusterName", "A")
nodes[0][2].SetAttribute("ClusterName", "B")
nodes[0][3].SetAttribute("ClusterName", "B")
sdkNode := testNode(5)
sdkNode.SetAttribute("ClusterName", "B")
nodeAPI := &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy := copyVectors(nodes)
m := []Metric{NewAttributeMetric("$attribute:ClusterName")}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next := tr.Next()
require.NotNil(t, next)
require.Equal(t, 3, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[2].PublicKey()))
next = tr.Next()
require.Equal(t, 1, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
t.Run("two reps two metrics", func(t *testing.T) {
selectors := []int{3, 3}
replicas := []int{2, 2}
nodes, cnr := testPlacement(selectors, replicas)
nodes[0][0].SetAttribute("ClusterName", "A")
nodes[0][0].SetAttribute("UN-LOCODE", "RU LED")
nodes[0][1].SetAttribute("ClusterName", "A")
nodes[0][1].SetAttribute("UN-LOCODE", "FI HEL")
nodes[0][2].SetAttribute("ClusterName", "A")
nodes[0][2].SetAttribute("UN-LOCODE", "RU LED")
nodes[1][0].SetAttribute("ClusterName", "B")
nodes[1][0].SetAttribute("UN-LOCODE", "RU MOW")
nodes[1][1].SetAttribute("ClusterName", "B")
nodes[1][1].SetAttribute("UN-LOCODE", "RU DME")
nodes[1][2].SetAttribute("ClusterName", "B")
nodes[1][2].SetAttribute("UN-LOCODE", "RU MOW")
sdkNode := testNode(9)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU DME")
nodeAPI := &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy := copyVectors(nodes)
m := []Metric{
NewAttributeMetric("$attribute:ClusterName"),
NewAttributeMetric("$attribute:UN-LOCODE"),
}
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next := tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "B")
sdkNode.SetAttribute("UN-LOCODE", "RU MOW")
nodeAPI = &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
sdkNode.SetAttribute("ClusterName", "A")
sdkNode.SetAttribute("UN-LOCODE", "RU LED")
nodeAPI = &netmapAPI.NodeInfo{}
sdkNode.WriteToV2(nodeAPI)
nodesCopy = copyVectors(nodes)
tr, err = NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
WithoutSuccessTracking(),
WithPriorityMetrics(m),
WithNodeState(&nodeState{
node: nodeAPI,
}),
)
require.NoError(t, err)
next = tr.Next()
require.Equal(t, 4, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[1].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/3", string(next[2].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/4", string(next[3].PublicKey()))
next = tr.Next()
require.Equal(t, 2, len(next))
require.Equal(t, "/ip4/0.0.0.0/tcp/2", string(next[0].PublicKey()))
require.Equal(t, "/ip4/0.0.0.0/tcp/5", string(next[1].PublicKey()))
next = tr.Next()
require.Nil(t, next)
})
}