Source-based routing support #1422
|
@ -48,6 +48,8 @@ func defaultConfiguration(cfg *viper.Viper) {
|
|||
cfg.SetDefault("node.kludge_compatibility_mode", false)
|
||||
|
||||
cfg.SetDefault("audit.enabled", false)
|
||||
|
||||
setMultinetDefaults(cfg)
|
||||
}
|
||||
|
||||
func setControlDefaults(cfg *viper.Viper) {
|
||||
|
@ -131,3 +133,11 @@ func setMorphDefaults(cfg *viper.Viper) {
|
|||
cfg.SetDefault("morph.validators", []string{})
|
||||
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
|
||||
}
|
||||
|
||||
func setMultinetDefaults(cfg *viper.Viper) {
|
||||
cfg.SetDefault("multinet.enabled", false)
|
||||
cfg.SetDefault("multinet.balancer", "")
|
||||
cfg.SetDefault("multinet.restrict", false)
|
||||
cfg.SetDefault("multinet.fallback_delay", "0s")
|
||||
cfg.SetDefault("multinet.subnets", "")
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
}
|
||||
|
|
|
@ -26,12 +26,14 @@ import (
|
|||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet"
|
||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||
|
@ -436,6 +438,8 @@ type shared struct {
|
|||
metricsCollector *metrics.NodeMetrics
|
||||
|
||||
metricsSvc *objectService.MetricCollector
|
||||
|
||||
dialerSource *internalNet.DialerSource
|
||||
}
|
||||
|
||||
// dynamicConfiguration stores parameters of the
|
||||
|
@ -760,12 +764,18 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
|||
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
||||
fatalOnErr(err)
|
||||
|
||||
nodeMetrics := metrics.NewNodeMetrics()
|
||||
|
||||
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
|
||||
fatalOnErr(err)
|
||||
|
||||
cacheOpts := cache.ClientCacheOpts{
|
||||
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
||||
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
||||
Key: &key.PrivateKey,
|
||||
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
||||
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
|
||||
DialerSource: ds,
|
||||
}
|
||||
|
||||
return shared{
|
||||
|
@ -777,10 +787,29 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
|||
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||
persistate: persistate,
|
||||
metricsCollector: metrics.NewNodeMetrics(),
|
||||
metricsCollector: nodeMetrics,
|
||||
dialerSource: ds,
|
||||
}
|
||||
}
|
||||
|
||||
func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
|
||||
result := internalNet.Config{
|
||||
Enabled: multinet.Enabled(appCfg),
|
||||
Balancer: multinet.Balancer(appCfg),
|
||||
Restrict: multinet.Restrict(appCfg),
|
||||
FallbackDelay: multinet.FallbackDelay(appCfg),
|
||||
Metrics: m,
|
||||
}
|
||||
sn := multinet.Subnets(appCfg)
|
||||
for _, s := range sn {
|
||||
result.Subnets = append(result.Subnets, internalNet.Subnet{
|
||||
Prefix: s.Mask,
|
||||
SourceIPs: s.SourceIPs,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
|
||||
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
||||
fatalOnErr(err)
|
||||
|
@ -1336,6 +1365,11 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
|
||||
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||
}
|
||||
|
||||
|
|
62
cmd/frostfs-node/config/multinet/config.go
Normal file
|
@ -0,0 +1,62 @@
|
|||
package multinet
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
)
|
||||
|
||||
const (
|
||||
subsection = "multinet"
|
||||
|
||||
FallbackDelayDefault = 300 * time.Millisecond
|
||||
)
|
||||
|
||||
// Enabled returns the value of "enabled" config parameter from "multinet" section.
|
||||
func Enabled(c *config.Config) bool {
|
||||
return config.BoolSafe(c.Sub(subsection), "enabled")
|
||||
}
|
||||
|
||||
type Subnet struct {
|
||||
Mask string
|
||||
SourceIPs []string
|
||||
}
|
||||
|
||||
// Subnets returns the value of "subnets" config parameter from "multinet" section.
|
||||
func Subnets(c *config.Config) []Subnet {
|
||||
var result []Subnet
|
||||
sub := c.Sub(subsection).Sub("subnets")
|
||||
for i := 0; ; i++ {
|
||||
s := sub.Sub(strconv.FormatInt(int64(i), 10))
|
||||
mask := config.StringSafe(s, "mask")
|
||||
if mask == "" {
|
||||
break
|
||||
}
|
||||
sourceIPs := config.StringSliceSafe(s, "source_ips")
|
||||
result = append(result, Subnet{
|
||||
Mask: mask,
|
||||
SourceIPs: sourceIPs,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Balancer returns the value of "balancer" config parameter from "multinet" section.
|
||||
func Balancer(c *config.Config) string {
|
||||
return config.StringSafe(c.Sub(subsection), "balancer")
|
||||
}
|
||||
|
||||
// Restrict returns the value of "restrict" config parameter from "multinet" section.
|
||||
func Restrict(c *config.Config) bool {
|
||||
return config.BoolSafe(c.Sub(subsection), "restrict")
|
||||
}
|
||||
|
||||
// FallbackDelay returns the value of "fallback_delay" config parameter from "multinet" section.
|
||||
func FallbackDelay(c *config.Config) time.Duration {
|
||||
fd := config.DurationSafe(c.Sub(subsection), "fallback_delay")
|
||||
if fd != 0 { // negative value means no fallback
|
||||
return fd
|
||||
}
|
||||
return FallbackDelayDefault
|
||||
}
|
52
cmd/frostfs-node/config/multinet/config_test.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package multinet
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMultinetSection(t *testing.T) {
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
empty := configtest.EmptyConfig()
|
||||
require.Equal(t, false, Enabled(empty))
|
||||
require.Equal(t, ([]Subnet)(nil), Subnets(empty))
|
||||
require.Equal(t, "", Balancer(empty))
|
||||
require.Equal(t, false, Restrict(empty))
|
||||
require.Equal(t, FallbackDelayDefault, FallbackDelay(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
require.Equal(t, true, Enabled(c))
|
||||
require.Equal(t, []Subnet{
|
||||
{
|
||||
Mask: "192.168.219.174/24",
|
||||
SourceIPs: []string{
|
||||
"192.168.218.185",
|
||||
"192.168.219.185",
|
||||
},
|
||||
},
|
||||
{
|
||||
Mask: "10.78.70.74/24",
|
||||
SourceIPs: []string{
|
||||
"10.78.70.185",
|
||||
"10.78.71.185",
|
||||
},
|
||||
},
|
||||
}, Subnets(c))
|
||||
require.Equal(t, "roundrobin", Balancer(c))
|
||||
require.Equal(t, false, Restrict(c))
|
||||
require.Equal(t, 350*time.Millisecond, FallbackDelay(c))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
}
|
|
@ -29,51 +29,12 @@ const (
|
|||
)
|
||||
|
||||
func initMorphComponents(ctx context.Context, c *cfg) {
|
||||
addresses := morphconfig.RPCEndpoint(c.appCfg)
|
||||
|
||||
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
|
||||
// order of endpoints with the same priority.
|
||||
rand.Shuffle(len(addresses), func(i, j int) {
|
||||
addresses[i], addresses[j] = addresses[j], addresses[i]
|
||||
})
|
||||
|
||||
cli, err := client.New(ctx,
|
||||
c.key,
|
||||
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
||||
client.WithLogger(c.log),
|
||||
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
|
||||
client.WithEndpoints(addresses...),
|
||||
client.WithConnLostCallback(func() {
|
||||
c.internalErr <- errors.New("morph connection has been lost")
|
||||
}),
|
||||
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
|
||||
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
|
||||
)
|
||||
if err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
|
||||
zap.Any("endpoints", addresses),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
fatalOnErr(err)
|
||||
}
|
||||
|
||||
c.onShutdown(func() {
|
||||
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
|
||||
cli.Close()
|
||||
})
|
||||
|
||||
if err := cli.SetGroupSignerScope(); err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
|
||||
}
|
||||
|
||||
c.cfgMorph.client = cli
|
||||
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
|
||||
initMorphClient(ctx, c)
|
||||
|
||||
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
||||
|
||||
if c.cfgMorph.notaryEnabled {
|
||||
err = c.cfgMorph.client.EnableNotarySupport(
|
||||
err := c.cfgMorph.client.EnableNotarySupport(
|
||||
client.WithProxyContract(
|
||||
c.cfgMorph.proxyScriptHash,
|
||||
),
|
||||
|
@ -111,6 +72,50 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
|||
c.cfgNetmap.wrapper = wrap
|
||||
}
|
||||
|
||||
func initMorphClient(ctx context.Context, c *cfg) {
|
||||
addresses := morphconfig.RPCEndpoint(c.appCfg)
|
||||
|
||||
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
|
||||
// order of endpoints with the same priority.
|
||||
rand.Shuffle(len(addresses), func(i, j int) {
|
||||
addresses[i], addresses[j] = addresses[j], addresses[i]
|
||||
})
|
||||
|
||||
cli, err := client.New(ctx,
|
||||
c.key,
|
||||
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
||||
client.WithLogger(c.log),
|
||||
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
|
||||
client.WithEndpoints(addresses...),
|
||||
client.WithConnLostCallback(func() {
|
||||
c.internalErr <- errors.New("morph connection has been lost")
|
||||
}),
|
||||
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
|
||||
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
|
||||
client.WithDialerSource(c.dialerSource),
|
||||
)
|
||||
if err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,
|
||||
zap.Any("endpoints", addresses),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
fatalOnErr(err)
|
||||
}
|
||||
|
||||
c.onShutdown(func() {
|
||||
c.log.Info(logs.FrostFSNodeClosingMorphComponents)
|
||||
cli.Close()
|
||||
})
|
||||
|
||||
if err := cli.SetGroupSignerScope(); err != nil {
|
||||
c.log.Info(logs.FrostFSNodeFailedToSetGroupSignerScopeContinueWithGlobal, zap.Error(err))
|
||||
}
|
||||
|
||||
c.cfgMorph.client = cli
|
||||
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
|
||||
}
|
||||
|
||||
func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
|
||||
// skip notary deposit in non-notary environments
|
||||
if !c.cfgMorph.notaryEnabled {
|
||||
|
|
|
@ -67,6 +67,7 @@ func initTreeService(c *cfg) {
|
|||
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
||||
tree.WithAPEMorphRuleStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage()),
|
||||
tree.WithNetmapState(c.cfgNetmap.state),
|
||||
tree.WithDialerSource(c.dialerSource),
|
||||
)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
|
|
|
@ -80,3 +80,12 @@ FROSTFS_IR_PPROF_MUTEX_RATE=10000
|
|||
FROSTFS_IR_PROMETHEUS_ENABLED=true
|
||||
FROSTFS_IR_PROMETHEUS_ADDRESS=localhost:9090
|
||||
FROSTFS_IR_PROMETHEUS_SHUTDOWN_TIMEOUT=30s
|
||||
|
||||
FROSTFS_MULTINET_ENABLED=true
|
||||
FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24"
|
||||
FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185"
|
||||
FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24"
|
||||
FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
|
||||
FROSTFS_MULTINET_BALANCER=roundrobin
|
||||
FROSTFS_MULTINET_RESTRICT=false
|
||||
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
||||
|
|
|
@ -123,3 +123,18 @@ prometheus:
|
|||
|
||||
systemdnotify:
|
||||
enabled: true
|
||||
|
||||
multinet:
|
||||
enabled: true
|
||||
subnets:
|
||||
- mask: 192.168.219.174/24
|
||||
source_ips:
|
||||
- 192.168.218.185
|
||||
- 192.168.219.185
|
||||
- mask: 10.78.70.74/24
|
||||
source_ips:
|
||||
- 10.78.70.185
|
||||
- 10.78.71.185
|
||||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
|
|
|
@ -206,3 +206,13 @@ FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
|||
|
||||
# AUDIT section
|
||||
FROSTFS_AUDIT_ENABLED=true
|
||||
|
||||
# MULTINET section
|
||||
FROSTFS_MULTINET_ENABLED=true
|
||||
FROSTFS_MULTINET_SUBNETS_0_MASK="192.168.219.174/24"
|
||||
FROSTFS_MULTINET_SUBNETS_0_SOURCE_IPS="192.168.218.185 192.168.219.185"
|
||||
FROSTFS_MULTINET_SUBNETS_1_MASK="10.78.70.74/24"
|
||||
FROSTFS_MULTINET_SUBNETS_1_SOURCE_IPS="10.78.70.185 10.78.71.185"
|
||||
FROSTFS_MULTINET_BALANCER=roundrobin
|
||||
FROSTFS_MULTINET_RESTRICT=false
|
||||
FROSTFS_MULTINET_FALLBACK_DELAY=350ms
|
||||
|
|
|
@ -264,5 +264,27 @@
|
|||
},
|
||||
"audit": {
|
||||
"enabled": true
|
||||
},
|
||||
"multinet": {
|
||||
"enabled": true,
|
||||
"subnets": [
|
||||
{
|
||||
"mask": "192.168.219.174/24",
|
||||
"source_ips": [
|
||||
"192.168.218.185",
|
||||
"192.168.219.185"
|
||||
]
|
||||
},
|
||||
{
|
||||
"mask": "10.78.70.74/24",
|
||||
"source_ips":[
|
||||
"10.78.70.185",
|
||||
"10.78.71.185"
|
||||
]
|
||||
}
|
||||
],
|
||||
"balancer": "roundrobin",
|
||||
"restrict": false,
|
||||
"fallback_delay": "350ms"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -240,3 +240,18 @@ runtime:
|
|||
|
||||
audit:
|
||||
enabled: true
|
||||
|
||||
multinet:
|
||||
enabled: true
|
||||
subnets:
|
||||
- mask: 192.168.219.174/24
|
||||
source_ips:
|
||||
- 192.168.218.185
|
||||
- 192.168.219.185
|
||||
- mask: 10.78.70.74/24
|
||||
source_ips:
|
||||
- 10.78.70.185
|
||||
- 10.78.71.185
|
||||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
|
|
|
@ -26,7 +26,7 @@ There are some custom types used for brevity:
|
|||
| `storage` | [Storage engine configuration](#storage-section) |
|
||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||
| `audit` | [Audit configuration](#audit-section) |
|
||||
|
||||
| `multinet` | [Multinet configuration](#multinet-section) |
|
||||
|
||||
# `control` section
|
||||
```yaml
|
||||
|
@ -436,5 +436,34 @@ audit:
|
|||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|---------------------|--------|---------------|---------------------------------------------------|
|
||||
| `soft_memory_limit` | `bool` | false | If `true` then audit event logs will be recorded. |
|
||||
|-----------|--------|---------------|---------------------------------------------------|
|
||||
| `enabled` | `bool` | false | If `true` then audit event logs will be recorded. |
|
||||
|
||||
|
||||
# `multinet` section
|
||||
Contains multinet parameters.
|
||||
|
||||
```yaml
|
||||
multinet:
|
||||
enabled: true
|
||||
subnets:
|
||||
- mask: 192.168.219.174/24
|
||||
source_ips:
|
||||
- 192.168.218.185
|
||||
- 192.168.219.185
|
||||
- mask: 10.78.70.74/24
|
||||
source_ips:
|
||||
- 10.78.70.185
|
||||
- 10.78.71.185
|
||||
balancer: roundrobin
|
||||
restrict: false
|
||||
fallback_delay: 350ms
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
| ---------------- | ---------- | ------------- | -------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `enabled` | `bool` | false | If `true` then source-based routing is enabled. |
|
||||
| `subnets` | `subnet` | empty | Resulting subnets. |
|
||||
| `balancer` | `string` | "" | Balancer to select network interfaces, allowed values are "" (no balancing, use first suitable interface) or "roundrobin". |
|
||||
| `restrict` | `bool` | false | If `true` then any requests that do not match `subnets` will fail. |
|
||||
| `fallback_delay` | `duration` | 350ms | Delay before fallback to secondary IP addresses in case of hostname resolve. |
|
||||
|
|
3
go.mod
|
@ -11,6 +11,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
|
@ -132,4 +133,4 @@ require (
|
|||
rsc.io/tmplfunc v0.0.3 // indirect
|
||||
)
|
||||
|
||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928
|
||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07
|
||||
|
|
6
go.sum
|
@ -14,8 +14,10 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509
|
|||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509/go.mod h1:jmb7yxzZota9jWbC10p+7YR+6wwJPBj7J/Fl5VDkXys=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 h1:LK3mCkNZkY48eBA9jnk1N0eQZLsZhOG+XYw4EBoKUjM=
|
||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
|
||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 h1:gPaqGsk6gSWQyNVjaStydfUz6Z/loHc9XyvGrJ5qSPY=
|
||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
|
|
|
@ -523,4 +523,5 @@ const (
|
|||
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty"
|
||||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||
WritecacheCantGetObject = "can't get an object from fstree"
|
||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||
)
|
||||
|
|
|
@ -22,6 +22,7 @@ const (
|
|||
grpcServerSubsystem = "grpc_server"
|
||||
policerSubsystem = "policer"
|
||||
commonCacheSubsystem = "common_cache"
|
||||
multinetSubsystem = "multinet"
|
||||
|
||||
successLabel = "success"
|
||||
shardIDLabel = "shard_id"
|
||||
|
@ -41,6 +42,7 @@ const (
|
|||
endpointLabel = "endpoint"
|
||||
hitLabel = "hit"
|
||||
cacheLabel = "cache"
|
||||
sourceIPLabel = "source_ip"
|
||||
|
||||
readWriteMode = "READ_WRITE"
|
||||
readOnlyMode = "READ_ONLY"
|
||||
|
|
|
@ -17,6 +17,7 @@ type InnerRingServiceMetrics struct {
|
|||
eventDuration *prometheus.HistogramVec
|
||||
morphCacheMetrics *morphCacheMetrics
|
||||
logMetrics logger.LogMetrics
|
||||
multinet *multinetMetrics
|
||||
// nolint: unused
|
||||
appInfo *ApplicationInfo
|
||||
}
|
||||
|
@ -51,6 +52,7 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
|
|||
morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace),
|
||||
appInfo: NewApplicationInfo(misc.Version),
|
||||
logMetrics: logger.NewLogMetrics(innerRingNamespace),
|
||||
multinet: newMultinetMetrics(innerRingNamespace),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,3 +80,7 @@ func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
|||
func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics {
|
||||
return m.logMetrics
|
||||
}
|
||||
|
||||
func (m *InnerRingServiceMetrics) Multinet() MultinetMetrics {
|
||||
return m.multinet
|
||||
}
|
||||
|
|
35
internal/metrics/multinet.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type multinetMetrics struct {
|
||||
dials *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
type MultinetMetrics interface {
|
||||
Dial(sourceIP string, success bool)
|
||||
}
|
||||
|
||||
func newMultinetMetrics(ns string) *multinetMetrics {
|
||||
return &multinetMetrics{
|
||||
dials: metrics.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Subsystem: multinetSubsystem,
|
||||
Name: "dial_count",
|
||||
Help: "Dials count performed by multinet",
|
||||
}, []string{sourceIPLabel, successLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *multinetMetrics) Dial(sourceIP string, success bool) {
|
||||
m.dials.With(prometheus.Labels{
|
||||
sourceIPLabel: sourceIP,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
}).Inc()
|
||||
}
|
|
@ -25,6 +25,7 @@ type NodeMetrics struct {
|
|||
morphClient *morphClientMetrics
|
||||
morphCache *morphCacheMetrics
|
||||
log logger.LogMetrics
|
||||
multinet *multinetMetrics
|
||||
// nolint: unused
|
||||
appInfo *ApplicationInfo
|
||||
}
|
||||
|
@ -53,6 +54,7 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
morphCache: newMorphCacheMetrics(namespace),
|
||||
log: logger.NewLogMetrics(namespace),
|
||||
appInfo: NewApplicationInfo(misc.Version),
|
||||
multinet: newMultinetMetrics(namespace),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,3 +122,7 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
|||
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
|
||||
return m.log
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
|
||||
return m.multinet
|
||||
}
|
||||
|
|
69
internal/net/config.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
var errEmptySourceIPList = errors.New("empty source IP list")
|
||||
|
||||
type Subnet struct {
|
||||
Prefix string
|
||||
SourceIPs []string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Enabled bool
|
||||
Subnets []Subnet
|
||||
Balancer string
|
||||
Restrict bool
|
||||
FallbackDelay time.Duration
|
||||
Metrics metrics.MultinetMetrics
|
||||
}
|
||||
|
||||
func (c Config) toMultinetConfig() (multinet.Config, error) {
|
||||
var subnets []multinet.Subnet
|
||||
for _, s := range c.Subnets {
|
||||
var ms multinet.Subnet
|
||||
p, err := netip.ParsePrefix(s.Prefix)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err)
|
||||
}
|
||||
ms.Prefix = p
|
||||
for _, ip := range s.SourceIPs {
|
||||
addr, err := netip.ParseAddr(ip)
|
||||
if err != nil {
|
||||
return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err)
|
||||
}
|
||||
ms.SourceIPs = append(ms.SourceIPs, addr)
|
||||
}
|
||||
if len(ms.SourceIPs) == 0 {
|
||||
return multinet.Config{}, errEmptySourceIPList
|
||||
}
|
||||
subnets = append(subnets, ms)
|
||||
}
|
||||
return multinet.Config{
|
||||
Subnets: subnets,
|
||||
Balancer: multinet.BalancerType(c.Balancer),
|
||||
Restrict: c.Restrict,
|
||||
FallbackDelay: c.FallbackDelay,
|
||||
Dialer: newDefaulDialer(),
|
||||
EventHandler: newEventHandler(c.Metrics),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c Config) equals(other Config) bool {
|
||||
return c.Enabled == other.Enabled &&
|
||||
slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool {
|
||||
return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs)
|
||||
}) &&
|
||||
c.Balancer == other.Balancer &&
|
||||
c.Restrict == other.Restrict &&
|
||||
c.FallbackDelay == other.FallbackDelay
|
||||
}
|
54
internal/net/dial_target.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2014 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package net
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// parseDialTarget returns the network and address to pass to dialer.
|
||||
func parseDialTarget(target string) (string, string) {
|
||||
net := "tcp"
|
||||
m1 := strings.Index(target, ":")
|
||||
m2 := strings.Index(target, ":/")
|
||||
// handle unix:addr which will fail with url.Parse
|
||||
if m1 >= 0 && m2 < 0 {
|
||||
if n := target[0:m1]; n == "unix" {
|
||||
return n, target[m1+1:]
|
||||
}
|
||||
}
|
||||
if m2 >= 0 {
|
||||
t, err := url.Parse(target)
|
||||
if err != nil {
|
||||
return net, target
|
||||
}
|
||||
scheme := t.Scheme
|
||||
addr := t.Path
|
||||
if scheme == "unix" {
|
||||
if addr == "" {
|
||||
addr = t.Host
|
||||
}
|
||||
return scheme, addr
|
||||
}
|
||||
}
|
||||
return net, target
|
||||
}
|
39
internal/net/dialer.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
type Dialer interface {
|
||||
DialContext(ctx context.Context, network, address string) (net.Conn, error)
|
||||
}
|
||||
|
||||
func DialContextTCP(ctx context.Context, address string, d Dialer) (net.Conn, error) {
|
||||
return d.DialContext(ctx, "tcp", address)
|
||||
}
|
||||
|
||||
func newDefaulDialer() net.Dialer {
|
||||
// From `grpc.WithContextDialer` comment:
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Could you provide a link to github? Could you provide a link to github?
dstepanov-yadro
commented
done done
|
||||
//
|
||||
// Note: All supported releases of Go (as of December 2023) override the OS
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why do we care about it in this PR? Why do we care about it in this PR?
dstepanov-yadro
commented
`multinet` is used primarily for gprs connections, so I think it's important to preserve the behavior of the standard grpc library.
dstepanov-yadro
commented
Also I think it's correct to use OS settings for keep alive. Also I think it's correct to use OS settings for keep alive.
|
||||
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
|
||||
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
|
||||
// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
|
||||
// option to true from the Control field. For a concrete example of how to do
|
||||
// this, see internal.NetDialerWithTCPKeepalive().
|
||||
//
|
||||
// https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432
|
||||
return net.Dialer{
|
||||
KeepAlive: time.Duration(-1),
|
||||
Control: func(_, _ string, c syscall.RawConn) error {
|
||||
return c.Control(func(fd uintptr) {
|
||||
_ = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
83
internal/net/dialer_source.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
type DialerSource struct {
|
||||
guard sync.RWMutex
|
||||
|
||||
c Config
|
||||
|
||||
md multinet.Dialer
|
||||
}
|
||||
|
||||
func NewDialerSource(c Config) (*DialerSource, error) {
|
||||
result := &DialerSource{}
|
||||
if err := result.build(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) build(c Config) error {
|
||||
if c.Enabled {
|
||||
mc, err := c.toMultinetConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
md, err := multinet.NewDialer(mc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.md = md
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
s.md = nil
|
||||
s.c = c
|
||||
return nil
|
||||
}
|
||||
|
||||
// GrpcContextDialer returns grpc.WithContextDialer func.
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Frankly, this is a rather generic package (it is even called Frankly, this is a rather generic package (it is even called `net`)
Why don't we return `Dialer` only?
It seems _every_ client of this function has to check the second return value and use _the same_ default otherwise.
dstepanov-yadro
commented
fixed fixed
|
||||
// Returns nil if multinet disabled.
|
||||
func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) {
|
||||
s.guard.RLock()
|
||||
defer s.guard.RUnlock()
|
||||
|
||||
if s.c.Enabled {
|
||||
return func(ctx context.Context, address string) (net.Conn, error) {
|
||||
network, address := parseDialTarget(address)
|
||||
return s.md.DialContext(ctx, network, address)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NetContextDialer returns net.DialContext dial function.
|
||||
// Returns nil if multinet disabled.
|
||||
func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
|
||||
s.guard.RLock()
|
||||
defer s.guard.RUnlock()
|
||||
|
||||
if s.c.Enabled {
|
||||
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return s.md.DialContext(ctx, network, address)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DialerSource) Update(c Config) error {
|
||||
s.guard.Lock()
|
||||
defer s.guard.Unlock()
|
||||
|
||||
if s.c.equals(c) {
|
||||
return nil
|
||||
}
|
||||
return s.build(c)
|
||||
}
|
29
internal/net/event_handler.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package net
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/multinet"
|
||||
)
|
||||
|
||||
var _ multinet.EventHandler = (*metricsEventHandler)(nil)
|
||||
|
||||
type metricsEventHandler struct {
|
||||
m metrics.MultinetMetrics
|
||||
}
|
||||
|
||||
func (m *metricsEventHandler) DialPerformed(sourceIP net.Addr, _ string, _ string, err error) {
|
||||
sourceIPString := "undefined"
|
||||
if sourceIP != nil {
|
||||
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
|
||||
}
|
||||
m.m.Dial(sourceIPString, err == nil)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
We have We have `success bool` in many other places, why do we go with `status` here?
dstepanov-yadro
commented
In the distant bright future, it may be necessary to have separated error types, for example, for failover scenarios. In the distant bright future, it may be necessary to have separated error types, for example, for failover scenarios.
fyrchik
commented
Do you mean for this error or for everything? Do you mean for this error or for everything?
I would rather be consistent across the codebase.
Also, what different kinds of errors do you have in mind?
dstepanov-yadro
commented
For example, it may be necessary to distinguish between errors of the server to which the connection is being made and errors of the network interface through which the connection is being made. For example, it may be necessary to distinguish between errors of the server to which the connection is being made and errors of the network interface through which the connection is being made.
fyrchik
commented
I doubt we will do sth like this in the near future (remember: it is I doubt we will do sth like this in the near future (remember: it is `Dial`, not some protocol method, so there is no status)
And if we will or will need to, we can always add a label.
dstepanov-yadro
commented
fixed fixed
|
||||
}
|
||||
|
||||
func newEventHandler(m metrics.MultinetMetrics) multinet.EventHandler {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
return &metricsEventHandler{m: m}
|
||||
}
|
|
@ -463,6 +463,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
|||
name: morphPrefix,
|
||||
from: fromSideChainBlock,
|
||||
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
||||
multinetMetrics: s.irMetrics.Multinet(),
|
||||
}
|
||||
|
||||
// create morph client
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||
|
@ -116,6 +117,7 @@ type (
|
|||
sgn *transaction.Signer
|
||||
from uint32 // block height
|
||||
morphCacheMetric metrics.MorphCacheMetrics
|
||||
multinetMetrics metrics.MultinetMetrics
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -486,6 +488,12 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
|
|||
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
|
||||
}
|
||||
|
||||
nc := parseMultinetConfig(p.cfg, p.multinetMetrics)
|
||||
ds, err := internalNet.NewDialerSource(nc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dialer source: %w", err)
|
||||
}
|
||||
|
||||
return client.New(
|
||||
ctx,
|
||||
p.key,
|
||||
|
@ -498,6 +506,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
|
|||
}),
|
||||
client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")),
|
||||
client.WithMorphCacheMetrics(p.morphCacheMetric),
|
||||
client.WithDialerSource(ds),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -542,6 +551,28 @@ func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
|
|||
return extraWallets, nil
|
||||
}
|
||||
|
||||
func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNet.Config {
|
||||
nc := internalNet.Config{
|
||||
Enabled: cfg.GetBool("multinet.enabled"),
|
||||
Balancer: cfg.GetString("multinet.balancer"),
|
||||
Restrict: cfg.GetBool("multinet.restrict"),
|
||||
FallbackDelay: cfg.GetDuration("multinet.fallback_delay"),
|
||||
Metrics: m,
|
||||
}
|
||||
for i := 0; ; i++ {
|
||||
mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i))
|
||||
if mask == "" {
|
||||
break
|
||||
}
|
||||
sourceIPs := cfg.GetStringSlice(fmt.Sprintf("multinet.subnets.%d.source_ips", i))
|
||||
nc.Subnets = append(nc.Subnets, internalNet.Subnet{
|
||||
Prefix: mask,
|
||||
SourceIPs: sourceIPs,
|
||||
})
|
||||
}
|
||||
return nc
|
||||
}
|
||||
|
||||
func (s *Server) initConfigFromBlockchain() error {
|
||||
// get current epoch
|
||||
epoch, err := s.netmapClient.Epoch()
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
|
@ -41,13 +42,13 @@ type cfg struct {
|
|||
|
||||
endpoints []Endpoint
|
||||
|
||||
singleCli *rpcclient.WSClient // neo-go client for single client mode
|
||||
|
||||
inactiveModeCb Callback
|
||||
|
||||
switchInterval time.Duration
|
||||
|
||||
morphCacheMetrics metrics.MorphCacheMetrics
|
||||
|
||||
dialerSource *internalNet.DialerSource
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -124,21 +125,6 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
|||
|
||||
var err error
|
||||
var act *actor.Actor
|
||||
if cfg.singleCli != nil {
|
||||
// return client in single RPC node mode that uses
|
||||
// predefined WS client
|
||||
//
|
||||
// in case of the closing web socket connection:
|
||||
// if extra endpoints were provided via options,
|
||||
// they will be used in switch process, otherwise
|
||||
// inactive mode will be enabled
|
||||
cli.client = cfg.singleCli
|
||||
|
||||
act, err = newActor(cfg.singleCli, acc, *cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create RPC actor: %w", err)
|
||||
}
|
||||
} else {
|
||||
var endpoint Endpoint
|
||||
for cli.endpoints.curr, endpoint = range cli.endpoints.list {
|
||||
cli.client, act, err = cli.newCli(ctx, endpoint)
|
||||
|
@ -158,7 +144,6 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
|||
if cli.client == nil {
|
||||
return nil, ErrNoHealthyEndpoint
|
||||
}
|
||||
}
|
||||
cli.setActor(act)
|
||||
|
||||
go cli.closeWaiter(ctx)
|
||||
|
@ -175,6 +160,7 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl
|
|||
Options: rpcclient.Options{
|
||||
DialTimeout: c.cfg.dialTimeout,
|
||||
TLSClientConfig: cfg,
|
||||
NetDialContext: c.cfg.dialerSource.NetContextDialer(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -285,17 +271,6 @@ func WithEndpoints(endpoints ...Endpoint) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithSingleClient returns a client constructor option
|
||||
// that specifies single neo-go client and forces Client
|
||||
// to use it for requests.
|
||||
//
|
||||
// Passed client must already be initialized.
|
||||
func WithSingleClient(cli *rpcclient.WSClient) Option {
|
||||
return func(c *cfg) {
|
||||
c.singleCli = cli
|
||||
}
|
||||
}
|
||||
|
||||
// WithConnLostCallback return a client constructor option
|
||||
// that specifies a callback that is called when Client
|
||||
// unsuccessfully tried to connect to all the specified
|
||||
|
@ -320,3 +295,9 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
|
|||
c.morphCacheMetrics = morphCacheMetrics
|
||||
}
|
||||
}
|
||||
|
||||
func WithDialerSource(ds *internalNet.DialerSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.dialerSource = ds
|
||||
}
|
||||
}
|
||||
|
|
2
pkg/network/cache/client.go
vendored
|
@ -5,6 +5,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
)
|
||||
|
@ -25,6 +26,7 @@ type (
|
|||
Key *ecdsa.PrivateKey
|
||||
ResponseCallback func(client.ResponseMetaInfo) error
|
||||
AllowExternal bool
|
||||
DialerSource *net.DialerSource
|
||||
}
|
||||
)
|
||||
|
||||
|
|
11
pkg/network/cache/multi.go
vendored
|
@ -60,9 +60,7 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
|||
prmInit.Key = *x.opts.Key
|
||||
}
|
||||
|
||||
prmDial := client.PrmDial{
|
||||
Endpoint: addr.URIAddr(),
|
||||
GRPCDialOptions: []grpc.DialOption{
|
||||
grpcOpts := []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
|
@ -71,7 +69,12 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
|||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
},
|
||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||
}
|
||||
|
||||
prmDial := client.PrmDial{
|
||||
Endpoint: addr.URIAddr(),
|
||||
GRPCDialOptions: grpcOpts,
|
||||
}
|
||||
if x.opts.DialTimeout > 0 {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Unix domain sockets can be used: Unix domain sockets can be used:
https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/internal/transport/http2_client.go#L159
We should parse the address and return an explicit error if we do not support this.
dstepanov-yadro
commented
Fixed. unix sockets may be supported if Fixed. unix sockets may be supported if `restrict = false`.
|
||||
prmDial.DialTimeout = x.opts.DialTimeout
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
|
@ -21,6 +22,7 @@ type clientCache struct {
|
|||
sync.Mutex
|
||||
simplelru.LRU[string, cacheItem]
|
||||
key *ecdsa.PrivateKey
|
||||
ds *internalNet.DialerSource
|
||||
}
|
||||
|
||||
type cacheItem struct {
|
||||
|
@ -36,7 +38,7 @@ const (
|
|||
|
||||
var errRecentlyFailed = errors.New("client has recently failed")
|
||||
|
||||
func (c *clientCache) init(pk *ecdsa.PrivateKey) {
|
||||
func (c *clientCache) init(pk *ecdsa.PrivateKey, ds *internalNet.DialerSource) {
|
||||
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
|
||||
if conn := value.cc; conn != nil {
|
||||
_ = conn.Close()
|
||||
|
@ -44,6 +46,7 @@ func (c *clientCache) init(pk *ecdsa.PrivateKey) {
|
|||
})
|
||||
c.LRU = *l
|
||||
c.key = pk
|
||||
c.ds = ds
|
||||
}
|
||||
|
||||
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
||||
|
@ -99,6 +102,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
|||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||
}
|
||||
|
||||
if !netAddr.IsTLSEnabled() {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
|
@ -45,6 +46,7 @@ type cfg struct {
|
|||
morphChainStorage policyengine.MorphRuleChainStorageReader
|
||||
|
||||
metrics MetricsRegister
|
||||
ds *net.DialerSource
|
||||
}
|
||||
|
||||
// Option represents configuration option for a tree service.
|
||||
|
@ -161,3 +163,9 @@ func WithNetmapState(state netmap.State) Option {
|
|||
c.state = state
|
||||
}
|
||||
}
|
||||
|
||||
func WithDialerSource(ds *net.DialerSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.ds = ds
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
|
|||
s.log = &logger.Logger{Logger: zap.NewNop()}
|
||||
}
|
||||
|
||||
s.cache.init(s.key)
|
||||
s.cache.init(s.key, s.ds)
|
||||
s.closeCh = make(chan struct{})
|
||||
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
||||
s.replicateLocalCh = make(chan applyOp)
|
||||
|
|
Shouldn't it be an empty list?
It works so. To specify empty list it is required to add
Subnet
type to IR config, but this is only required to set defaults.I've meant
"[]"
, but if it already works, ok.