Source-based routing support #1422
32 changed files with 724 additions and 105 deletions
|
@ -48,6 +48,8 @@ func defaultConfiguration(cfg *viper.Viper) {
|
|||
cfg.SetDefault("node.kludge_compatibility_mode", false)
|
||||
|
||||
cfg.SetDefault("audit.enabled", false)
|
||||
|
||||
setMultinetDefaults(cfg)
|
||||
}
|
||||
|
||||
func setControlDefaults(cfg *viper.Viper) {
|
||||
|
@ -131,3 +133,11 @@ func setMorphDefaults(cfg *viper.Viper) {
|
|||
cfg.SetDefault("morph.validators", []string{})
|
||||
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
|
||||
}
|
||||
|
||||
func setMultinetDefaults(cfg *viper.Viper) {
|
||||
cfg.SetDefault("multinet.enabled", false)
|
||||
cfg.SetDefault("multinet.balancer", "")
|
||||
cfg.SetDefault("multinet.restrict", false)
|
||||
cfg.SetDefault("multinet.fallback_delay", "0s")
|
||||
cfg.SetDefault("multinet.subnets", "")
|
||||
}
|
||||
|
|
|
@ -26,12 +26,14 @@ import (
|
|||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet"
|
||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||
|
@ -436,6 +438,8 @@ type shared struct {
|
|||
metricsCollector *metrics.NodeMetrics
|
||||
|
||||
metricsSvc *objectService.MetricCollector
|
||||
|
||||
dialerSource *internalNet.DialerSource
|
||||
}
|
||||
|
||||
// dynamicConfiguration stores parameters of the
|
||||
|
@ -760,12 +764,18 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
|||
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
||||
fatalOnErr(err)
|
||||
|
||||
nodeMetrics := metrics.NewNodeMetrics()
|
||||
|
||||
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
|
||||
fatalOnErr(err)
|
||||
|
||||
cacheOpts := cache.ClientCacheOpts{
|
||||
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
||||
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
||||
Key: &key.PrivateKey,
|
||||
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
||||
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
|
||||
DialerSource: ds,
|
||||
}
|
||||
|
||||
return shared{
|
||||
|
@ -777,10 +787,29 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
|||
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||
persistate: persistate,
|
||||
metricsCollector: metrics.NewNodeMetrics(),
|
||||
metricsCollector: nodeMetrics,
|
||||
dialerSource: ds,
|
||||
}
|
||||
}
|
||||
|
||||
func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
|
||||
result := internalNet.Config{
|
||||
Enabled: multinet.Enabled(appCfg),
|
||||
Balancer: multinet.Balancer(appCfg),
|
||||
Restrict: multinet.Restrict(appCfg),
|
||||
FallbackDelay: multinet.FallbackDelay(appCfg),
|
||||
Metrics: m,
|
||||
}
|
||||
sn := multinet.Subnets(appCfg)
|
||||
for _, s := range sn {
|
||||
result.Subnets = append(result.Subnets, internalNet.Subnet{
|
||||
Prefix: s.Mask,
|
||||
SourceIPs: s.SourceIPs,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
|
||||
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
||||
fatalOnErr(err)
|
||||
|
@ -1336,6 +1365,11 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
|
||||
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||
}
|
||||
|
||||
|
|
62
cmd/frostfs-node/config/multinet/config.go
Normal file
62
cmd/frostfs-node/config/multinet/config.go
Normal file
|
@ -0,0 +1,62 @@
|
|||
package multinet
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
)
|
||||
|
||||
const (
|
||||
subsection = "multinet"
|
||||
|
||||
FallbackDelayDefault = 300 * time.Millisecond
|
||||
)
|
||||
|
||||
// Enabled returns the value of "enabled" config parameter from "multinet" section.
|
||||
func Enabled(c *config.Config) bool {
|
||||
return config.BoolSafe(c.Sub(subsection), "enabled")
|
||||
}
|
||||
|
||||
type Subnet struct {
|
||||
Mask string
|
||||
SourceIPs []string
|
||||
}
|
||||
|
||||
// Subnets returns the value of "subnets" config parameter from "multinet" section.
|
||||
func Subnets(c *config.Config) []Subnet {
|
||||
var result []Subnet
|
||||
sub := c.Sub(subsection).Sub("subnets")
|
||||
for i := 0; ; i++ {
|
||||
s := sub.Sub(strconv.FormatInt(int64(i), 10))
|
||||
mask := config.StringSafe(s, "mask")
|
||||
if mask == "" {
|
||||
break
|
||||
}
|
||||
sourceIPs := config.StringSliceSafe(s, "source_ips")
|
||||
result = append(result, Subnet{
|
||||
Mask: mask,
|
||||
SourceIPs: sourceIPs,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Balancer returns the value of "balancer" config parameter from "multinet" section.
|
||||
func Balancer(c *config.Config) string {
|
||||
return config.StringSafe(c.Sub(subsection), "balancer")
|
||||
}
|
||||
|
||||
// Restrict returns the value of "restrict" config parameter from "multinet" section.
|
||||
func Restrict(c *config.Config) bool {
|
||||
return config.BoolSafe(c.Sub(subsection), "restrict")
|
||||
}
|
||||
|
||||
// FallbackDelay returns the value of "fallback_delay" config parameter from "multinet" section.
|
||||
func FallbackDelay(c *config.Config) time.Duration {
|
||||
fd := config.DurationSafe(c.Sub(subsection), "fallback_delay")
|
||||
if fd != 0 { // negative value means no fallback
|
||||
return fd
|
||||
}
|
||||
return FallbackDelayDefault
|
||||
}
|
52
cmd/frostfs-node/config/multinet/config_test.go
Normal file
52
cmd/frostfs-node/config/multinet/config_test.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package multinet
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMultinetSection(t *testing.T) {
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
empty := configtest.EmptyConfig()
|
||||
require.Equal(t, false, Enabled(empty))
|
||||
require.Equal(t, ([]Subnet)(nil), Subnets(empty))
|
||||
require.Equal(t, "", Balancer(empty))
|
||||
require.Equal(t, false, Restrict(empty))
|
||||
require.Equal(t, FallbackDelayDefault, FallbackDelay(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
require.Equal(t, true, Enabled(c))
|
||||
require.Equal(t, []Subnet{
|
||||
{
|
||||
Mask: "192.168.219.174/24",
|
||||
SourceIPs: []string{
|
||||
"192.168.218.185",
|
||||
"192.168.219.185",
|
||||
},
|
||||
},
|
||||
{
|
||||
Mask: "10.78.70.74/24",
|
||||
SourceIPs: []string{
|
||||
"10.78.70.185",
|
||||
"10.78.71.185",
|
||||
},
|
||||
},
|
||||
}, Subnets(c))
|
||||
require.Equal(t, "roundrobin", Balancer(c))
|
||||
require.Equal(t, false, Restrict(c))
|
||||
require.Equal(t, 350*time.Millisecond, FallbackDelay(c))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
}
|
|
@ -29,51 +29,12 @@ const (
|
|||
)
|
||||
|
||||
func initMorphComponents(ctx context.Context, c *cfg) {
|
||||
addresses := morphconfig.RPCEndpoint(c.appCfg)
|
||||
|
||||
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
|
||||
// order of endpoints with the same priority.
|
||||
rand.Shuffle(len(addresses), func(i, j int) {
|
||||
addresses[i], addresses[j] = addresses[j], addresses[i]
|
||||
})
|
||||
|
||||
cli, err := client.New(ctx,
|
||||
c.key,
|
||||
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
||||
client.WithLogger(c.log),
|
||||
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
|
||||
client.WithEndpoints(addresses...),
|
||||
client.WithConnLostCallback(func() {
|
||||
c.internalErr <- errors.New("morph connection has been lost")
|
||||
}),
|
||||
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
|
||||
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
|
||||
)
|
||||
if err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
|
||||
zap.Any("endpoints", addresses),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
fatalOnErr(err)
|
||||
}
|
||||
|
||||
c.onShutdown(func() {
|
||||
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
|
||||
cli.Close()
|
||||
})
|
||||
|
||||
if err := cli.SetGroupSignerScope(); err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
|
||||
}
|
||||
|
||||
c.cfgMorph.client = cli
|
||||
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
|
||||
initMorphClient(ctx, c)
|
||||
|
||||
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
||||
|
||||
if c.cfgMorph.notaryEnabled {
|
||||
err = c.cfgMorph.client.EnableNotarySupport(
|
||||
err := c.cfgMorph.client.EnableNotarySupport(
|
||||
client.WithProxyContract(
|
||||
c.cfgMorph.proxyScriptHash,
|
||||
),
|
||||
|
@ -111,6 +72,50 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
|||
c.cfgNetmap.wrapper = wrap
|
||||
}
|
||||
|
||||
func initMorphClient(ctx context.Context, c *cfg) {
|
||||
addresses := morphconfig.RPCEndpoint(c.appCfg)
|
||||
|
||||
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
|
||||
// order of endpoints with the same priority.
|
||||
rand.Shuffle(len(addresses), func(i, j int) {
|
||||
addresses[i], addresses[j] = addresses[j], addresses[i]
|
||||
})
|
||||
|
||||
cli, err := client.New(ctx,
|
||||
c.key,
|
||||
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
||||
client.WithLogger(c.log),
|
||||
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
|
||||
client.WithEndpoints(addresses...),
|
||||
client.WithConnLostCallback(func() {
|
||||
c.internalErr <- errors.New("morph connection has been lost")
|
||||
}),
|
||||
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
|
||||
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
|
||||
client.WithDialerSource(c.dialerSource),
|
||||
)
|
||||
if err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
|
||||
zap.Any("endpoints", addresses),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
fatalOnErr(err)
|
||||
}
|
||||
|
||||
c.onShutdown(func() {
|
||||
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
|
||||
cli.Close()
|
||||
})
|
||||
|
||||
if err := cli.SetGroupSignerScope(); err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
|
||||
}
|
||||
|
||||
c.cfgMorph.client = cli
|
||||
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
|
||||
}
|
||||
|
||||
func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
|
||||
// skip notary deposit in non-notary environments
|
||||
if !c.cfgMorph.notaryEnabled {
|
||||
|
|
|
@ -67,6 +67,7 @@ func initTreeService(c *cfg) {
|
|||
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
||||
tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()),
|
||||
tree.WithNetmapState(c.cfgNetmap.state),
|
||||
tree.WithDialerSource(c.dialerSource),
|
||||
)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
|
|
|
@ -80,3 +80,12 @@ FROSTFS_IR_PPROF_MUTEX_RATE=10000
|
|||
FROSTFS_IR_PROMETHEUS_ENABLED=true
|
||||
FROSTFS_IR_PROMETHEUS_ADDRESS=localhost:9090
|
||||
FROSTFS_IR_PROMETHEUS_SHUTDOWN_TIMEOUT=30s
|
||||
|
||||
FROSTFS_MULTINET_ENABLED=true
|
||||
FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24"
|
||||
FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185"
|
||||
FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24"
|
||||
FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
|
||||
FROSTFS_MULTINET_BALANCER=roundrobin
|
||||
FROSTFS_MULTINET_RESTRICT=false
|
||||
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
||||
|
|
|
@ -123,3 +123,18 @@ prometheus:
|
|||
|
||||
systemdnotify:
|
||||
enabled: true
|
||||
|
||||
multinet:
|
||||
enabled: true
|
||||
subnets:
|
||||
- mask: 192.168.219.174/24
|
||||
source_ips:
|
||||
- 192.168.218.185
|
||||
- 192.168.219.185
|
||||
- mask: 10.78.70.74/24
|
||||
source_ips:
|
||||
- 10.78.70.185
|
||||
- 10.78.71.185
|
||||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
|
|
|
@ -206,3 +206,13 @@ FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
|||
|
||||
# AUDIT section
|
||||
FROSTFS_AUDIT_ENABLED=true
|
||||
|
||||
# MULTINET section
|
||||
FROSTFS_MULTINET_ENABLED=true
|
||||
FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24"
|
||||
FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185"
|
||||
FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24"
|
||||
FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
|
||||
FROSTFS_MULTINET_BALANCER=roundrobin
|
||||
FROSTFS_MULTINET_RESTRICT=false
|
||||
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
||||
|
|
|
@ -264,5 +264,27 @@
|
|||
},
|
||||
"audit": {
|
||||
"enabled": true
|
||||
},
|
||||
"multinet": {
|
||||
"enabled": true,
|
||||
"subnets": [
|
||||
{
|
||||
"mask": "192.168.219.174/24",
|
||||
"source_ips": [
|
||||
"192.168.218.185",
|
||||
"192.168.219.185"
|
||||
]
|
||||
},
|
||||
{
|
||||
"mask": "10.78.70.74/24",
|
||||
"source_ips":[
|
||||
"10.78.70.185",
|
||||
"10.78.71.185"
|
||||
]
|
||||
}
|
||||
],
|
||||
"balancer": "roundrobin",
|
||||
"restrict": false,
|
||||
"fallback_delay": "350ms"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -240,3 +240,18 @@ runtime:
|
|||
|
||||
audit:
|
||||
enabled: true
|
||||
|
||||
multinet:
|
||||
enabled: true
|
||||
subnets:
|
||||
- mask: 192.168.219.174/24
|
||||
source_ips:
|
||||
- 192.168.218.185
|
||||
- 192.168.219.185
|
||||
- mask: 10.78.70.74/24
|
||||
source_ips:
|
||||
- 10.78.70.185
|
||||
- 10.78.71.185
|
||||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
|
|
|
@ -25,8 +25,8 @@ There are some custom types used for brevity:
|
|||
| `replicator` | [Replicator service configuration](#replicator-section) |
|
||||
| `storage` | [Storage engine configuration](#storage-section) |
|
||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||
| `audit` | [Audit configuration](#audit-section) |
|
||||
|
||||
| `audit` | [Audit configuration](#audit-section) |
|
||||
| `multinet` | [Multinet configuration](#multinet-section) |
|
||||
|
||||
# `control` section
|
||||
```yaml
|
||||
|
@ -435,6 +435,35 @@ audit:
|
|||
enabled: true
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|---------------------|--------|---------------|---------------------------------------------------|
|
||||
| `soft_memory_limit` | `bool` | false | If `true` then audit event logs will be recorded. |
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------|--------|---------------|---------------------------------------------------|
|
||||
| `enabled` | `bool` | false | If `true` then audit event logs will be recorded. |
|
||||
|
||||
|
||||
# `multinet` section
|
||||
Contains multinet parameters.
|
||||
|
||||
```yaml
|
||||
multinet:
|
||||
enabled: true
|
||||
subnets:
|
||||
- mask: 192.168.219.174/24
|
||||
source_ips:
|
||||
- 192.168.218.185
|
||||
- 192.168.219.185
|
||||
- mask: 10.78.70.74/24
|
||||
source_ips:
|
||||
- 10.78.70.185
|
||||
- 10.78.71.185
|
||||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
| ---------------- | ---------- | ------------- | -------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `enabled` | `bool` | false | If `true` then source-based routing is enabled. |
|
||||
| `subnets` | `subnet` | empty | Resulting subnets. |
|
||||
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
|
||||
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
|
||||
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |
|
||||
|
|
3
go.mod
3
go.mod
|
@ -11,6 +11,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
|
@ -132,4 +133,4 @@ require (
|
|||
rsc.io/tmplfunc v0.0.3 // indirect
|
||||
)
|
||||
|
||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928
|
||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -523,4 +523,5 @@ const (
|
|||
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty"
|
||||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||
WritecacheCantGetObject = "can't get an object from fstree"
|
||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||
)
|
||||
|
|
|
@ -22,6 +22,7 @@ const (
|
|||
grpcServerSubsystem = "grpc_server"
|
||||
policerSubsystem = "policer"
|
||||
commonCacheSubsystem = "common_cache"
|
||||
multinetSubsystem = "multinet"
|
||||
|
||||
successLabel = "success"
|
||||
shardIDLabel = "shard_id"
|
||||
|
@ -41,6 +42,7 @@ const (
|
|||
endpointLabel = "endpoint"
|
||||
hitLabel = "hit"
|
||||
cacheLabel = "cache"
|
||||
sourceIPLabel = "source_ip"
|
||||
|
||||
readWriteMode = "READ_WRITE"
|
||||
readOnlyMode = "READ_ONLY"
|
||||
|
|
|
@ -17,6 +17,7 @@ type InnerRingServiceMetrics struct {
|
|||
eventDuration *prometheus.HistogramVec
|
||||
morphCacheMetrics *morphCacheMetrics
|
||||
logMetrics logger.LogMetrics
|
||||
multinet *multinetMetrics
|
||||
// nolint: unused
|
||||
appInfo *ApplicationInfo
|
||||
}
|
||||
|
@ -51,6 +52,7 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
|
|||
morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace),
|
||||
appInfo: NewApplicationInfo(misc.Version),
|
||||
logMetrics: logger.NewLogMetrics(innerRingNamespace),
|
||||
multinet: newMultinetMetrics(innerRingNamespace),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,3 +80,7 @@ func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
|||
func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics {
|
||||
return m.logMetrics
|
||||
}
|
||||
|
||||
func (m *InnerRingServiceMetrics) Multinet() MultinetMetrics {
|
||||
return m.multinet
|
||||
}
|
||||
|
|
35
internal/metrics/multinet.go
Normal file
35
internal/metrics/multinet.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type multinetMetrics struct {
|
||||
dials *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
type MultinetMetrics interface {
|
||||
Dial(sourceIP string, success bool)
|
||||
}
|
||||
|
||||
func newMultinetMetrics(ns string) *multinetMetrics {
|
||||
return &multinetMetrics{
|
||||
dials: metrics.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Subsystem: multinetSubsystem,
|
||||
Name: "dial_count",
|
||||
Help: "Dials count performed by multinet",
|
||||
}, []string{sourceIPLabel, successLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *multinetMetrics) Dial(sourceIP string, success bool) {
|
||||
m.dials.With(prometheus.Labels{
|
||||
sourceIPLabel: sourceIP,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
}).Inc()
|
||||
}
|
|
@ -25,6 +25,7 @@ type NodeMetrics struct {
|
|||
morphClient *morphClientMetrics
|
||||
morphCache *morphCacheMetrics
|
||||
log logger.LogMetrics
|
||||
multinet *multinetMetrics
|
||||
// nolint: unused
|
||||
appInfo *ApplicationInfo
|
||||
}
|
||||
|
@ -53,6 +54,7 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
morphCache: newMorphCacheMetrics(namespace),
|
||||
log: logger.NewLogMetrics(namespace),
|
||||
appInfo: NewApplicationInfo(misc.Version),
|
||||
multinet: newMultinetMetrics(namespace),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,3 +122,7 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
|||
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
|
||||
return m.log
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
|
||||
return m.multinet
|
||||
}
|
||||
|
|
69
internal/net/config.go
Normal file
69
internal/net/config.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
var errEmptySourceIPList = errors.New("empty source IP list")
|
||||
|
||||
type Subnet struct {
|
||||
Prefix string
|
||||
SourceIPs []string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Enabled bool
|
||||
Subnets []Subnet
|
||||
Balancer string
|
||||
Restrict bool
|
||||
FallbackDelay time.Duration
|
||||
Metrics metrics.MultinetMetrics
|
||||
}
|
||||
|
||||
func (c Config) toMultinetConfig() (multinet.Config, error) {
|
||||
var subnets []multinet.Subnet
|
||||
for _, s := range c.Subnets {
|
||||
var ms multinet.Subnet
|
||||
p, err := netip.ParsePrefix(s.Prefix)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err)
|
||||
}
|
||||
ms.Prefix = p
|
||||
for _, ip := range s.SourceIPs {
|
||||
addr, err := netip.ParseAddr(ip)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err)
|
||||
}
|
||||
ms.SourceIPs = append(ms.SourceIPs, addr)
|
||||
}
|
||||
if len(ms.SourceIPs) == 0 {
|
||||
return multinet.Config{}, errEmptySourceIPList
|
||||
}
|
||||
subnets = append(subnets, ms)
|
||||
}
|
||||
return multinet.Config{
|
||||
Subnets: subnets,
|
||||
Balancer: multinet.BalancerType(c.Balancer),
|
||||
Restrict: c.Restrict,
|
||||
FallbackDelay: c.FallbackDelay,
|
||||
Dialer: newDefaulDialer(),
|
||||
EventHandler: newEventHandler(c.Metrics),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c Config) equals(other Config) bool {
|
||||
return c.Enabled == other.Enabled &&
|
||||
slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool {
|
||||
return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs)
|
||||
}) &&
|
||||
c.Balancer == other.Balancer &&
|
||||
c.Restrict == other.Restrict &&
|
||||
c.FallbackDelay == other.FallbackDelay
|
||||
}
|
54
internal/net/dial_target.go
Normal file
54
internal/net/dial_target.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2014 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package net
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// parseDialTarget returns the network and address to pass to dialer.
|
||||
func parseDialTarget(target string) (string, string) {
|
||||
net := "tcp"
|
||||
m1 := strings.Index(target, ":")
|
||||
m2 := strings.Index(target, ":/")
|
||||
// handle unix:addr which will fail with url.Parse
|
||||
if m1 >= 0 && m2 < 0 {
|
||||
if n := target[0:m1]; n == "unix" {
|
||||
return n, target[m1+1:]
|
||||
}
|
||||
}
|
||||
if m2 >= 0 {
|
||||
t, err := url.Parse(target)
|
||||
if err != nil {
|
||||
return net, target
|
||||
}
|
||||
scheme := t.Scheme
|
||||
addr := t.Path
|
||||
if scheme == "unix" {
|
||||
if addr == "" {
|
||||
addr = t.Host
|
||||
}
|
||||
return scheme, addr
|
||||
}
|
||||
}
|
||||
return net, target
|
||||
}
|
39
internal/net/dialer.go
Normal file
39
internal/net/dialer.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
type Dialer interface {
|
||||
DialContext(ctx context.Context, network, address string) (net.Conn, error)
|
||||
}
|
||||
|
||||
func DialContextTCP(ctx context.Context, address string, d Dialer) (net.Conn, error) {
|
||||
return d.DialContext(ctx, "tcp", address)
|
||||
}
|
||||
|
||||
func newDefaulDialer() net.Dialer {
|
||||
// From `grpc.WithContextDialer` comment:
|
||||
//
|
||||
// Note: All supported releases of Go (as of December 2023) override the OS
|
||||
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
|
||||
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
|
||||
// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
|
||||
// option to true from the Control field. For a concrete example of how to do
|
||||
// this, see internal.NetDialerWithTCPKeepalive().
|
||||
//
|
||||
// https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432
|
||||
return net.Dialer{
|
||||
KeepAlive: time.Duration(-1),
|
||||
Control: func(_, _ string, c syscall.RawConn) error {
|
||||
return c.Control(func(fd uintptr) {
|
||||
_ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
83
internal/net/dialer_source.go
Normal file
83
internal/net/dialer_source.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
type DialerSource struct {
|
||||
guard sync.RWMutex
|
||||
|
||||
c Config
|
||||
|
||||
md multinet.Dialer
|
||||
}
|
||||
|
||||
func NewDialerSource(c Config) (*DialerSource, error) {
|
||||
result := &DialerSource{}
|
||||
if err := result.build(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) build(c Config) error {
|
||||
if c.Enabled {
|
||||
mc, err := c.toMultinetConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
md, err := multinet.NewDialer(mc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.md = md
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
s.md = nil
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
|
||||
// GrpcContextDialer returns grpc.WithContextDialer func.
|
||||
// Returns nil if multinet disabled.
|
||||
func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) {
|
||||
s.guard.RLock()
|
||||
defer s.guard.RUnlock()
|
||||
|
||||
if s.c.Enabled {
|
||||
return func(ctx context.Context, address string) (net.Conn, error) {
|
||||
network, address := parseDialTarget(address)
|
||||
return s.md.DialContext(ctx, network, address)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NetContextDialer returns net.DialContext dial function.
|
||||
// Returns nil if multinet disabled.
|
||||
func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
|
||||
s.guard.RLock()
|
||||
defer s.guard.RUnlock()
|
||||
|
||||
if s.c.Enabled {
|
||||
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return s.md.DialContext(ctx, network, address)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) Update(c Config) error {
|
||||
s.guard.Lock()
|
||||
defer s.guard.Unlock()
|
||||
|
||||
if s.c.equals(c) {
|
||||
return nil
|
||||
}
|
||||
return s.build(c)
|
||||
}
|
29
internal/net/event_handler.go
Normal file
29
internal/net/event_handler.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
var _ multinet.EventHandler = (*metricsEventHandler)(nil)
|
||||
|
||||
type metricsEventHandler struct {
|
||||
m metrics.MultinetMetrics
|
||||
}
|
||||
|
||||
func (m *metricsEventHandler) DialPerformed(sourceIP net.Addr, _ string, _ string, err error) {
|
||||
sourceIPString := "undefined"
|
||||
if sourceIP != nil {
|
||||
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
|
||||
}
|
||||
m.m.Dial(sourceIPString, err == nil)
|
||||
}
|
||||
|
||||
func newEventHandler(m metrics.MultinetMetrics) multinet.EventHandler {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
return &metricsEventHandler{m: m}
|
||||
}
|
|
@ -463,6 +463,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
|||
name: morphPrefix,
|
||||
from: fromSideChainBlock,
|
||||
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
||||
multinetMetrics: s.irMetrics.Multinet(),
|
||||
}
|
||||
|
||||
// create morph client
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||
|
@ -116,6 +117,7 @@ type (
|
|||
sgn *transaction.Signer
|
||||
from uint32 // block height
|
||||
morphCacheMetric metrics.MorphCacheMetrics
|
||||
multinetMetrics metrics.MultinetMetrics
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -486,6 +488,12 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
|
|||
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
|
||||
}
|
||||
|
||||
nc := parseMultinetConfig(p.cfg, p.multinetMetrics)
|
||||
ds, err := internalNet.NewDialerSource(nc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dialer source: %w", err)
|
||||
}
|
||||
|
||||
return client.New(
|
||||
ctx,
|
||||
p.key,
|
||||
|
@ -498,6 +506,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
|
|||
}),
|
||||
client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")),
|
||||
client.WithMorphCacheMetrics(p.morphCacheMetric),
|
||||
client.WithDialerSource(ds),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -542,6 +551,28 @@ func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
|
|||
return extraWallets, nil
|
||||
}
|
||||
|
||||
func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNet.Config {
|
||||
nc := internalNet.Config{
|
||||
Enabled: cfg.GetBool("multinet.enabled"),
|
||||
Balancer: cfg.GetString("multinet.balancer"),
|
||||
Restrict: cfg.GetBool("multinet.restrict"),
|
||||
FallbackDelay: cfg.GetDuration("multinet.fallback_delay"),
|
||||
Metrics: m,
|
||||
}
|
||||
for i := 0; ; i++ {
|
||||
mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i))
|
||||
if mask == "" {
|
||||
break
|
||||
}
|
||||
sourceIPs := cfg.GetStringSlice(fmt.Sprintf("multinet.subnets.%d.source_ips", i))
|
||||
nc.Subnets = append(nc.Subnets, internalNet.Subnet{
|
||||
Prefix: mask,
|
||||
SourceIPs: sourceIPs,
|
||||
})
|
||||
}
|
||||
return nc
|
||||
}
|
||||
|
||||
func (s *Server) initConfigFromBlockchain() error {
|
||||
// get current epoch
|
||||
epoch, err := s.netmapClient.Epoch()
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
|
@ -41,13 +42,13 @@ type cfg struct {
|
|||
|
||||
endpoints []Endpoint
|
||||
|
||||
singleCli *rpcclient.WSClient // neo-go client for single client mode
|
||||
|
||||
inactiveModeCb Callback
|
||||
|
||||
switchInterval time.Duration
|
||||
|
||||
morphCacheMetrics metrics.MorphCacheMetrics
|
||||
|
||||
dialerSource *internalNet.DialerSource
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -124,40 +125,24 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
|||
|
||||
var err error
|
||||
var act *actor.Actor
|
||||
if cfg.singleCli != nil {
|
||||
// return client in single RPC node mode that uses
|
||||
// predefined WS client
|
||||
//
|
||||
// in case of the closing web socket connection:
|
||||
// if extra endpoints were provided via options,
|
||||
// they will be used in switch process, otherwise
|
||||
// inactive mode will be enabled
|
||||
cli.client = cfg.singleCli
|
||||
|
||||
act, err = newActor(cfg.singleCli, acc, *cfg)
|
||||
var endpoint Endpoint
|
||||
for cli.endpoints.curr, endpoint = range cli.endpoints.list {
|
||||
cli.client, act, err = cli.newCli(ctx, endpoint)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create RPC actor: %w", err)
|
||||
}
|
||||
} else {
|
||||
var endpoint Endpoint
|
||||
for cli.endpoints.curr, endpoint = range cli.endpoints.list {
|
||||
cli.client, act, err = cli.newCli(ctx, endpoint)
|
||||
if err != nil {
|
||||
cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
|
||||
zap.Error(err), zap.String("endpoint", endpoint.Address))
|
||||
} else {
|
||||
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
|
||||
zap.String("endpoint", endpoint.Address))
|
||||
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
|
||||
cli.switchIsActive.Store(true)
|
||||
go cli.switchToMostPrioritized(ctx)
|
||||
}
|
||||
break
|
||||
cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
|
||||
zap.Error(err), zap.String("endpoint", endpoint.Address))
|
||||
} else {
|
||||
cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
|
||||
zap.String("endpoint", endpoint.Address))
|
||||
if cli.endpoints.curr > 0 && cli.cfg.switchInterval != 0 {
|
||||
cli.switchIsActive.Store(true)
|
||||
go cli.switchToMostPrioritized(ctx)
|
||||
}
|
||||
break
|
||||
}
|
||||
if cli.client == nil {
|
||||
return nil, ErrNoHealthyEndpoint
|
||||
}
|
||||
}
|
||||
if cli.client == nil {
|
||||
return nil, ErrNoHealthyEndpoint
|
||||
}
|
||||
cli.setActor(act)
|
||||
|
||||
|
@ -175,6 +160,7 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl
|
|||
Options: rpcclient.Options{
|
||||
DialTimeout: c.cfg.dialTimeout,
|
||||
TLSClientConfig: cfg,
|
||||
NetDialContext: c.cfg.dialerSource.NetContextDialer(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -285,17 +271,6 @@ func WithEndpoints(endpoints ...Endpoint) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithSingleClient returns a client constructor option
|
||||
// that specifies single neo-go client and forces Client
|
||||
// to use it for requests.
|
||||
//
|
||||
// Passed client must already be initialized.
|
||||
func WithSingleClient(cli *rpcclient.WSClient) Option {
|
||||
return func(c *cfg) {
|
||||
c.singleCli = cli
|
||||
}
|
||||
}
|
||||
|
||||
// WithConnLostCallback return a client constructor option
|
||||
// that specifies a callback that is called when Client
|
||||
// unsuccessfully tried to connect to all the specified
|
||||
|
@ -320,3 +295,9 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
|
|||
c.morphCacheMetrics = morphCacheMetrics
|
||||
}
|
||||
}
|
||||
|
||||
func WithDialerSource(ds *internalNet.DialerSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.dialerSource = ds
|
||||
}
|
||||
}
|
||||
|
|
2
pkg/network/cache/client.go
vendored
2
pkg/network/cache/client.go
vendored
|
@ -5,6 +5,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
)
|
||||
|
@ -25,6 +26,7 @@ type (
|
|||
Key *ecdsa.PrivateKey
|
||||
ResponseCallback func(client.ResponseMetaInfo) error
|
||||
AllowExternal bool
|
||||
DialerSource *net.DialerSource
|
||||
}
|
||||
)
|
||||
|
||||
|
|
25
pkg/network/cache/multi.go
vendored
25
pkg/network/cache/multi.go
vendored
|
@ -60,18 +60,21 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
|||
prmInit.Key = *x.opts.Key
|
||||
}
|
||||
|
||||
grpcOpts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||
}
|
||||
|
||||
prmDial := client.PrmDial{
|
||||
Endpoint: addr.URIAddr(),
|
||||
GRPCDialOptions: []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
),
|
||||
grpc.WithChainStreamInterceptor(
|
||||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
},
|
||||
Endpoint: addr.URIAddr(),
|
||||
GRPCDialOptions: grpcOpts,
|
||||
}
|
||||
if x.opts.DialTimeout > 0 {
|
||||
prmDial.DialTimeout = x.opts.DialTimeout
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
|
@ -21,6 +22,7 @@ type clientCache struct {
|
|||
sync.Mutex
|
||||
simplelru.LRU[string, cacheItem]
|
||||
key *ecdsa.PrivateKey
|
||||
ds *internalNet.DialerSource
|
||||
}
|
||||
|
||||
type cacheItem struct {
|
||||
|
@ -36,7 +38,7 @@ const (
|
|||
|
||||
var errRecentlyFailed = errors.New("client has recently failed")
|
||||
|
||||
func (c *clientCache) init(pk *ecdsa.PrivateKey) {
|
||||
func (c *clientCache) init(pk *ecdsa.PrivateKey, ds *internalNet.DialerSource) {
|
||||
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
|
||||
if conn := value.cc; conn != nil {
|
||||
_ = conn.Close()
|
||||
|
@ -44,6 +46,7 @@ func (c *clientCache) init(pk *ecdsa.PrivateKey) {
|
|||
})
|
||||
c.LRU = *l
|
||||
c.key = pk
|
||||
c.ds = ds
|
||||
}
|
||||
|
||||
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
||||
|
@ -99,6 +102,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
|||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||
}
|
||||
|
||||
if !netAddr.IsTLSEnabled() {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
|
@ -45,6 +46,7 @@ type cfg struct {
|
|||
morphChainStorage policyengine.MorphRuleChainStorageReader
|
||||
|
||||
metrics MetricsRegister
|
||||
ds *net.DialerSource
|
||||
}
|
||||
|
||||
// Option represents configuration option for a tree service.
|
||||
|
@ -161,3 +163,9 @@ func WithNetmapState(state netmap.State) Option {
|
|||
c.state = state
|
||||
}
|
||||
}
|
||||
|
||||
func WithDialerSource(ds *net.DialerSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.ds = ds
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
|
|||
s.log = &logger.Logger{Logger: zap.NewNop()}
|
||||
}
|
||||
|
||||
s.cache.init(s.key)
|
||||
s.cache.init(s.key, s.ds)
|
||||
s.closeCh = make(chan struct{})
|
||||
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
||||
s.replicateLocalCh = make(chan applyOp)
|
||||
|
|
Loading…
Reference in a new issue