Source-based routing support #1422

Merged
dstepanov-yadro merged 11 commits from dstepanov-yadro/frostfs-node:feat/multinet into master 2024-10-17 13:15:09 +00:00
32 changed files with 728 additions and 107 deletions

View file

@ -48,6 +48,8 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("node.kludge_compatibility_mode", false)
cfg.SetDefault("audit.enabled", false)
setMultinetDefaults(cfg)
}
func setControlDefaults(cfg *viper.Viper) {
@ -131,3 +133,11 @@ func setMorphDefaults(cfg *viper.Viper) {
cfg.SetDefault("morph.validators", []string{})
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
}
func setMultinetDefaults(cfg *viper.Viper) {
cfg.SetDefault("multinet.enabled", false)
cfg.SetDefault("multinet.balancer", "")
cfg.SetDefault("multinet.restrict", false)
cfg.SetDefault("multinet.fallback_delay", "0s")
cfg.SetDefault("multinet.subnets", "")
fyrchik marked this conversation as resolved Outdated

Shouldn't it be an empty list?

Shouldn't it be an empty list?

It works so. To specify empty list it is required to add Subnet type to IR config, but this is only required to set defaults.

It works so. To specify empty list it is required to add `Subnet` type to IR config, but this is only required to set defaults.

I've meant "[]", but if it already works, ok.

I've meant `"[]"`, but if it already works, ok.
}

View file

@ -26,12 +26,14 @@ import (
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
@ -436,6 +438,8 @@ type shared struct {
metricsCollector *metrics.NodeMetrics
metricsSvc *objectService.MetricCollector
dialerSource *internalNet.DialerSource
}
// dynamicConfiguration stores parameters of the
@ -760,12 +764,18 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
fatalOnErr(err)
nodeMetrics := metrics.NewNodeMetrics()
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
fatalOnErr(err)
cacheOpts := cache.ClientCacheOpts{
DialTimeout: apiclientconfig.DialTimeout(appCfg),
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
Key: &key.PrivateKey,
AllowExternal: apiclientconfig.AllowExternal(appCfg),
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
DialerSource: ds,
}
return shared{
@ -777,10 +787,29 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),
persistate: persistate,
metricsCollector: metrics.NewNodeMetrics(),
metricsCollector: nodeMetrics,
dialerSource: ds,
}
}
func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
result := internalNet.Config{
Enabled: multinet.Enabled(appCfg),
Balancer: multinet.Balancer(appCfg),
Restrict: multinet.Restrict(appCfg),
FallbackDelay: multinet.FallbackDelay(appCfg),
Metrics: m,
}
sn := multinet.Subnets(appCfg)
for _, s := range sn {
result.Subnets = append(result.Subnets, internalNet.Subnet{
Prefix: s.Mask,
SourceIPs: s.SourceIPs,
})
}
return result
}
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
@ -1336,6 +1365,11 @@ func (c *cfg) reloadConfig(ctx context.Context) {
}
}
if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
return
}
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}

View file

@ -0,0 +1,62 @@
package multinet
import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
)
const (
subsection = "multinet"
FallbackDelayDefault = 300 * time.Millisecond
)
// Enabled returns the value of "enabled" config parameter from "multinet" section.
func Enabled(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "enabled")
}
type Subnet struct {
Mask string
SourceIPs []string
}
// Subnets returns the value of "subnets" config parameter from "multinet" section.
func Subnets(c *config.Config) []Subnet {
var result []Subnet
sub := c.Sub(subsection).Sub("subnets")
for i := 0; ; i++ {
s := sub.Sub(strconv.FormatInt(int64(i), 10))
mask := config.StringSafe(s, "mask")
if mask == "" {
break
}
sourceIPs := config.StringSliceSafe(s, "source_ips")
result = append(result, Subnet{
Mask: mask,
SourceIPs: sourceIPs,
})
}
return result
}
// Balancer returns the value of "balancer" config parameter from "multinet" section.
func Balancer(c *config.Config) string {
return config.StringSafe(c.Sub(subsection), "balancer")
}
// Restrict returns the value of "restrict" config parameter from "multinet" section.
func Restrict(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "restrict")
}
// FallbackDelay returns the value of "fallback_delay" config parameter from "multinet" section.
func FallbackDelay(c *config.Config) time.Duration {
fd := config.DurationSafe(c.Sub(subsection), "fallback_delay")
if fd != 0 { // negative value means no fallback
return fd
}
return FallbackDelayDefault
}

View file

@ -0,0 +1,52 @@
package multinet
import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"github.com/stretchr/testify/require"
)
func TestMultinetSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
empty := configtest.EmptyConfig()
require.Equal(t, false, Enabled(empty))
require.Equal(t, ([]Subnet)(nil), Subnets(empty))
require.Equal(t, "", Balancer(empty))
require.Equal(t, false, Restrict(empty))
require.Equal(t, FallbackDelayDefault, FallbackDelay(empty))
})
const path = "../../../../config/example/node"
fileConfigTest := func(c *config.Config) {
require.Equal(t, true, Enabled(c))
require.Equal(t, []Subnet{
{
Mask: "192.168.219.174/24",
SourceIPs: []string{
"192.168.218.185",
"192.168.219.185",
},
},
{
Mask: "10.78.70.74/24",
SourceIPs: []string{
"10.78.70.185",
"10.78.71.185",
},
},
}, Subnets(c))
require.Equal(t, "roundrobin", Balancer(c))
require.Equal(t, false, Restrict(c))
require.Equal(t, 350*time.Millisecond, FallbackDelay(c))
}
configtest.ForEachFileType(path, fileConfigTest)
t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
}

View file

@ -29,51 +29,12 @@ const (
)
func initMorphComponents(ctx context.Context, c *cfg) {
addresses := morphconfig.RPCEndpoint(c.appCfg)
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
// order of endpoints with the same priority.
rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
cli, err := client.New(ctx,
c.key,
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log),
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
client.WithEndpoints(addresses...),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
)
if err != nil {
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
zap.Any("endpoints", addresses),
zap.String("error", err.Error()),
)
fatalOnErr(err)
}
c.onShutdown(func() {
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
cli.Close()
})
if err := cli.SetGroupSignerScope(); err != nil {
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
}
c.cfgMorph.client = cli
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
initMorphClient(ctx, c)
lookupScriptHashesInNNS(c) // smart contract auto negotiation
if c.cfgMorph.notaryEnabled {
err = c.cfgMorph.client.EnableNotarySupport(
err := c.cfgMorph.client.EnableNotarySupport(
client.WithProxyContract(
c.cfgMorph.proxyScriptHash,
),
@ -111,6 +72,50 @@ func initMorphComponents(ctx context.Context, c *cfg) {
c.cfgNetmap.wrapper = wrap
}
func initMorphClient(ctx context.Context, c *cfg) {
addresses := morphconfig.RPCEndpoint(c.appCfg)
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
// order of endpoints with the same priority.
rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
cli, err := client.New(ctx,
c.key,
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log),
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
client.WithEndpoints(addresses...),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
client.WithDialerSource(c.dialerSource),
)
if err != nil {
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
zap.Any("endpoints", addresses),
zap.String("error", err.Error()),
)
fatalOnErr(err)
}
c.onShutdown(func() {
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
cli.Close()
})
if err := cli.SetGroupSignerScope(); err != nil {
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
}
c.cfgMorph.client = cli
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
}
func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
// skip notary deposit in non-notary environments
if !c.cfgMorph.notaryEnabled {

View file

@ -67,6 +67,7 @@ func initTreeService(c *cfg) {
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()),
tree.WithNetmapState(c.cfgNetmap.state),
tree.WithDialerSource(c.dialerSource),
)
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {

View file

@ -80,3 +80,12 @@ FROSTFS_IR_PPROF_MUTEX_RATE=10000
FROSTFS_IR_PROMETHEUS_ENABLED=true
FROSTFS_IR_PROMETHEUS_ADDRESS=localhost:9090
FROSTFS_IR_PROMETHEUS_SHUTDOWN_TIMEOUT=30s
FROSTFS_MULTINET_ENABLED=true
FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24"
FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185"
FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24"
FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
FROSTFS_MULTINET_BALANCER=roundrobin
FROSTFS_MULTINET_RESTRICT=false
FROSTFS_MULTINET_FALLBACK_DELAY=350ms

View file

@ -123,3 +123,18 @@ prometheus:
systemdnotify:
enabled: true
multinet:
enabled: true
subnets:
- mask: 192.168.219.174/24
source_ips:
- 192.168.218.185
- 192.168.219.185
- mask: 10.78.70.74/24
source_ips:
- 10.78.70.185
- 10.78.71.185
balancer: roundrobin
restrict: false
fallback_delay: 350ms

View file

@ -206,3 +206,13 @@ FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
# AUDIT section
FROSTFS_AUDIT_ENABLED=true
# MULTINET section
FROSTFS_MULTINET_ENABLED=true
FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24"
FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185"
FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24"
FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
FROSTFS_MULTINET_BALANCER=roundrobin
FROSTFS_MULTINET_RESTRICT=false
FROSTFS_MULTINET_FALLBACK_DELAY=350ms

View file

@ -264,5 +264,27 @@
},
"audit": {
"enabled": true
},
"multinet": {
"enabled": true,
"subnets": [
{
"mask": "192.168.219.174/24",
"source_ips": [
"192.168.218.185",
"192.168.219.185"
]
},
{
"mask": "10.78.70.74/24",
"source_ips":[
"10.78.70.185",
"10.78.71.185"
]
}
],
"balancer": "roundrobin",
"restrict": false,
"fallback_delay": "350ms"
}
}

View file

@ -240,3 +240,18 @@ runtime:
audit:
enabled: true
multinet:
enabled: true
subnets:
- mask: 192.168.219.174/24
source_ips:
- 192.168.218.185
- 192.168.219.185
- mask: 10.78.70.74/24
source_ips:
- 10.78.70.185
- 10.78.71.185
balancer: roundrobin
restrict: false
fallback_delay: 350ms

View file

@ -25,8 +25,8 @@ There are some custom types used for brevity:
| `replicator` | [Replicator service configuration](#replicator-section) |
| `storage` | [Storage engine configuration](#storage-section) |
| `runtime` | [Runtime configuration](#runtime-section) |
| `audit` | [Audit configuration](#audit-section) |
| `audit` | [Audit configuration](#audit-section) |
| `multinet` | [Multinet configuration](#multinet-section) |
# `control` section
```yaml
@ -435,6 +435,35 @@ audit:
enabled: true
```
| Parameter | Type | Default value | Description |
|---------------------|--------|---------------|---------------------------------------------------|
| `soft_memory_limit` | `bool` | false | If `true` then audit event logs will be recorded. |
| Parameter | Type | Default value | Description |
|-----------|--------|---------------|---------------------------------------------------|
| `enabled` | `bool` | false | If `true` then audit event logs will be recorded. |
# `multinet` section
Contains multinet parameters.
```yaml
multinet:
enabled: true
subnets:
- mask: 192.168.219.174/24
source_ips:
- 192.168.218.185
- 192.168.219.185
- mask: 10.78.70.74/24
source_ips:
- 10.78.70.185
- 10.78.71.185
balancer: roundrobin
restrict: false
fallback_delay: 350ms
```
| Parameter | Type | Default value | Description |
| ---------------- | ---------- | ------------- | -------------------------------------------------------------------------------------------------------------------------- |
| `enabled` | `bool` | false | If `true` then source-based routing is enabled. |
| `subnets` | `subnet` | empty | Resulting subnets. |
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |

3
go.mod
View file

@ -11,6 +11,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
@ -132,4 +133,4 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect
)
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07

6
go.sum
View file

@ -14,8 +14,10 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509/go.mod h1:jmb7yxzZota9jWbC10p+7YR+6wwJPBj7J/Fl5VDkXys=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 h1:LK3mCkNZkY48eBA9jnk1N0eQZLsZhOG+XYw4EBoKUjM=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 h1:gPaqGsk6gSWQyNVjaStydfUz6Z/loHc9XyvGrJ5qSPY=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA=
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=

View file

@ -523,4 +523,5 @@ const (
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty"
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
WritecacheCantGetObject = "can't get an object from fstree"
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
)

View file

@ -22,6 +22,7 @@ const (
grpcServerSubsystem = "grpc_server"
policerSubsystem = "policer"
commonCacheSubsystem = "common_cache"
multinetSubsystem = "multinet"
successLabel = "success"
shardIDLabel = "shard_id"
@ -41,6 +42,7 @@ const (
endpointLabel = "endpoint"
hitLabel = "hit"
cacheLabel = "cache"
sourceIPLabel = "source_ip"
readWriteMode = "READ_WRITE"
readOnlyMode = "READ_ONLY"

View file

@ -17,6 +17,7 @@ type InnerRingServiceMetrics struct {
eventDuration *prometheus.HistogramVec
morphCacheMetrics *morphCacheMetrics
logMetrics logger.LogMetrics
multinet *multinetMetrics
// nolint: unused
appInfo *ApplicationInfo
}
@ -51,6 +52,7 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace),
appInfo: NewApplicationInfo(misc.Version),
logMetrics: logger.NewLogMetrics(innerRingNamespace),
multinet: newMultinetMetrics(innerRingNamespace),
}
}
@ -78,3 +80,7 @@ func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics {
return m.logMetrics
}
func (m *InnerRingServiceMetrics) Multinet() MultinetMetrics {
return m.multinet
}

View file

@ -0,0 +1,35 @@
package metrics
import (
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type multinetMetrics struct {
dials *prometheus.GaugeVec
}
type MultinetMetrics interface {
Dial(sourceIP string, success bool)
}
func newMultinetMetrics(ns string) *multinetMetrics {
return &multinetMetrics{
dials: metrics.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: ns,
Subsystem: multinetSubsystem,
Name: "dial_count",
Help: "Dials count performed by multinet",
}, []string{sourceIPLabel, successLabel}),
}
}
func (m *multinetMetrics) Dial(sourceIP string, success bool) {
m.dials.With(prometheus.Labels{
sourceIPLabel: sourceIP,
successLabel: strconv.FormatBool(success),
}).Inc()
}

View file

@ -25,6 +25,7 @@ type NodeMetrics struct {
morphClient *morphClientMetrics
morphCache *morphCacheMetrics
log logger.LogMetrics
multinet *multinetMetrics
// nolint: unused
appInfo *ApplicationInfo
}
@ -53,6 +54,7 @@ func NewNodeMetrics() *NodeMetrics {
morphCache: newMorphCacheMetrics(namespace),
log: logger.NewLogMetrics(namespace),
appInfo: NewApplicationInfo(misc.Version),
multinet: newMultinetMetrics(namespace),
}
}
@ -120,3 +122,7 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
return m.log
}
func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
return m.multinet
}

69
internal/net/config.go Normal file
View file

@ -0,0 +1,69 @@
package net
import (
"errors"
"fmt"
"net/netip"
"slices"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/multinet"
)
var errEmptySourceIPList = errors.New("empty source IP list")
type Subnet struct {
Prefix string
SourceIPs []string
}
type Config struct {
Enabled bool
Subnets []Subnet
Balancer string
Restrict bool
FallbackDelay time.Duration
Metrics metrics.MultinetMetrics
}
func (c Config) toMultinetConfig() (multinet.Config, error) {
var subnets []multinet.Subnet
for _, s := range c.Subnets {
var ms multinet.Subnet
p, err := netip.ParsePrefix(s.Prefix)
if err != nil {
return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err)
}
ms.Prefix = p
for _, ip := range s.SourceIPs {
addr, err := netip.ParseAddr(ip)
if err != nil {
return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err)
}
ms.SourceIPs = append(ms.SourceIPs, addr)
}
if len(ms.SourceIPs) == 0 {
return multinet.Config{}, errEmptySourceIPList
}
subnets = append(subnets, ms)
}
return multinet.Config{
Subnets: subnets,
Balancer: multinet.BalancerType(c.Balancer),
Restrict: c.Restrict,
FallbackDelay: c.FallbackDelay,
Dialer: newDefaulDialer(),
EventHandler: newEventHandler(c.Metrics),
}, nil
}
func (c Config) equals(other Config) bool {
return c.Enabled == other.Enabled &&
slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool {
return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs)
}) &&
c.Balancer == other.Balancer &&
c.Restrict == other.Restrict &&
c.FallbackDelay == other.FallbackDelay
}

View file

@ -0,0 +1,54 @@
// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package net
import (
"net/url"
"strings"
)
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
net := "tcp"
m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")
// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
return n, target[m1+1:]
}
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr := t.Path
if scheme == "unix" {
if addr == "" {
addr = t.Host
}
return scheme, addr
}
}
return net, target
}

39
internal/net/dialer.go Normal file
View file

@ -0,0 +1,39 @@
package net
import (
"context"
"net"
"syscall"
"time"
"golang.org/x/sys/unix"
)
type Dialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}
func DialContextTCP(ctx context.Context, address string, d Dialer) (net.Conn, error) {
return d.DialContext(ctx, "tcp", address)
}
func newDefaulDialer() net.Dialer {
// From `grpc.WithContextDialer` comment:
fyrchik marked this conversation as resolved Outdated

Could you provide a link to github?

Could you provide a link to github?

done

done
//
// Note: All supported releases of Go (as of December 2023) override the OS
fyrchik marked this conversation as resolved Outdated

Why do we care about it in this PR?

Why do we care about it in this PR?

multinet is used primarily for gprs connections, so I think it's important to preserve the behavior of the standard grpc library.

`multinet` is used primarily for gprs connections, so I think it's important to preserve the behavior of the standard grpc library.

Also I think it's correct to use OS settings for keep alive.

Also I think it's correct to use OS settings for keep alive.
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
// option to true from the Control field. For a concrete example of how to do
// this, see internal.NetDialerWithTCPKeepalive().
//
// https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432
return net.Dialer{
KeepAlive: time.Duration(-1),
Control: func(_, _ string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
_ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
})
},
}
}

View file

@ -0,0 +1,83 @@
package net
import (
"context"
"net"
"sync"
"git.frostfs.info/TrueCloudLab/multinet"
)
type DialerSource struct {
guard sync.RWMutex
c Config
md multinet.Dialer
}
func NewDialerSource(c Config) (*DialerSource, error) {
result := &DialerSource{}
if err := result.build(c); err != nil {
return nil, err
}
return result, nil
}
func (s *DialerSource) build(c Config) error {
if c.Enabled {
mc, err := c.toMultinetConfig()
if err != nil {
return err
}
md, err := multinet.NewDialer(mc)
if err != nil {
return err
}
s.md = md
s.c = c
return nil
}
s.md = nil
s.c = c
return nil
}
// GrpcContextDialer returns grpc.WithContextDialer func.
fyrchik marked this conversation as resolved Outdated

Frankly, this is a rather generic package (it is even called net)
Why don't we return Dialer only?
It seems every client of this function has to check the second return value and use the same default otherwise.

Frankly, this is a rather generic package (it is even called `net`) Why don't we return `Dialer` only? It seems _every_ client of this function has to check the second return value and use _the same_ default otherwise.

fixed

fixed
// Returns nil if multinet disabled.
func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) {
s.guard.RLock()
defer s.guard.RUnlock()
if s.c.Enabled {
return func(ctx context.Context, address string) (net.Conn, error) {
network, address := parseDialTarget(address)
return s.md.DialContext(ctx, network, address)
}
}
return nil
}
// NetContextDialer returns net.DialContext dial function.
// Returns nil if multinet disabled.
func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
s.guard.RLock()
defer s.guard.RUnlock()
if s.c.Enabled {
return func(ctx context.Context, network, address string) (net.Conn, error) {
return s.md.DialContext(ctx, network, address)
}
}
return nil
}
func (s *DialerSource) Update(c Config) error {
s.guard.Lock()
defer s.guard.Unlock()
if s.c.equals(c) {
return nil
}
return s.build(c)
}

View file

@ -0,0 +1,29 @@
package net
import (
"net"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/multinet"
)
var _ multinet.EventHandler = (*metricsEventHandler)(nil)
type metricsEventHandler struct {
m metrics.MultinetMetrics
}
func (m *metricsEventHandler) DialPerformed(sourceIP net.Addr, _ string, _ string, err error) {
sourceIPString := "undefined"
if sourceIP != nil {
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
}
m.m.Dial(sourceIPString, err == nil)
fyrchik marked this conversation as resolved Outdated

We have success bool in many other places, why do we go with status here?

We have `success bool` in many other places, why do we go with `status` here?

In the distant bright future, it may be necessary to have separated error types, for example, for failover scenarios.

In the distant bright future, it may be necessary to have separated error types, for example, for failover scenarios.

Do you mean for this error or for everything?
I would rather be consistent across the codebase.
Also, what different kinds of errors do you have in mind?

Do you mean for this error or for everything? I would rather be consistent across the codebase. Also, what different kinds of errors do you have in mind?

For example, it may be necessary to distinguish between errors of the server to which the connection is being made and errors of the network interface through which the connection is being made.

For example, it may be necessary to distinguish between errors of the server to which the connection is being made and errors of the network interface through which the connection is being made.

I doubt we will do sth like this in the near future (remember: it is Dial, not some protocol method, so there is no status)
And if we will or will need to, we can always add a label.

I doubt we will do sth like this in the near future (remember: it is `Dial`, not some protocol method, so there is no status) And if we will or will need to, we can always add a label.

fixed

fixed
}
func newEventHandler(m metrics.MultinetMetrics) multinet.EventHandler {
if m == nil {
return nil
}
return &metricsEventHandler{m: m}
}

View file

@ -463,6 +463,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
name: morphPrefix,
from: fromSideChainBlock,
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
multinetMetrics: s.irMetrics.Multinet(),
}
// create morph client

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
@ -116,6 +117,7 @@ type (
sgn *transaction.Signer
from uint32 // block height
morphCacheMetric metrics.MorphCacheMetrics
multinetMetrics metrics.MultinetMetrics
}
)
@ -486,6 +488,12 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
}
nc := parseMultinetConfig(p.cfg, p.multinetMetrics)
ds, err := internalNet.NewDialerSource(nc)
if err != nil {
return nil, fmt.Errorf("dialer source: %w", err)
}
return client.New(
ctx,
p.key,
@ -498,6 +506,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
}),
client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")),
client.WithMorphCacheMetrics(p.morphCacheMetric),
client.WithDialerSource(ds),
)
}
@ -542,6 +551,28 @@ func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
return extraWallets, nil
}
func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNet.Config {
nc := internalNet.Config{
Enabled: cfg.GetBool("multinet.enabled"),
Balancer: cfg.GetString("multinet.balancer"),
Restrict: cfg.GetBool("multinet.restrict"),
FallbackDelay: cfg.GetDuration("multinet.fallback_delay"),
Metrics: m,
}
for i := 0; ; i++ {
mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i))
if mask == "" {
break
}
sourceIPs := cfg.GetStringSlice(fmt.Sprintf("multinet.subnets.%d.source_ips", i))
nc.Subnets = append(nc.Subnets, internalNet.Subnet{
Prefix: mask,
SourceIPs: sourceIPs,
})
}
return nc
}
func (s *Server) initConfigFromBlockchain() error {
// get current epoch
epoch, err := s.netmapClient.Epoch()

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
@ -41,13 +42,13 @@ type cfg struct {
endpoints []Endpoint
singleCli *rpcclient.WSClient // neo-go client for single client mode
inactiveModeCb Callback
switchInterval time.Duration
morphCacheMetrics metrics.MorphCacheMetrics
dialerSource *internalNet.DialerSource
}
const (
@ -124,40 +125,24 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
var err error
var act *actor.Actor
if cfg.singleCli != nil {
// return client in single RPC node mode that uses
// predefined WS client
//
// in case of the closing web socket connection:
// if extra endpoints were provided via options,
// they will be used in switch process, otherwise
// inactive mode will be enabled
cli.client = cfg.singleCli
act, err = newActor(cfg.singleCli, acc, *cfg)
var endpoint Endpoint
for cli.endpoints.curr, endpoint = range cli.endpoints.list {
cli.client, act, err = cli.newCli(ctx, endpoint)
if err != nil {
return nil, fmt.Errorf("could not create RPC actor: %w", err)
}
} else {
var endpoint Endpoint
for cli.endpoints.curr, endpoint = range cli.endpoints.list {
cli.client, act, err = cli.newCli(ctx, endpoint)
if err != nil {
cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
zap.Error(err), zap.String("endpoint", endpoint.Address))
} else {
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
zap.String("endpoint", endpoint.Address))
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
cli.switchIsActive.Store(true)
go cli.switchToMostPrioritized(ctx)
}
break
cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
zap.Error(err), zap.String("endpoint", endpoint.Address))
} else {
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
zap.String("endpoint", endpoint.Address))
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
cli.switchIsActive.Store(true)
go cli.switchToMostPrioritized(ctx)
}
break
}
if cli.client == nil {
return nil, ErrNoHealthyEndpoint
}
}
if cli.client == nil {
return nil, ErrNoHealthyEndpoint
}
cli.setActor(act)
@ -175,6 +160,7 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl
Options: rpcclient.Options{
DialTimeout: c.cfg.dialTimeout,
TLSClientConfig: cfg,
NetDialContext: c.cfg.dialerSource.NetContextDialer(),
},
})
if err != nil {
@ -285,17 +271,6 @@ func WithEndpoints(endpoints ...Endpoint) Option {
}
}
// WithSingleClient returns a client constructor option
// that specifies single neo-go client and forces Client
// to use it for requests.
//
// Passed client must already be initialized.
func WithSingleClient(cli *rpcclient.WSClient) Option {
return func(c *cfg) {
c.singleCli = cli
}
}
// WithConnLostCallback return a client constructor option
// that specifies a callback that is called when Client
// unsuccessfully tried to connect to all the specified
@ -320,3 +295,9 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
c.morphCacheMetrics = morphCacheMetrics
}
}
func WithDialerSource(ds *internalNet.DialerSource) Option {
return func(c *cfg) {
c.dialerSource = ds
}
}

View file

@ -5,6 +5,7 @@ import (
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
)
@ -25,6 +26,7 @@ type (
Key *ecdsa.PrivateKey
ResponseCallback func(client.ResponseMetaInfo) error
AllowExternal bool
DialerSource *net.DialerSource
}
)

View file

@ -60,18 +60,21 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
prmInit.Key = *x.opts.Key
}
grpcOpts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
),
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
}
prmDial := client.PrmDial{
Endpoint: addr.URIAddr(),
GRPCDialOptions: []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
),
},
Endpoint: addr.URIAddr(),
GRPCDialOptions: grpcOpts,
}
if x.opts.DialTimeout > 0 {
fyrchik marked this conversation as resolved Outdated

Unix domain sockets can be used:
830135e6c5/internal/transport/http2_client.go (L159)
We should parse the address and return an explicit error if we do not support this.

Unix domain sockets can be used: https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/internal/transport/http2_client.go#L159 We should parse the address and return an explicit error if we do not support this.

Fixed. unix sockets may be supported if restrict = false.

Fixed. unix sockets may be supported if `restrict = false`.
prmDial.DialTimeout = x.opts.DialTimeout

View file

@ -8,6 +8,7 @@ import (
"sync"
"time"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
@ -21,6 +22,7 @@ type clientCache struct {
sync.Mutex
simplelru.LRU[string, cacheItem]
key *ecdsa.PrivateKey
ds *internalNet.DialerSource
}
type cacheItem struct {
@ -36,7 +38,7 @@ const (
var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init(pk *ecdsa.PrivateKey) {
func (c *clientCache) init(pk *ecdsa.PrivateKey, ds *internalNet.DialerSource) {
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
if conn := value.cc; conn != nil {
_ = conn.Close()
@ -44,6 +46,7 @@ func (c *clientCache) init(pk *ecdsa.PrivateKey) {
})
c.LRU = *l
c.key = pk
c.ds = ds
}
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
@ -99,6 +102,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
),
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
}
if !netAddr.IsTLSEnabled() {

View file

@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
@ -45,6 +46,7 @@ type cfg struct {
morphChainStorage policyengine.MorphRuleChainStorageReader
metrics MetricsRegister
ds *net.DialerSource
}
// Option represents configuration option for a tree service.
@ -161,3 +163,9 @@ func WithNetmapState(state netmap.State) Option {
c.state = state
}
}
func WithDialerSource(ds *net.DialerSource) Option {
return func(c *cfg) {
c.ds = ds
}
}

View file

@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
s.log = &logger.Logger{Logger: zap.NewNop()}
}
s.cache.init(s.key)
s.cache.init(s.key, s.ds)
s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp)