From 56d09a99579a59b41f194f4c90a1dd582a9c88bb Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Fri, 7 Feb 2025 17:09:08 +0300 Subject: [PATCH] [#1640] object: Add priority metric based on geo distance Change-Id: I3a7ea4fc4807392bf50e6ff1389c61367c953074 Signed-off-by: Anton Nikiforov --- cmd/frostfs-node/config.go | 17 +- cmd/frostfs-node/config/node/config.go | 5 + config/example/node.env | 1 + config/example/node.json | 3 +- config/example/node.yaml | 1 + docs/storage-node-configuration.md | 53 ++++--- .../object_manager/placement/metrics.go | 150 +++++++++++++++++- .../placement/traverser_test.go | 49 ++++++ 8 files changed, 241 insertions(+), 38 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index b167439e05..8ceef2c314 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -247,15 +247,16 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { // Object a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c) - var pm []placement.Metric - for _, raw := range objectconfig.Get(c).Priority() { - m, err := placement.ParseMetric(raw) - if err != nil { - return err - } - pm = append(pm, m) + locodeDBPath := nodeconfig.LocodeDBPath(c) + parser, err := placement.NewMetricsParser(locodeDBPath) + if err != nil { + return fmt.Errorf("metrics parser creation: %w", err) } - a.ObjectCfg.priorityMetrics = pm + m, err := parser.ParseMetrics(objectconfig.Get(c).Priority()) + if err != nil { + return fmt.Errorf("parse metrics: %w", err) + } + a.ObjectCfg.priorityMetrics = m // Storage Engine diff --git a/cmd/frostfs-node/config/node/config.go b/cmd/frostfs-node/config/node/config.go index 969d773964..18aa254f1c 100644 --- a/cmd/frostfs-node/config/node/config.go +++ b/cmd/frostfs-node/config/node/config.go @@ -217,3 +217,8 @@ func (l PersistentPolicyRulesConfig) NoSync() bool { func CompatibilityMode(c *config.Config) bool { return config.BoolSafe(c.Sub(subsection), "kludge_compatibility_mode") } + +// LocodeDBPath returns path to LOCODE database. +func LocodeDBPath(c *config.Config) string { + return config.String(c.Sub(subsection), "locode_db_path") +} diff --git a/config/example/node.env b/config/example/node.env index dfb250341f..b501d38360 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -23,6 +23,7 @@ FROSTFS_NODE_ATTRIBUTE_1="UN-LOCODE:RU MSK" FROSTFS_NODE_RELAY=true FROSTFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions FROSTFS_NODE_PERSISTENT_STATE_PATH=/state +FROSTFS_NODE_LOCODE_DB_PATH=/path/to/locode/db # Tree service section FROSTFS_TREE_ENABLED=true diff --git a/config/example/node.json b/config/example/node.json index 0b061a3d45..b02f43f600 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -37,7 +37,8 @@ }, "persistent_state": { "path": "/state" - } + }, + "locode_db_path": "/path/to/locode/db" }, "grpc": { "0": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 46e4ebdbea..ba32adb82f 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -36,6 +36,7 @@ node: path: /sessions # path to persistent session tokens file of Storage node (default: in-memory sessions) persistent_state: path: /state # path to persistent state file of Storage node + "locode_db_path": "/path/to/locode/db" grpc: - endpoint: s01.frostfs.devenv:8080 # endpoint for gRPC server diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 5fe011ece2..248b54ea4c 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -12,22 +12,23 @@ There are some custom types used for brevity: # Structure -| Section | Description | -|------------------------|---------------------------------------------------------------------| -| `logger` | [Logging parameters](#logger-section) | -| `pprof` | [PProf configuration](#pprof-section) | -| `prometheus` | [Prometheus metrics configuration](#prometheus-section) | -| `control` | [Control service configuration](#control-section) | -| `contracts` | [Override FrostFS contracts hashes](#contracts-section) | -| `morph` | [N3 blockchain client configuration](#morph-section) | -| `apiclient` | [FrostFS API client configuration](#apiclient-section) | -| `policer` | [Policer service configuration](#policer-section) | -| `replicator` | [Replicator service configuration](#replicator-section) | -| `storage` | [Storage engine configuration](#storage-section) | -| `runtime` | [Runtime configuration](#runtime-section) | -| `audit` | [Audit configuration](#audit-section) | -| `multinet` | [Multinet configuration](#multinet-section) | -| `qos` | [QoS configuration](#qos-section) | +| Section | Description | +|--------------|---------------------------------------------------------| +| `node` | [Node parameters](#node-section) | +| `logger` | [Logging parameters](#logger-section) | +| `pprof` | [PProf configuration](#pprof-section) | +| `prometheus` | [Prometheus metrics configuration](#prometheus-section) | +| `control` | [Control service configuration](#control-section) | +| `contracts` | [Override FrostFS contracts hashes](#contracts-section) | +| `morph` | [N3 blockchain client configuration](#morph-section) | +| `apiclient` | [FrostFS API client configuration](#apiclient-section) | +| `policer` | [Policer service configuration](#policer-section) | +| `replicator` | [Replicator service configuration](#replicator-section) | +| `storage` | [Storage engine configuration](#storage-section) | +| `runtime` | [Runtime configuration](#runtime-section) | +| `audit` | [Audit configuration](#audit-section) | +| `multinet` | [Multinet configuration](#multinet-section) | +| `qos` | [QoS configuration](#qos-section) | # `control` section ```yaml @@ -384,17 +385,19 @@ node: path: /sessions persistent_state: path: /state + locode_db_path: "/path/to/locode/db" ``` -| Parameter | Type | Default value | Description | -|-----------------------|---------------------------------------------------------------|---------------|-------------------------------------------------------------------------| -| `key` | `string` | | Path to the binary-encoded private key. | -| `wallet` | [Wallet config](#wallet-subsection) | | Wallet configuration. Has no effect if `key` is provided. | -| `addresses` | `[]string` | | Addresses advertised in the netmap. | -| `attribute` | `[]string` | | Node attributes as a list of key-value pairs in `:` format. | -| `relay` | `bool` | | Enable relay mode. | -| `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. | -| `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. | +| Parameter | Type | Default value | Description | +|-----------------------|---------------------------------------------------------------|---------------|-----------------------------------------------------------------------------------------------------| +| `key` | `string` | | Path to the binary-encoded private key. | +| `wallet` | [Wallet config](#wallet-subsection) | | Wallet configuration. Has no effect if `key` is provided. | +| `addresses` | `[]string` | | Addresses advertised in the netmap. | +| `attribute` | `[]string` | | Node attributes as a list of key-value pairs in `:` format. | +| `relay` | `bool` | | Enable relay mode. | +| `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. | +| `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. | +| `locode_db_path` | `string` | empty | Path to UN/LOCODE [database](https://git.frostfs.info/TrueCloudLab/frostfs-locode-db/) for FrostFS. | ## `wallet` subsection N3 wallet configuration. diff --git a/pkg/services/object_manager/placement/metrics.go b/pkg/services/object_manager/placement/metrics.go index 45e6df3397..0f24a9d96c 100644 --- a/pkg/services/object_manager/placement/metrics.go +++ b/pkg/services/object_manager/placement/metrics.go @@ -2,24 +2,90 @@ package placement import ( "errors" + "fmt" + "maps" + "math" "strings" + "sync" + "sync/atomic" + locodedb "git.frostfs.info/TrueCloudLab/frostfs-locode-db/pkg/locode/db" + locodebolt "git.frostfs.info/TrueCloudLab/frostfs-locode-db/pkg/locode/db/boltdb" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" ) const ( attrPrefix = "$attribute:" + + geoDistance = "$geoDistance" ) type Metric interface { CalculateValue(*netmap.NodeInfo, *netmap.NodeInfo) int } -func ParseMetric(raw string) (Metric, error) { - if attr, found := strings.CutPrefix(raw, attrPrefix); found { - return NewAttributeMetric(attr), nil +type metricsParser struct { + locodeDBPath string + locodes map[string]locodedb.Point +} + +type MetricParser interface { + ParseMetrics([]string) ([]Metric, error) +} + +func NewMetricsParser(locodeDBPath string) (MetricParser, error) { + return &metricsParser{ + locodeDBPath: locodeDBPath, + }, nil +} + +func (p *metricsParser) initLocodes() error { + if len(p.locodes) != 0 { + return nil } - return nil, errors.New("unsupported priority metric") + if len(p.locodeDBPath) > 0 { + p.locodes = make(map[string]locodedb.Point) + locodeDB := locodebolt.New(locodebolt.Prm{ + Path: p.locodeDBPath, + }, + locodebolt.ReadOnly(), + ) + err := locodeDB.Open() + if err != nil { + return err + } + defer locodeDB.Close() + err = locodeDB.IterateOverLocodes(func(k string, v locodedb.Point) { + p.locodes[k] = v + }) + if err != nil { + return err + } + return nil + } + return errors.New("set path to locode database") +} + +func (p *metricsParser) ParseMetrics(priority []string) ([]Metric, error) { + var metrics []Metric + for _, raw := range priority { + if attr, found := strings.CutPrefix(raw, attrPrefix); found { + metrics = append(metrics, NewAttributeMetric(attr)) + } else if raw == geoDistance { + err := p.initLocodes() + if err != nil { + return nil, err + } + if len(p.locodes) == 0 { + return nil, fmt.Errorf("provide locodes database for metric %s", raw) + } + m := NewGeoDistanceMetric(p.locodes) + metrics = append(metrics, m) + } else { + return nil, fmt.Errorf("unsupported priority metric %s", raw) + } + } + return metrics, nil } // attributeMetric describes priority metric based on attribute. @@ -41,3 +107,79 @@ func (am *attributeMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.Node func NewAttributeMetric(attr string) Metric { return &attributeMetric{attribute: attr} } + +// geoDistanceMetric describes priority metric based on attribute. +type geoDistanceMetric struct { + locodes map[string]locodedb.Point + distance *atomic.Pointer[map[string]int] + mtx sync.Mutex +} + +func NewGeoDistanceMetric(locodes map[string]locodedb.Point) Metric { + d := atomic.Pointer[map[string]int]{} + m := make(map[string]int) + d.Store(&m) + gm := &geoDistanceMetric{ + locodes: locodes, + distance: &d, + } + return gm +} + +// CalculateValue return distance in kilometers between current node and provided, +// if coordinates for provided node found. In other case return math.MaxInt. +func (gm *geoDistanceMetric) CalculateValue(from *netmap.NodeInfo, to *netmap.NodeInfo) int { + fl := from.LOCODE() + tl := to.LOCODE() + if fl == tl { + return 0 + } + m := gm.distance.Load() + if v, ok := (*m)[fl+tl]; ok { + return v + } + return gm.calculateDistance(fl, tl) +} + +func (gm *geoDistanceMetric) calculateDistance(from, to string) int { + gm.mtx.Lock() + defer gm.mtx.Unlock() + od := gm.distance.Load() + if v, ok := (*od)[from+to]; ok { + return v + } + nd := maps.Clone(*od) + var dist int + pointFrom, okFrom := gm.locodes[from] + pointTo, okTo := gm.locodes[to] + if okFrom && okTo { + dist = int(distance(pointFrom.Latitude(), pointFrom.Longitude(), pointTo.Latitude(), pointTo.Longitude())) + } else { + dist = math.MaxInt + } + nd[from+to] = dist + gm.distance.Store(&nd) + + return dist +} + +// distance return amount of KM between two points. +// Parameters are latitude and longitude of point 1 and 2 in decimal degrees. +// Original implementation can be found here https://www.geodatasource.com/developers/go. +func distance(lt1 float64, ln1 float64, lt2 float64, ln2 float64) float64 { + radLat1 := math.Pi * lt1 / 180 + radLat2 := math.Pi * lt2 / 180 + radTheta := math.Pi * (ln1 - ln2) / 180 + + dist := math.Sin(radLat1)*math.Sin(radLat2) + math.Cos(radLat1)*math.Cos(radLat2)*math.Cos(radTheta) + + if dist > 1 { + dist = 1 + } + + dist = math.Acos(dist) + dist = dist * 180 / math.Pi + dist = dist * 60 * 1.1515 * 1.609344 + + return dist +} diff --git a/pkg/services/object_manager/placement/traverser_test.go b/pkg/services/object_manager/placement/traverser_test.go index 9c825bf193..d1370f21e2 100644 --- a/pkg/services/object_manager/placement/traverser_test.go +++ b/pkg/services/object_manager/placement/traverser_test.go @@ -601,4 +601,53 @@ func TestTraverserPriorityMetrics(t *testing.T) { next = tr.Next() require.Nil(t, next) }) + + t.Run("one rep one geo metric", func(t *testing.T) { + t.Skip() + selectors := []int{2} + replicas := []int{2} + + nodes, cnr := testPlacement(selectors, replicas) + + // Node_0, PK - ip4/0.0.0.0/tcp/0 + nodes[0][0].SetAttribute("UN-LOCODE", "RU MOW") + // Node_1, PK - ip4/0.0.0.0/tcp/1 + nodes[0][1].SetAttribute("UN-LOCODE", "RU LED") + + sdkNode := testNode(2) + sdkNode.SetAttribute("UN-LOCODE", "FI HEL") + + nodesCopy := copyVectors(nodes) + + parser, err := NewMetricsParser("/path/to/locode_db") + require.NoError(t, err) + m, err := parser.ParseMetrics([]string{geoDistance}) + require.NoError(t, err) + + tr, err := NewTraverser(context.Background(), + ForContainer(cnr), + UseBuilder(&testBuilder{ + vectors: nodesCopy, + }), + WithoutSuccessTracking(), + WithPriorityMetrics(m), + WithNodeState(&nodeState{ + node: &sdkNode, + }), + ) + require.NoError(t, err) + + // Without priority metric `$geoDistance` the order will be: + // [ {Node_0 RU MOW}, {Node_1 RU LED}] + // With priority metric `$geoDistance` the order should be: + // [ {Node_1 RU LED}, {Node_0 RU MOW}] + next := tr.Next() + require.NotNil(t, next) + require.Equal(t, 2, len(next)) + require.Equal(t, "/ip4/0.0.0.0/tcp/1", string(next[0].PublicKey())) + require.Equal(t, "/ip4/0.0.0.0/tcp/0", string(next[1].PublicKey())) + + next = tr.Next() + require.Nil(t, next) + }) }