[#521] Use source dialer for gRPC connection to storage

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2024-10-21 15:29:27 +03:00
parent b7a5f91da5
commit 7bde06c679
3 changed files with 59 additions and 5 deletions

View file

@ -36,6 +36,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy/contract"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
@ -95,6 +96,7 @@ type (
resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
frostfsidValidation bool
dialerSource *internalnet.DialerSource
mu sync.RWMutex
namespaces Namespaces
@ -129,7 +131,9 @@ type (
)
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
objPool, treePool, key := getPools(ctx, log.logger, v)
settings := newAppSettings(log, v)
objPool, treePool, key := getPools(ctx, log.logger, v, settings.dialerSource)
cfg := tokens.Config{
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key), log.logger),
@ -152,7 +156,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
webDone: make(chan struct{}, 1),
wrkDone: make(chan struct{}, 1),
settings: newAppSettings(log, v),
settings: settings,
}
app.init(ctx)
@ -224,6 +228,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
reconnectInterval: fetchReconnectInterval(v),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
dialerSource: getDialerSource(log.logger, v),
}
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
@ -626,7 +631,15 @@ func newMaxClients(cfg *viper.Viper) maxClientsConfig {
return config
}
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg))
if err != nil {
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
}
return source
}
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
var prm pool.InitParameters
var prmTree treepool.InitParameters
@ -671,8 +684,8 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
var apiGRPCDialOpts []grpc.DialOption
var treeGRPCDialOpts []grpc.DialOption
var apiGRPCDialOpts = []grpc.DialOption{grpc.WithContextDialer(dialSource.GrpcContextDialer())}
var treeGRPCDialOpts = []grpc.DialOption{grpc.WithContextDialer(dialSource.GrpcContextDialer())}
if cfg.GetBool(cfgTracingEnabled) {
interceptors := []grpc.DialOption{
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
@ -858,6 +871,10 @@ func (a *App) updateSettings() {
a.settings.logLevel.SetLevel(lvl)
}
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg)); err != nil {
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
}
a.settings.update(a.cfg, a.log)
}

View file

@ -16,6 +16,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
@ -62,6 +63,8 @@ const (
defaultVHSHeader = "X-Frostfs-S3-VHS"
defaultServernameHeader = "X-Frostfs-Servername"
defaultMultinetFallbackDelay = 300 * time.Millisecond
defaultConstraintName = "default"
defaultNamespace = ""
@ -209,6 +212,13 @@ const ( // Settings.
cfgContainersCORS = "containers.cors"
cfgContainersLifecycle = "containers.lifecycle"
// Multinet.
cfgMultinetEnabled = "multinet.enabled"
cfgMultinetBalancer = "multinet.balancer"
cfgMultinetRestrict = "multinet.restrict"
cfgMultinetFallbackDelay = "multinet.fallback_delay"
cfgMultinetSubnets = "multinet.subnets"
// Command line args.
cmdHelp = "help"
cmdVersion = "version"
@ -745,6 +755,28 @@ func fetchVHSNamespaces(v *viper.Viper, log *zap.Logger) map[string]bool {
return vhsNamespacesEnabled
}
func fetchMultinetConfig(v *viper.Viper) (cfg internalnet.Config) {
cfg.Enabled = v.GetBool(cfgMultinetEnabled)
cfg.Balancer = v.GetString(cfgMultinetBalancer)
cfg.Restrict = v.GetBool(cfgMultinetRestrict)
cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay)
cfg.Subnets = make([]internalnet.Subnet, 0, 5)
for i := 0; ; i++ {
key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "."
subnet := internalnet.Subnet{}
subnet.Prefix = v.GetString(key + "mask")
if subnet.Prefix == "" {
break
}
subnet.SourceIPs = v.GetStringSlice(key + "source_ips")
cfg.Subnets = append(cfg.Subnets, subnet)
}
return
}
func newSettings() *viper.Viper {
v := viper.New()
@ -847,6 +879,9 @@ func newSettings() *viper.Viper {
v.SetDefault(cfgVHSHeader, defaultVHSHeader)
v.SetDefault(cfgServernameHeader, defaultServernameHeader)
// multinet
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
// Bind flags
if err := bindFlags(v, flags); err != nil {
panic(fmt.Errorf("bind flags: %w", err))

View file

@ -171,4 +171,6 @@ const (
FailedToRemoveOldPartNode = "failed to remove old part node"
CouldntCacheNetworkInfo = "couldn't cache network info"
NotSupported = "not supported"
FailedToLoadMultinetConfig = "failed to load multinet config"
MultinetConfigWontBeUpdated = "multinet config won't be updated"
)