[#1422] morph: Add dialer source support
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
67798bb50e
commit
fbdfd503e4
2 changed files with 29 additions and 0 deletions
|
@ -58,6 +58,20 @@ func (s *DialerSource) GrpcContextDialer() func(context.Context, string) (net.Co
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetContextDialer returns net.DialContext dial function.
|
||||||
|
// Returns nil if multinet disabled.
|
||||||
|
func (s *DialerSource) NetContextDialer() func(context.Context, string, string) (net.Conn, error) {
|
||||||
|
s.guard.RLock()
|
||||||
|
defer s.guard.RUnlock()
|
||||||
|
|
||||||
|
if s.c.Enabled {
|
||||||
|
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
|
return s.md.DialContext(ctx, network, address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *DialerSource) Update(c Config) error {
|
func (s *DialerSource) Update(c Config) error {
|
||||||
s.guard.Lock()
|
s.guard.Lock()
|
||||||
defer s.guard.Unlock()
|
defer s.guard.Unlock()
|
||||||
|
|
|
@ -4,10 +4,12 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
||||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
@ -46,6 +48,8 @@ type cfg struct {
|
||||||
switchInterval time.Duration
|
switchInterval time.Duration
|
||||||
|
|
||||||
morphCacheMetrics metrics.MorphCacheMetrics
|
morphCacheMetrics metrics.MorphCacheMetrics
|
||||||
|
|
||||||
|
dialerSource *internalNet.DialerSource
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -153,10 +157,15 @@ func (c *Client) newCli(ctx context.Context, endpoint Endpoint) (*rpcclient.WSCl
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("read mtls certificates: %w", err)
|
return nil, nil, fmt.Errorf("read mtls certificates: %w", err)
|
||||||
}
|
}
|
||||||
|
var netDialContext func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
|
if c.cfg.dialerSource != nil { // TODO fix after IR
|
||||||
|
netDialContext = c.cfg.dialerSource.NetContextDialer()
|
||||||
|
}
|
||||||
cli, err := rpcclient.NewWS(ctx, endpoint.Address, rpcclient.WSOptions{
|
cli, err := rpcclient.NewWS(ctx, endpoint.Address, rpcclient.WSOptions{
|
||||||
Options: rpcclient.Options{
|
Options: rpcclient.Options{
|
||||||
DialTimeout: c.cfg.dialTimeout,
|
DialTimeout: c.cfg.dialTimeout,
|
||||||
TLSClientConfig: cfg,
|
TLSClientConfig: cfg,
|
||||||
|
NetDialContext: netDialContext,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -291,3 +300,9 @@ func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
|
||||||
c.morphCacheMetrics = morphCacheMetrics
|
c.morphCacheMetrics = morphCacheMetrics
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithDialerSource(ds *internalNet.DialerSource) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.dialerSource = ds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue