[#1422] node: Add dialer source to config
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
3304afa9d1
commit
74db735265
8 changed files with 256 additions and 0 deletions
|
@ -26,12 +26,14 @@ import (
|
||||||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet"
|
||||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||||
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
||||||
|
@ -436,6 +438,8 @@ type shared struct {
|
||||||
metricsCollector *metrics.NodeMetrics
|
metricsCollector *metrics.NodeMetrics
|
||||||
|
|
||||||
metricsSvc *objectService.MetricCollector
|
metricsSvc *objectService.MetricCollector
|
||||||
|
|
||||||
|
dialerSource *internalNet.DialerSource
|
||||||
}
|
}
|
||||||
|
|
||||||
// dynamicConfiguration stores parameters of the
|
// dynamicConfiguration stores parameters of the
|
||||||
|
@ -760,6 +764,9 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
||||||
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg))
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
cacheOpts := cache.ClientCacheOpts{
|
cacheOpts := cache.ClientCacheOpts{
|
||||||
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
||||||
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
||||||
|
@ -778,9 +785,27 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
|
||||||
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
||||||
persistate: persistate,
|
persistate: persistate,
|
||||||
metricsCollector: metrics.NewNodeMetrics(),
|
metricsCollector: metrics.NewNodeMetrics(),
|
||||||
|
dialerSource: ds,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func internalNetConfig(appCfg *config.Config) internalNet.Config {
|
||||||
|
result := internalNet.Config{
|
||||||
|
Enabled: multinet.Enabled(appCfg),
|
||||||
|
Balancer: multinet.Balancer(appCfg),
|
||||||
|
Restrict: multinet.Restrict(appCfg),
|
||||||
|
FallbackDelay: multinet.FallbackDelay(appCfg),
|
||||||
|
}
|
||||||
|
sn := multinet.Subnets(appCfg)
|
||||||
|
for _, s := range sn {
|
||||||
|
result.Subnets = append(result.Subnets, internalNet.Subnet{
|
||||||
|
Prefix: s.Mask,
|
||||||
|
SourceIPs: s.SourceIPs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
|
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
|
||||||
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
@ -1336,6 +1361,11 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := c.dialerSource.Update(internalNetConfig(c.appCfg)); err != nil {
|
||||||
|
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -11,6 +11,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241010110344-99c5c5836509
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -523,4 +523,5 @@ const (
|
||||||
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty"
|
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: not empty"
|
||||||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||||
WritecacheCantGetObject = "can't get an object from fstree"
|
WritecacheCantGetObject = "can't get an object from fstree"
|
||||||
|
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||||
)
|
)
|
||||||
|
|
66
internal/net/config.go
Normal file
66
internal/net/config.go
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
package net
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/netip"
|
||||||
|
"slices"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/multinet"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errEmptySourceIPList = errors.New("empty source IP list")
|
||||||
|
|
||||||
|
type Subnet struct {
|
||||||
|
Prefix string
|
||||||
|
SourceIPs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Enabled bool
|
||||||
|
Subnets []Subnet
|
||||||
|
Balancer string
|
||||||
|
Restrict bool
|
||||||
|
FallbackDelay time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Config) toMultinetConfig() (multinet.Config, error) {
|
||||||
|
var subnets []multinet.Subnet
|
||||||
|
for _, s := range c.Subnets {
|
||||||
|
var ms multinet.Subnet
|
||||||
|
p, err := netip.ParsePrefix(s.Prefix)
|
||||||
|
if err != nil {
|
||||||
|
return multinet.Config{}, fmt.Errorf("parse IP prefix '%s': %w", s.Prefix, err)
|
||||||
|
}
|
||||||
|
ms.Prefix = p
|
||||||
|
for _, ip := range s.SourceIPs {
|
||||||
|
addr, err := netip.ParseAddr(ip)
|
||||||
|
if err != nil {
|
||||||
|
return multinet.Config{}, fmt.Errorf("parse IP address '%s': %w", ip, err)
|
||||||
|
}
|
||||||
|
ms.SourceIPs = append(ms.SourceIPs, addr)
|
||||||
|
}
|
||||||
|
if len(ms.SourceIPs) == 0 {
|
||||||
|
return multinet.Config{}, errEmptySourceIPList
|
||||||
|
}
|
||||||
|
subnets = append(subnets, ms)
|
||||||
|
}
|
||||||
|
return multinet.Config{
|
||||||
|
Subnets: subnets,
|
||||||
|
Balancer: multinet.BalancerType(c.Balancer),
|
||||||
|
Restrict: c.Restrict,
|
||||||
|
FallbackDelay: c.FallbackDelay,
|
||||||
|
Dialer: newDefaulDialer(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Config) equals(other Config) bool {
|
||||||
|
return c.Enabled == other.Enabled &&
|
||||||
|
slices.EqualFunc(c.Subnets, other.Subnets, func(lhs, rhs Subnet) bool {
|
||||||
|
return lhs.Prefix == rhs.Prefix && slices.Equal(lhs.SourceIPs, rhs.SourceIPs)
|
||||||
|
}) &&
|
||||||
|
c.Balancer == other.Balancer &&
|
||||||
|
c.Restrict == other.Restrict &&
|
||||||
|
c.FallbackDelay == other.FallbackDelay
|
||||||
|
}
|
54
internal/net/dial_target.go
Normal file
54
internal/net/dial_target.go
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
// NOTE: code is taken from https://github.com/grpc/grpc-go/blob/v1.68.x/internal/transport/http_util.go
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2014 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package net
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// parseDialTarget returns the network and address to pass to dialer.
|
||||||
|
func parseDialTarget(target string) (string, string) {
|
||||||
|
net := "tcp"
|
||||||
|
m1 := strings.Index(target, ":")
|
||||||
|
m2 := strings.Index(target, ":/")
|
||||||
|
// handle unix:addr which will fail with url.Parse
|
||||||
|
if m1 >= 0 && m2 < 0 {
|
||||||
|
if n := target[0:m1]; n == "unix" {
|
||||||
|
return n, target[m1+1:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if m2 >= 0 {
|
||||||
|
t, err := url.Parse(target)
|
||||||
|
if err != nil {
|
||||||
|
return net, target
|
||||||
|
}
|
||||||
|
scheme := t.Scheme
|
||||||
|
addr := t.Path
|
||||||
|
if scheme == "unix" {
|
||||||
|
if addr == "" {
|
||||||
|
addr = t.Host
|
||||||
|
}
|
||||||
|
return scheme, addr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return net, target
|
||||||
|
}
|
35
internal/net/dialer.go
Normal file
35
internal/net/dialer.go
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
package net
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Dialer interface {
|
||||||
|
DialContext(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDefaulDialer() net.Dialer {
|
||||||
|
// From `grpc.WithContextDialer` comment:
|
||||||
|
//
|
||||||
|
// Note: All supported releases of Go (as of December 2023) override the OS
|
||||||
|
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
|
||||||
|
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
|
||||||
|
// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
|
||||||
|
// option to true from the Control field. For a concrete example of how to do
|
||||||
|
// this, see internal.NetDialerWithTCPKeepalive().
|
||||||
|
//
|
||||||
|
// https://github.com/grpc/grpc-go/blob/830135e6c5a351abf75f0c9cfdf978e5df8daeba/dialoptions.go#L432
|
||||||
|
return net.Dialer{
|
||||||
|
KeepAlive: time.Duration(-1),
|
||||||
|
Control: func(_, _ string, c syscall.RawConn) error {
|
||||||
|
return c.Control(func(fd uintptr) {
|
||||||
|
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
69
internal/net/dialer_source.go
Normal file
69
internal/net/dialer_source.go
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
package net
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/multinet"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DialerSource struct {
|
||||||
|
guard sync.RWMutex
|
||||||
|
|
||||||
|
c Config
|
||||||
|
|
||||||
|
md multinet.Dialer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDialerSource(c Config) (*DialerSource, error) {
|
||||||
|
result := &DialerSource{}
|
||||||
|
if err := result.build(c); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *DialerSource) build(c Config) error {
|
||||||
|
if c.Enabled {
|
||||||
|
mc, err := c.toMultinetConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
md, err := multinet.NewDialer(mc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.md = md
|
||||||
|
s.c = c
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
s.md = nil
|
||||||
|
s.c = c
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GrpcContextDialer returns grpc.WithContextDialer func.
|
||||||
|
// Returns nil if multinet disabled.
|
||||||
|
func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Conn, error) {
|
||||||
|
s.guard.RLock()
|
||||||
|
defer s.guard.RUnlock()
|
||||||
|
|
||||||
|
if s.c.Enabled {
|
||||||
|
return func(ctx context.Context, address string) (net.Conn, error) {
|
||||||
|
network, address := parseDialTarget(address)
|
||||||
|
return s.md.DialContext(ctx, network, address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *DialerSource) Update(c Config) error {
|
||||||
|
s.guard.Lock()
|
||||||
|
defer s.guard.Unlock()
|
||||||
|
|
||||||
|
if s.c.equals(c) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return s.build(c)
|
||||||
|
}
|
Loading…
Reference in a new issue