From 87adb6182ba37dccf68f9f13695e28261a3a2183 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 8 Oct 2024 15:24:01 +0300 Subject: [PATCH 01/11] [#1422] config: Add multinet config Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config/multinet/config.go | 62 +++++++++++++++++++ .../config/multinet/config_test.go | 52 ++++++++++++++++ config/example/ir.env | 9 +++ config/example/ir.yaml | 15 +++++ config/example/node.env | 10 +++ config/example/node.json | 22 +++++++ config/example/node.yaml | 15 +++++ docs/storage-node-configuration.md | 39 ++++++++++-- 8 files changed, 219 insertions(+), 5 deletions(-) create mode 100644 cmd/frostfs-node/config/multinet/config.go create mode 100644 cmd/frostfs-node/config/multinet/config_test.go diff --git a/cmd/frostfs-node/config/multinet/config.go b/cmd/frostfs-node/config/multinet/config.go new file mode 100644 index 000000000..f598efc51 --- /dev/null +++ b/cmd/frostfs-node/config/multinet/config.go @@ -0,0 +1,62 @@ +package multinet + +import ( + "strconv" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" +) + +const ( + subsection = "multinet" + + FallbackDelayDefault = 300 * time.Millisecond +) + +// Enabled returns the value of "enabled" config parameter from "multinet" section. +func Enabled(c *config.Config) bool { + return config.BoolSafe(c.Sub(subsection), "enabled") +} + +type Subnet struct { + Mask string + SourceIPs []string +} + +// Subnets returns the value of "subnets" config parameter from "multinet" section. +func Subnets(c *config.Config) []Subnet { + var result []Subnet + sub := c.Sub(subsection).Sub("subnets") + for i := 0; ; i++ { + s := sub.Sub(strconv.FormatInt(int64(i), 10)) + mask := config.StringSafe(s, "mask") + if mask == "" { + break + } + sourceIPs := config.StringSliceSafe(s, "source_ips") + result = append(result, Subnet{ + Mask: mask, + SourceIPs: sourceIPs, + }) + } + return result +} + +// Balancer returns the value of "balancer" config parameter from "multinet" section. +func Balancer(c *config.Config) string { + return config.StringSafe(c.Sub(subsection), "balancer") +} + +// Restrict returns the value of "restrict" config parameter from "multinet" section. +func Restrict(c *config.Config) bool { + return config.BoolSafe(c.Sub(subsection), "restrict") +} + +// FallbackDelay returns the value of "fallback_delay" config parameter from "multinet" section. +func FallbackDelay(c *config.Config) time.Duration { + fd := config.DurationSafe(c.Sub(subsection), "fallback_delay") + if fd != 0 { // negative value means no fallback + return fd + } + return FallbackDelayDefault +} diff --git a/cmd/frostfs-node/config/multinet/config_test.go b/cmd/frostfs-node/config/multinet/config_test.go new file mode 100644 index 000000000..5f7dc6d53 --- /dev/null +++ b/cmd/frostfs-node/config/multinet/config_test.go @@ -0,0 +1,52 @@ +package multinet + +import ( + "testing" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test" + "github.com/stretchr/testify/require" +) + +func TestMultinetSection(t *testing.T) { + t.Run("defaults", func(t *testing.T) { + empty := configtest.EmptyConfig() + require.Equal(t, false, Enabled(empty)) + require.Equal(t, ([]Subnet)(nil), Subnets(empty)) + require.Equal(t, "", Balancer(empty)) + require.Equal(t, false, Restrict(empty)) + require.Equal(t, FallbackDelayDefault, FallbackDelay(empty)) + }) + + const path = "../../../../config/example/node" + + fileConfigTest := func(c *config.Config) { + require.Equal(t, true, Enabled(c)) + require.Equal(t, []Subnet{ + { + Mask: "192.168.219.174/24", + SourceIPs: []string{ + "192.168.218.185", + "192.168.219.185", + }, + }, + { + Mask: "10.78.70.74/24", + SourceIPs: []string{ + "10.78.70.185", + "10.78.71.185", + }, + }, + }, Subnets(c)) + require.Equal(t, "roundrobin", Balancer(c)) + require.Equal(t, false, Restrict(c)) + require.Equal(t, 350*time.Millisecond, FallbackDelay(c)) + } + + configtest.ForEachFileType(path, fileConfigTest) + + t.Run("ENV", func(t *testing.T) { + configtest.ForEnvFileType(t, path, fileConfigTest) + }) +} diff --git a/config/example/ir.env b/config/example/ir.env index 7234a4b32..ebd91c243 100644 --- a/config/example/ir.env +++ b/config/example/ir.env @@ -80,3 +80,12 @@ FROSTFS_IR_PPROF_MUTEX_RATE=10000 FROSTFS_IR_PROMETHEUS_ENABLED=true FROSTFS_IR_PROMETHEUS_ADDRESS=localhost:9090 FROSTFS_IR_PROMETHEUS_SHUTDOWN_TIMEOUT=30s + +FROSTFS_MULTINET_ENABLED=true +FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24" +FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185" +FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24" +FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185" +FROSTFS_MULTINET_BALANCER=roundrobin +FROSTFS_MULTINET_RESTRICT=false +FROSTFS_MULTINET_FALLBACK_DELAY=350ms diff --git a/config/example/ir.yaml b/config/example/ir.yaml index 4c64f088b..49f9fd324 100644 --- a/config/example/ir.yaml +++ b/config/example/ir.yaml @@ -123,3 +123,18 @@ prometheus: systemdnotify: enabled: true + +multinet: + enabled: true + subnets: + - mask: 192.168.219.174/24 + source_ips: + - 192.168.218.185 + - 192.168.219.185 + - mask: 10.78.70.74/24 + source_ips: + - 10.78.70.185 + - 10.78.71.185 + balancer: roundrobin + restrict: false + fallback_delay: 350ms diff --git a/config/example/node.env b/config/example/node.env index 6618a981a..580d343fb 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -206,3 +206,13 @@ FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824 # AUDIT section FROSTFS_AUDIT_ENABLED=true + +# MULTINET section +FROSTFS_MULTINET_ENABLED=true +FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24" +FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185" +FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24" +FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185" +FROSTFS_MULTINET_BALANCER=roundrobin +FROSTFS_MULTINET_RESTRICT=false +FROSTFS_MULTINET_FALLBACK_DELAY=350ms diff --git a/config/example/node.json b/config/example/node.json index 0d100ed80..3470d2d12 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -264,5 +264,27 @@ }, "audit": { "enabled": true + }, + "multinet": { + "enabled": true, + "subnets": [ + { + "mask": "192.168.219.174/24", + "source_ips": [ + "192.168.218.185", + "192.168.219.185" + ] + }, + { + "mask": "10.78.70.74/24", + "source_ips":[ + "10.78.70.185", + "10.78.71.185" + ] + } + ], + "balancer": "roundrobin", + "restrict": false, + "fallback_delay": "350ms" } } diff --git a/config/example/node.yaml b/config/example/node.yaml index 2a80fba18..2a963fc0f 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -240,3 +240,18 @@ runtime: audit: enabled: true + +multinet: + enabled: true + subnets: + - mask: 192.168.219.174/24 + source_ips: + - 192.168.218.185 + - 192.168.219.185 + - mask: 10.78.70.74/24 + source_ips: + - 10.78.70.185 + - 10.78.71.185 + balancer: roundrobin + restrict: false + fallback_delay: 350ms diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index c74695e2b..2b94400df 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -25,8 +25,8 @@ There are some custom types used for brevity: | `replicator` | [Replicator service configuration](#replicator-section) | | `storage` | [Storage engine configuration](#storage-section) | | `runtime` | [Runtime configuration](#runtime-section) | -| `audit` | [Audit configuration](#audit-section) | - +| `audit` | [Audit configuration](#audit-section) | +| `multinet` | [Multinet configuration](#multinet-section) | # `control` section ```yaml @@ -435,6 +435,35 @@ audit: enabled: true ``` -| Parameter | Type | Default value | Description | -|---------------------|--------|---------------|---------------------------------------------------| -| `soft_memory_limit` | `bool` | false | If `true` then audit event logs will be recorded. | +| Parameter | Type | Default value | Description | +|-----------|--------|---------------|---------------------------------------------------| +| `enabled` | `bool` | false | If `true` then audit event logs will be recorded. | + + +# `multinet` section +Contains multinet parameters. + +```yaml +multinet: + enabled: true + subnets: + - mask: 192.168.219.174/24 + source_ips: + - 192.168.218.185 + - 192.168.219.185 + - mask: 10.78.70.74/24 + source_ips: + - 10.78.70.185 + - 10.78.71.185 + balancer: roundrobin + restrict: false + fallback_delay: 350ms +``` + +| Parameter | Type | Default value | Description | +| ---------------- | ---------- | ------------- | -------------------------------------------------------------------------------------------------------------------------- | +| `enabled` | `bool` | false | If `true` then source-based routing is enabled. | +| `subnets` | `subnet` | empty | Resulting subnets. | +| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". | +| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. | +| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. | -- 2.45.3 From 20ad8e5df0656066501228c9c93d71712971d927 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 8 Oct 2024 17:25:37 +0300 Subject: [PATCH 02/11] [#1422] node: Add dialer source to config Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 30 +++++++++++++++ go.mod | 1 + go.sum | 2 + internal/logs/logs.go | 1 + internal/net/config.go | 66 +++++++++++++++++++++++++++++++++ internal/net/dial_target.go | 54 +++++++++++++++++++++++++++ internal/net/dialer.go | 35 ++++++++++++++++++ internal/net/dialer_source.go | 69 +++++++++++++++++++++++++++++++++++ 8 files changed, 258 insertions(+) create mode 100644 internal/net/config.go create mode 100644 internal/net/dial_target.go create mode 100644 internal/net/dialer.go create mode 100644 internal/net/dialer_source.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 3c7e310b4..dc1bad485 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -26,12 +26,14 @@ import ( fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet" nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object" replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" + internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid" @@ -436,6 +438,8 @@ type shared struct { metricsCollector *metrics.NodeMetrics metricsSvc *objectService.MetricCollector + + dialerSource *internalNet.DialerSource } // dynamicConfiguration stores parameters of the @@ -760,6 +764,9 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path()) fatalOnErr(err) + ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg)) + fatalOnErr(err) + cacheOpts := cache.ClientCacheOpts{ DialTimeout: apiclientconfig.DialTimeout(appCfg), StreamTimeout: apiclientconfig.StreamTimeout(appCfg), @@ -778,9 +785,27 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt putClientCache: cache.NewSDKClientCache(cacheOpts), persistate: persistate, metricsCollector: metrics.NewNodeMetrics(), + dialerSource: ds, } } +func internalNetConfig(appCfg *config.Config) internalNet.Config { + result := internalNet.Config{ + Enabled: multinet.Enabled(appCfg), + Balancer: multinet.Balancer(appCfg), + Restrict: multinet.Restrict(appCfg), + FallbackDelay: multinet.FallbackDelay(appCfg), + } + sn := multinet.Subnets(appCfg) + for _, s := range sn { + result.Subnets = append(result.Subnets, internalNet.Subnet{ + Prefix: s.Mask, + SourceIPs: s.SourceIPs, + }) + } + return result +} + func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap { netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) @@ -1336,6 +1361,11 @@ func (c *cfg) reloadConfig(ctx context.Context) { } } + if err := c.dialerSource.Update(internalNetConfig(c.appCfg)); err != nil { + c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err)) + return + } + c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) } diff --git a/go.mod b/go.mod index 1468c12b2..a84d3122a 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509 git.frostfs.info/TrueCloudLab/hrw v1.2.1 + git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 diff --git a/go.sum b/go.sum index 5ce81807a..43d53aa40 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509/go.mod h1:jmb7yxzZota9jWbC10p+7YR+6wwJPBj7J/Fl5VDkXys= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= +git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8= +git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI= git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 h1:LK3mCkNZkY48eBA9jnk1N0eQZLsZhOG+XYw4EBoKUjM= git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ= diff --git a/internal/logs/logs.go b/internal/logs/logs.go index b4bc31b0c..0e9d58f32 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -523,4 +523,5 @@ const ( WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty" BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file" WritecacheCantGetObject = "can't get an object from fstree" + FailedToUpdateMultinetConfiguration = "failed to update multinet configuration" ) diff --git a/internal/net/config.go b/internal/net/config.go new file mode 100644 index 000000000..10450db23 --- /dev/null +++ b/internal/net/config.go @@ -0,0 +1,66 @@ +package net + +import ( + "errors" + "fmt" + "net/netip" + "slices" + "time" + + "git.frostfs.info/TrueCloudLab/multinet" +) + +var errEmptySourceIPList = errors.New("empty source IP list") + +type Subnet struct { + Prefix string + SourceIPs []string +} + +type Config struct { + Enabled bool + Subnets []Subnet + Balancer string + Restrict bool + FallbackDelay time.Duration +} + +func (c Config) toMultinetConfig() (multinet.Config, error) { + var subnets []multinet.Subnet + for _, s := range c.Subnets { + var ms multinet.Subnet + p, err := netip.ParsePrefix(s.Prefix) + if err != nil { + return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err) + } + ms.Prefix = p + for _, ip := range s.SourceIPs { + addr, err := netip.ParseAddr(ip) + if err != nil { + return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err) + } + ms.SourceIPs = append(ms.SourceIPs, addr) + } + if len(ms.SourceIPs) == 0 { + return multinet.Config{}, errEmptySourceIPList + } + subnets = append(subnets, ms) + } + return multinet.Config{ + Subnets: subnets, + Balancer: multinet.BalancerType(c.Balancer), + Restrict: c.Restrict, + FallbackDelay: c.FallbackDelay, + Dialer: newDefaulDialer(), + }, nil +} + +func (c Config) equals(other Config) bool { + return c.Enabled == other.Enabled && + slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool { + return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs) + }) && + c.Balancer == other.Balancer && + c.Restrict == other.Restrict && + c.FallbackDelay == other.FallbackDelay +} diff --git a/internal/net/dial_target.go b/internal/net/dial_target.go new file mode 100644 index 000000000..6265f1860 --- /dev/null +++ b/internal/net/dial_target.go @@ -0,0 +1,54 @@ +// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go + +/* + * + * Copyright 2014 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package net + +import ( + "net/url" + "strings" +) + +// parseDialTarget returns the network and address to pass to dialer. +func parseDialTarget(target string) (string, string) { + net := "tcp" + m1 := strings.Index(target, ":") + m2 := strings.Index(target, ":/") + // handle unix:addr which will fail with url.Parse + if m1 >= 0 && m2 < 0 { + if n := target[0:m1]; n == "unix" { + return n, target[m1+1:] + } + } + if m2 >= 0 { + t, err := url.Parse(target) + if err != nil { + return net, target + } + scheme := t.Scheme + addr := t.Path + if scheme == "unix" { + if addr == "" { + addr = t.Host + } + return scheme, addr + } + } + return net, target +} diff --git a/internal/net/dialer.go b/internal/net/dialer.go new file mode 100644 index 000000000..4537490f6 --- /dev/null +++ b/internal/net/dialer.go @@ -0,0 +1,35 @@ +package net + +import ( + "context" + "net" + "syscall" + "time" + + "golang.org/x/sys/unix" +) + +type Dialer interface { + DialContext(ctx context.Context, network, address string) (net.Conn, error) +} + +func newDefaulDialer() net.Dialer { + // From `grpc.WithContextDialer` comment: + // + // Note: All supported releases of Go (as of December 2023) override the OS + // defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive + // with OS defaults for keepalive time and interval, use a net.Dialer that sets + // the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket + // option to true from the Control field. For a concrete example of how to do + // this, see internal.NetDialerWithTCPKeepalive(). + // + // https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432 + return net.Dialer{ + KeepAlive: time.Duration(-1), + Control: func(_, _ string, c syscall.RawConn) error { + return c.Control(func(fd uintptr) { + unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) + }) + }, + } +} diff --git a/internal/net/dialer_source.go b/internal/net/dialer_source.go new file mode 100644 index 000000000..e6a142a08 --- /dev/null +++ b/internal/net/dialer_source.go @@ -0,0 +1,69 @@ +package net + +import ( + "context" + "net" + "sync" + + "git.frostfs.info/TrueCloudLab/multinet" +) + +type DialerSource struct { + guard sync.RWMutex + + c Config + + md multinet.Dialer +} + +func NewDialerSource(c Config) (*DialerSource, error) { + result := &DialerSource{} + if err := result.build(c); err != nil { + return nil, err + } + return result, nil +} + +func (s *DialerSource) build(c Config) error { + if c.Enabled { + mc, err := c.toMultinetConfig() + if err != nil { + return err + } + md, err := multinet.NewDialer(mc) + if err != nil { + return err + } + s.md = md + s.c = c + return nil + } + s.md = nil + s.c = c + return nil +} + +// GrpcContextDialer returns grpc.WithContextDialer func. +// Returns nil if multinet disabled. +func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) { + s.guard.RLock() + defer s.guard.RUnlock() + + if s.c.Enabled { + return func(ctx context.Context, address string) (net.Conn, error) { + network, address := parseDialTarget(address) + return s.md.DialContext(ctx, network, address) + } + } + return nil +} + +func (s *DialerSource) Update(c Config) error { + s.guard.Lock() + defer s.guard.Unlock() + + if s.c.equals(c) { + return nil + } + return s.build(c) +} -- 2.45.3 From 41dde74243fea359f433c94d5e18e7a3f1b64969 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 9 Oct 2024 11:11:44 +0300 Subject: [PATCH 03/11] [#1422] node: Use dialer source for SDK cache Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 1 + internal/net/dialer.go | 6 +++++- pkg/network/cache/client.go | 2 ++ pkg/network/cache/multi.go | 25 ++++++++++++++----------- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index dc1bad485..d44597857 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -773,6 +773,7 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt Key: &key.PrivateKey, AllowExternal: apiclientconfig.AllowExternal(appCfg), ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg), + DialerSource: ds, } return shared{ diff --git a/internal/net/dialer.go b/internal/net/dialer.go index 4537490f6..daf0f815f 100644 --- a/internal/net/dialer.go +++ b/internal/net/dialer.go @@ -13,6 +13,10 @@ type Dialer interface { DialContext(ctx context.Context, network, address string) (net.Conn, error) } +func DialContextTCP(ctx context.Context, address string, d Dialer) (net.Conn, error) { + return d.DialContext(ctx, "tcp", address) +} + func newDefaulDialer() net.Dialer { // From `grpc.WithContextDialer` comment: // @@ -28,7 +32,7 @@ func newDefaulDialer() net.Dialer { KeepAlive: time.Duration(-1), Control: func(_, _ string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { - unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) + _ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) }) }, } diff --git a/pkg/network/cache/client.go b/pkg/network/cache/client.go index 371d3c76f..63ae0bfdb 100644 --- a/pkg/network/cache/client.go +++ b/pkg/network/cache/client.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" ) @@ -25,6 +26,7 @@ type ( Key *ecdsa.PrivateKey ResponseCallback func(client.ResponseMetaInfo) error AllowExternal bool + DialerSource *net.DialerSource } ) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index b83cbb217..e936ead65 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -60,18 +60,21 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address prmInit.Key = *x.opts.Key } + grpcOpts := []grpc.DialOption{ + grpc.WithChainUnaryInterceptor( + metrics.NewUnaryClientInterceptor(), + tracing.NewUnaryClientInteceptor(), + ), + grpc.WithChainStreamInterceptor( + metrics.NewStreamClientInterceptor(), + tracing.NewStreamClientInterceptor(), + ), + grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()), + } + prmDial := client.PrmDial{ - Endpoint: addr.URIAddr(), - GRPCDialOptions: []grpc.DialOption{ - grpc.WithChainUnaryInterceptor( - metrics.NewUnaryClientInterceptor(), - tracing.NewUnaryClientInteceptor(), - ), - grpc.WithChainStreamInterceptor( - metrics.NewStreamClientInterceptor(), - tracing.NewStreamClientInterceptor(), - ), - }, + Endpoint: addr.URIAddr(), + GRPCDialOptions: grpcOpts, } if x.opts.DialTimeout > 0 { prmDial.DialTimeout = x.opts.DialTimeout -- 2.45.3 From e6bed7be0ef9068b294aa180b6971999229ebd9b Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 9 Oct 2024 11:18:24 +0300 Subject: [PATCH 04/11] [#1422] tree: Use dialer source for tree service connections Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/tree.go | 1 + pkg/services/tree/cache.go | 6 +++++- pkg/services/tree/options.go | 8 ++++++++ pkg/services/tree/service.go | 2 +- 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/cmd/frostfs-node/tree.go b/cmd/frostfs-node/tree.go index 192f08471..f188e2fbc 100644 --- a/cmd/frostfs-node/tree.go +++ b/cmd/frostfs-node/tree.go @@ -67,6 +67,7 @@ func initTreeService(c *cfg) { tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()), tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()), tree.WithNetmapState(c.cfgNetmap.state), + tree.WithDialerSource(c.dialerSource), ) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { diff --git a/pkg/services/tree/cache.go b/pkg/services/tree/cache.go index 38501b852..e490cb855 100644 --- a/pkg/services/tree/cache.go +++ b/pkg/services/tree/cache.go @@ -8,6 +8,7 @@ import ( "sync" "time" + internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc" tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" @@ -21,6 +22,7 @@ type clientCache struct { sync.Mutex simplelru.LRU[string, cacheItem] key *ecdsa.PrivateKey + ds *internalNet.DialerSource } type cacheItem struct { @@ -36,7 +38,7 @@ const ( var errRecentlyFailed = errors.New("client has recently failed") -func (c *clientCache) init(pk *ecdsa.PrivateKey) { +func (c *clientCache) init(pk *ecdsa.PrivateKey, ds *internalNet.DialerSource) { l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) { if conn := value.cc; conn != nil { _ = conn.Close() @@ -44,6 +46,7 @@ func (c *clientCache) init(pk *ecdsa.PrivateKey) { }) c.LRU = *l c.key = pk + c.ds = ds } func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) { @@ -99,6 +102,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (* metrics.NewStreamClientInterceptor(), tracing.NewStreamClientInterceptor(), ), + grpc.WithContextDialer(c.ds.GrpcContextDialer()), } if !netAddr.IsTLSEnabled() { diff --git a/pkg/services/tree/options.go b/pkg/services/tree/options.go index 1db5607f6..1633ae557 100644 --- a/pkg/services/tree/options.go +++ b/pkg/services/tree/options.go @@ -4,6 +4,7 @@ import ( "crypto/ecdsa" "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" @@ -45,6 +46,7 @@ type cfg struct { morphChainStorage policyengine.MorphRuleChainStorageReader metrics MetricsRegister + ds *net.DialerSource } // Option represents configuration option for a tree service. @@ -161,3 +163,9 @@ func WithNetmapState(state netmap.State) Option { c.state = state } } + +func WithDialerSource(ds *net.DialerSource) Option { + return func(c *cfg) { + c.ds = ds + } +} diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 60bb1a6ad..2cb2af294 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -65,7 +65,7 @@ func New(opts ...Option) *Service { s.log = &logger.Logger{Logger: zap.NewNop()} } - s.cache.init(s.key) + s.cache.init(s.key, s.ds) s.closeCh = make(chan struct{}) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateLocalCh = make(chan applyOp) -- 2.45.3 From f3a745145366ee2a8a297eaaf0de814be6a2e8ee Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 9 Oct 2024 11:34:36 +0300 Subject: [PATCH 05/11] [#1422] morph: Drop single client as not used Signed-off-by: Dmitrii Stepanov --- pkg/morph/client/constructor.go | 59 +++++++++------------------------ 1 file changed, 15 insertions(+), 44 deletions(-) diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 78cb3e82f..60b5b4b97 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -41,8 +41,6 @@ type cfg struct { endpoints []Endpoint - singleCli *rpcclient.WSClient // neo-go client for single client mode - inactiveModeCb Callback switchInterval time.Duration @@ -124,40 +122,24 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er var err error var act *actor.Actor - if cfg.singleCli != nil { - // return client in single RPC node mode that uses - // predefined WS client - // - // in case of the closing web socket connection: - // if extra endpoints were provided via options, - // they will be used in switch process, otherwise - // inactive mode will be enabled - cli.client = cfg.singleCli - - act, err = newActor(cfg.singleCli, acc, *cfg) + var endpoint Endpoint + for cli.endpoints.curr, endpoint = range cli.endpoints.list { + cli.client, act, err = cli.newCli(ctx, endpoint) if err != nil { - return nil, fmt.Errorf("could not create RPC actor: %w", err) - } - } else { - var endpoint Endpoint - for cli.endpoints.curr, endpoint = range cli.endpoints.list { - cli.client, act, err = cli.newCli(ctx, endpoint) - if err != nil { - cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint, - zap.Error(err), zap.String("endpoint", endpoint.Address)) - } else { - cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint, - zap.String("endpoint", endpoint.Address)) - if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 { - cli.switchIsActive.Store(true) - go cli.switchToMostPrioritized(ctx) - } - break + cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint, + zap.Error(err), zap.String("endpoint", endpoint.Address)) + } else { + cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint, + zap.String("endpoint", endpoint.Address)) + if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 { + cli.switchIsActive.Store(true) + go cli.switchToMostPrioritized(ctx) } + break } - if cli.client == nil { - return nil, ErrNoHealthyEndpoint - } + } + if cli.client == nil { + return nil, ErrNoHealthyEndpoint } cli.setActor(act) @@ -285,17 +267,6 @@ func WithEndpoints(endpoints ...Endpoint) Option { } } -// WithSingleClient returns a client constructor option -// that specifies single neo-go client and forces Client -// to use it for requests. -// -// Passed client must already be initialized. -func WithSingleClient(cli *rpcclient.WSClient) Option { - return func(c *cfg) { - c.singleCli = cli - } -} - // WithConnLostCallback return a client constructor option // that specifies a callback that is called when Client // unsuccessfully tried to connect to all the specified -- 2.45.3 From 9c733bd1b027bbc740fa087da7c1a3784b6d6b85 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 9 Oct 2024 11:48:16 +0300 Subject: [PATCH 06/11] [#1422] mod: Bump neoneo-go version Signed-off-by: Dmitrii Stepanov --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a84d3122a..aefe2889a 100644 --- a/go.mod +++ b/go.mod @@ -133,4 +133,4 @@ require ( rsc.io/tmplfunc v0.0.3 // indirect ) -replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 +replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 diff --git a/go.sum b/go.sum index 43d53aa40..4d44079d4 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8l git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8= git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI= -git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 h1:LK3mCkNZkY48eBA9jnk1N0eQZLsZhOG+XYw4EBoKUjM= -git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg= +git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 h1:gPaqGsk6gSWQyNVjaStydfUz6Z/loHc9XyvGrJ5qSPY= +git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA= -- 2.45.3 From 6ecf74c3d336a2df078acd5c84c2de9d8b4870bf Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 9 Oct 2024 11:48:59 +0300 Subject: [PATCH 07/11] [#1422] morph: Add dialer source support Signed-off-by: Dmitrii Stepanov --- internal/net/dialer_source.go | 14 ++++++++++++++ pkg/morph/client/constructor.go | 15 +++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/internal/net/dialer_source.go b/internal/net/dialer_source.go index e6a142a08..3d94dedc7 100644 --- a/internal/net/dialer_source.go +++ b/internal/net/dialer_source.go @@ -58,6 +58,20 @@ func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Co return nil } +// NetContextDialer returns net.DialContext dial function. +// Returns nil if multinet disabled. +func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) { + s.guard.RLock() + defer s.guard.RUnlock() + + if s.c.Enabled { + return func(ctx context.Context, network, address string) (net.Conn, error) { + return s.md.DialContext(ctx, network, address) + } + } + return nil +} + func (s *DialerSource) Update(c Config) error { s.guard.Lock() defer s.guard.Unlock() diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 60b5b4b97..2313222f0 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -4,10 +4,12 @@ import ( "context" "errors" "fmt" + "net" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" + internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net" morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" lru "github.com/hashicorp/golang-lru/v2" @@ -46,6 +48,8 @@ type cfg struct { switchInterval time.Duration morphCacheMetrics metrics.MorphCacheMetrics + + dialerSource *internalNet.DialerSource } const ( @@ -153,10 +157,15 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl if err != nil { return nil, nil, fmt.Errorf("read mtls certificates: %w", err) } + var netDialContext func(ctx context.Context, network, addr string) (net.Conn, error) + if c.cfg.dialerSource != nil { // TODO fix after IR + netDialContext = c.cfg.dialerSource.NetContextDialer() + } cli, err := rpcclient.NewWS(ctx, endpoint.Address, rpcclient.WSOptions{ Options: rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, TLSClientConfig: cfg, + NetDialContext: netDialContext, }, }) if err != nil { @@ -291,3 +300,9 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option { c.morphCacheMetrics = morphCacheMetrics } } + +func WithDialerSource(ds *internalNet.DialerSource) Option { + return func(c *cfg) { + c.dialerSource = ds + } +} -- 2.45.3 From 480944d397ca32fc4bef92622012305121b8293a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 9 Oct 2024 11:49:23 +0300 Subject: [PATCH 08/11] [#1422] node: Use dialer source for morph Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/morph.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index 1bfcb8ac9..f93f233eb 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -48,6 +48,7 @@ func initMorphComponents(ctx context.Context, c *cfg) { }), client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)), client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()), + client.WithDialerSource(c.dialerSource), ) if err != nil { c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient, -- 2.45.3 From 110f2fd9e363643b9a73db527d0adcbcccc7ee7e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 14 Oct 2024 13:51:21 +0300 Subject: [PATCH 09/11] [#1422] ir: Add dialer source Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-ir/defaults.go | 10 ++++++++++ pkg/innerring/innerring.go | 29 +++++++++++++++++++++++++++++ pkg/morph/client/constructor.go | 7 +------ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/cmd/frostfs-ir/defaults.go b/cmd/frostfs-ir/defaults.go index 899918d22..9b775252f 100644 --- a/cmd/frostfs-ir/defaults.go +++ b/cmd/frostfs-ir/defaults.go @@ -48,6 +48,8 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("node.kludge_compatibility_mode", false) cfg.SetDefault("audit.enabled", false) + + setMultinetDefaults(cfg) } func setControlDefaults(cfg *viper.Viper) { @@ -131,3 +133,11 @@ func setMorphDefaults(cfg *viper.Viper) { cfg.SetDefault("morph.validators", []string{}) cfg.SetDefault("morph.switch_interval", 2*time.Minute) } + +func setMultinetDefaults(cfg *viper.Viper) { + cfg.SetDefault("multinet.enabled", false) + cfg.SetDefault("multinet.balancer", "") + cfg.SetDefault("multinet.restrict", false) + cfg.SetDefault("multinet.fallback_delay", "0s") + cfg.SetDefault("multinet.subnets", "") +} diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 53a07e36c..a4a52edec 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -9,6 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" + internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" @@ -486,6 +487,12 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) } + nc := parseMultinetConfig(p.cfg) + ds, err := internalNet.NewDialerSource(nc) + if err != nil { + return nil, fmt.Errorf("dialer source: %w", err) + } + return client.New( ctx, p.key, @@ -498,6 +505,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c }), client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")), client.WithMorphCacheMetrics(p.morphCacheMetric), + client.WithDialerSource(ds), ) } @@ -542,6 +550,27 @@ func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) { return extraWallets, nil } +func parseMultinetConfig(cfg *viper.Viper) internalNet.Config { + nc := internalNet.Config{ + Enabled: cfg.GetBool("multinet.enabled"), + Balancer: cfg.GetString("multinet.balancer"), + Restrict: cfg.GetBool("multinet.restrict"), + FallbackDelay: cfg.GetDuration("multinet.fallback_delay"), + } + for i := 0; ; i++ { + mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i)) + if mask == "" { + break + } + sourceIPs := cfg.GetStringSlice(fmt.Sprintf("multinet.subnets.%d.source_ips", i)) + nc.Subnets = append(nc.Subnets, internalNet.Subnet{ + Prefix: mask, + SourceIPs: sourceIPs, + }) + } + return nc +} + func (s *Server) initConfigFromBlockchain() error { // get current epoch epoch, err := s.netmapClient.Epoch() diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 2313222f0..a8efa76e7 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -157,15 +156,11 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl if err != nil { return nil, nil, fmt.Errorf("read mtls certificates: %w", err) } - var netDialContext func(ctx context.Context, network, addr string) (net.Conn, error) - if c.cfg.dialerSource != nil { // TODO fix after IR - netDialContext = c.cfg.dialerSource.NetContextDialer() - } cli, err := rpcclient.NewWS(ctx, endpoint.Address, rpcclient.WSOptions{ Options: rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, TLSClientConfig: cfg, - NetDialContext: netDialContext, + NetDialContext: c.cfg.dialerSource.NetContextDialer(), }, }) if err != nil { -- 2.45.3 From f6fe5da02015474e09f53a8669292e0fa31647d2 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 14 Oct 2024 14:22:27 +0300 Subject: [PATCH 10/11] [#1422] morph: Resolve funlen linter Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/morph.go | 84 ++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index f93f233eb..197e50371 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -29,6 +29,50 @@ const ( ) func initMorphComponents(ctx context.Context, c *cfg) { + initMorphClient(ctx, c) + + lookupScriptHashesInNNS(c) // smart contract auto negotiation + + if c.cfgMorph.notaryEnabled { + err := c.cfgMorph.client.EnableNotarySupport( + client.WithProxyContract( + c.cfgMorph.proxyScriptHash, + ), + ) + fatalOnErr(err) + } + + c.log.Info(logs.FrostFSNodeNotarySupport, + zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled), + ) + + wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary()) + fatalOnErr(err) + + var netmapSource netmap.Source + + c.cfgMorph.containerCacheSize = morphconfig.ContainerCacheSize(c.appCfg) + c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg) + + if c.cfgMorph.cacheTTL == 0 { + msPerBlock, err := c.cfgMorph.client.MsPerBlock() + fatalOnErr(err) + c.cfgMorph.cacheTTL = time.Duration(msPerBlock) * time.Millisecond + c.log.Debug(logs.FrostFSNodeMorphcacheTTLFetchedFromNetwork, zap.Duration("value", c.cfgMorph.cacheTTL)) + } + + if c.cfgMorph.cacheTTL < 0 { + netmapSource = wrap + } else { + // use RPC node as source of netmap (with caching) + netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap) + } + + c.netMapSource = netmapSource + c.cfgNetmap.wrapper = wrap +} + +func initMorphClient(ctx context.Context, c *cfg) { addresses := morphconfig.RPCEndpoint(c.appCfg) // Morph client stable-sorts endpoints by priority. Shuffle here to randomize @@ -70,46 +114,6 @@ func initMorphComponents(ctx context.Context, c *cfg) { c.cfgMorph.client = cli c.cfgMorph.notaryEnabled = cli.ProbeNotary() - - lookupScriptHashesInNNS(c) // smart contract auto negotiation - - if c.cfgMorph.notaryEnabled { - err = c.cfgMorph.client.EnableNotarySupport( - client.WithProxyContract( - c.cfgMorph.proxyScriptHash, - ), - ) - fatalOnErr(err) - } - - c.log.Info(logs.FrostFSNodeNotarySupport, - zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled), - ) - - wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary()) - fatalOnErr(err) - - var netmapSource netmap.Source - - c.cfgMorph.containerCacheSize = morphconfig.ContainerCacheSize(c.appCfg) - c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg) - - if c.cfgMorph.cacheTTL == 0 { - msPerBlock, err := c.cfgMorph.client.MsPerBlock() - fatalOnErr(err) - c.cfgMorph.cacheTTL = time.Duration(msPerBlock) * time.Millisecond - c.log.Debug(logs.FrostFSNodeMorphcacheTTLFetchedFromNetwork, zap.Duration("value", c.cfgMorph.cacheTTL)) - } - - if c.cfgMorph.cacheTTL < 0 { - netmapSource = wrap - } else { - // use RPC node as source of netmap (with caching) - netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap) - } - - c.netMapSource = netmapSource - c.cfgNetmap.wrapper = wrap } func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) { -- 2.45.3 From 5b9aa3fcf343bfd14cbc6fff24bcac5ae537fa0e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 15 Oct 2024 14:46:00 +0300 Subject: [PATCH 11/11] [#1422] multinet: Add metrics Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 11 +++++++---- internal/metrics/consts.go | 2 ++ internal/metrics/innerring.go | 6 ++++++ internal/metrics/multinet.go | 35 +++++++++++++++++++++++++++++++++ internal/metrics/node.go | 6 ++++++ internal/net/config.go | 3 +++ internal/net/event_handler.go | 29 +++++++++++++++++++++++++++ pkg/innerring/initialization.go | 1 + pkg/innerring/innerring.go | 6 ++++-- 9 files changed, 93 insertions(+), 6 deletions(-) create mode 100644 internal/metrics/multinet.go create mode 100644 internal/net/event_handler.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d44597857..9d2b77210 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -764,7 +764,9 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path()) fatalOnErr(err) - ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg)) + nodeMetrics := metrics.NewNodeMetrics() + + ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics())) fatalOnErr(err) cacheOpts := cache.ClientCacheOpts{ @@ -785,17 +787,18 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt bgClientCache: cache.NewSDKClientCache(cacheOpts), putClientCache: cache.NewSDKClientCache(cacheOpts), persistate: persistate, - metricsCollector: metrics.NewNodeMetrics(), + metricsCollector: nodeMetrics, dialerSource: ds, } } -func internalNetConfig(appCfg *config.Config) internalNet.Config { +func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config { result := internalNet.Config{ Enabled: multinet.Enabled(appCfg), Balancer: multinet.Balancer(appCfg), Restrict: multinet.Restrict(appCfg), FallbackDelay: multinet.FallbackDelay(appCfg), + Metrics: m, } sn := multinet.Subnets(appCfg) for _, s := range sn { @@ -1362,7 +1365,7 @@ func (c *cfg) reloadConfig(ctx context.Context) { } } - if err := c.dialerSource.Update(internalNetConfig(c.appCfg)); err != nil { + if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil { c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err)) return } diff --git a/internal/metrics/consts.go b/internal/metrics/consts.go index 3aa51c0f0..cb165de69 100644 --- a/internal/metrics/consts.go +++ b/internal/metrics/consts.go @@ -22,6 +22,7 @@ const ( grpcServerSubsystem = "grpc_server" policerSubsystem = "policer" commonCacheSubsystem = "common_cache" + multinetSubsystem = "multinet" successLabel = "success" shardIDLabel = "shard_id" @@ -41,6 +42,7 @@ const ( endpointLabel = "endpoint" hitLabel = "hit" cacheLabel = "cache" + sourceIPLabel = "source_ip" readWriteMode = "READ_WRITE" readOnlyMode = "READ_ONLY" diff --git a/internal/metrics/innerring.go b/internal/metrics/innerring.go index f3f529d05..d0cb8131f 100644 --- a/internal/metrics/innerring.go +++ b/internal/metrics/innerring.go @@ -17,6 +17,7 @@ type InnerRingServiceMetrics struct { eventDuration *prometheus.HistogramVec morphCacheMetrics *morphCacheMetrics logMetrics logger.LogMetrics + multinet *multinetMetrics // nolint: unused appInfo *ApplicationInfo } @@ -51,6 +52,7 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics { morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace), appInfo: NewApplicationInfo(misc.Version), logMetrics: logger.NewLogMetrics(innerRingNamespace), + multinet: newMultinetMetrics(innerRingNamespace), } } @@ -78,3 +80,7 @@ func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics { func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics { return m.logMetrics } + +func (m *InnerRingServiceMetrics) Multinet() MultinetMetrics { + return m.multinet +} diff --git a/internal/metrics/multinet.go b/internal/metrics/multinet.go new file mode 100644 index 000000000..6b1f99d46 --- /dev/null +++ b/internal/metrics/multinet.go @@ -0,0 +1,35 @@ +package metrics + +import ( + "strconv" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type multinetMetrics struct { + dials *prometheus.GaugeVec +} + +type MultinetMetrics interface { + Dial(sourceIP string, success bool) +} + +func newMultinetMetrics(ns string) *multinetMetrics { + return &multinetMetrics{ + dials: metrics.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: ns, + Subsystem: multinetSubsystem, + Name: "dial_count", + Help: "Dials count performed by multinet", + }, []string{sourceIPLabel, successLabel}), + } +} + +func (m *multinetMetrics) Dial(sourceIP string, success bool) { + m.dials.With(prometheus.Labels{ + sourceIPLabel: sourceIP, + successLabel: strconv.FormatBool(success), + }).Inc() +} diff --git a/internal/metrics/node.go b/internal/metrics/node.go index 711387875..4ea3c7c24 100644 --- a/internal/metrics/node.go +++ b/internal/metrics/node.go @@ -25,6 +25,7 @@ type NodeMetrics struct { morphClient *morphClientMetrics morphCache *morphCacheMetrics log logger.LogMetrics + multinet *multinetMetrics // nolint: unused appInfo *ApplicationInfo } @@ -53,6 +54,7 @@ func NewNodeMetrics() *NodeMetrics { morphCache: newMorphCacheMetrics(namespace), log: logger.NewLogMetrics(namespace), appInfo: NewApplicationInfo(misc.Version), + multinet: newMultinetMetrics(namespace), } } @@ -120,3 +122,7 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics { func (m *NodeMetrics) LogMetrics() logger.LogMetrics { return m.log } + +func (m *NodeMetrics) MultinetMetrics() MultinetMetrics { + return m.multinet +} diff --git a/internal/net/config.go b/internal/net/config.go index 10450db23..b84ac3b35 100644 --- a/internal/net/config.go +++ b/internal/net/config.go @@ -7,6 +7,7 @@ import ( "slices" "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/multinet" ) @@ -23,6 +24,7 @@ type Config struct { Balancer string Restrict bool FallbackDelay time.Duration + Metrics metrics.MultinetMetrics } func (c Config) toMultinetConfig() (multinet.Config, error) { @@ -52,6 +54,7 @@ func (c Config) toMultinetConfig() (multinet.Config, error) { Restrict: c.Restrict, FallbackDelay: c.FallbackDelay, Dialer: newDefaulDialer(), + EventHandler: newEventHandler(c.Metrics), }, nil } diff --git a/internal/net/event_handler.go b/internal/net/event_handler.go new file mode 100644 index 000000000..024e5cf7c --- /dev/null +++ b/internal/net/event_handler.go @@ -0,0 +1,29 @@ +package net + +import ( + "net" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" + "git.frostfs.info/TrueCloudLab/multinet" +) + +var _ multinet.EventHandler = (*metricsEventHandler)(nil) + +type metricsEventHandler struct { + m metrics.MultinetMetrics +} + +func (m *metricsEventHandler) DialPerformed(sourceIP net.Addr, _ string, _ string, err error) { + sourceIPString := "undefined" + if sourceIP != nil { + sourceIPString = sourceIP.Network() + "://" + sourceIP.String() + } + m.m.Dial(sourceIPString, err == nil) +} + +func newEventHandler(m metrics.MultinetMetrics) multinet.EventHandler { + if m == nil { + return nil + } + return &metricsEventHandler{m: m} +} diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go index c4aaeda56..cb0654b6e 100644 --- a/pkg/innerring/initialization.go +++ b/pkg/innerring/initialization.go @@ -463,6 +463,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- name: morphPrefix, from: fromSideChainBlock, morphCacheMetric: s.irMetrics.MorphCacheMetrics(), + multinetMetrics: s.irMetrics.Multinet(), } // create morph client diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index a4a52edec..b94312645 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -117,6 +117,7 @@ type ( sgn *transaction.Signer from uint32 // block height morphCacheMetric metrics.MorphCacheMetrics + multinetMetrics metrics.MultinetMetrics } ) @@ -487,7 +488,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) } - nc := parseMultinetConfig(p.cfg) + nc := parseMultinetConfig(p.cfg, p.multinetMetrics) ds, err := internalNet.NewDialerSource(nc) if err != nil { return nil, fmt.Errorf("dialer source: %w", err) @@ -550,12 +551,13 @@ func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) { return extraWallets, nil } -func parseMultinetConfig(cfg *viper.Viper) internalNet.Config { +func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNet.Config { nc := internalNet.Config{ Enabled: cfg.GetBool("multinet.enabled"), Balancer: cfg.GetString("multinet.balancer"), Restrict: cfg.GetBool("multinet.restrict"), FallbackDelay: cfg.GetDuration("multinet.fallback_delay"), + Metrics: m, } for i := 0; ; i++ { mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i)) -- 2.45.3