forked from TrueCloudLab/frostfs-s3-gw
[#521] Use source dialer for gRPC connection to storage
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
parent
a8458dbc27
commit
94504e9746
3 changed files with 58 additions and 3 deletions
|
@ -37,6 +37,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy/contract"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy/contract"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
internalnet "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/net"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
||||||
|
@ -97,6 +98,7 @@ type (
|
||||||
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
||||||
frostfsidValidation bool
|
frostfsidValidation bool
|
||||||
accessbox *cid.ID
|
accessbox *cid.ID
|
||||||
|
dialerSource *internalnet.DialerSource
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
namespaces Namespaces
|
namespaces Namespaces
|
||||||
|
@ -131,7 +133,9 @@ type (
|
||||||
)
|
)
|
||||||
|
|
||||||
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
||||||
objPool, treePool, key := getPools(ctx, log.logger, v)
|
settings := newAppSettings(log, v)
|
||||||
|
|
||||||
|
objPool, treePool, key := getPools(ctx, log.logger, v, settings.dialerSource)
|
||||||
|
|
||||||
app := &App{
|
app := &App{
|
||||||
log: log.logger,
|
log: log.logger,
|
||||||
|
@ -143,7 +147,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
||||||
webDone: make(chan struct{}, 1),
|
webDone: make(chan struct{}, 1),
|
||||||
wrkDone: make(chan struct{}, 1),
|
wrkDone: make(chan struct{}, 1),
|
||||||
|
|
||||||
settings: newAppSettings(log, v),
|
settings: settings,
|
||||||
}
|
}
|
||||||
|
|
||||||
app.init(ctx)
|
app.init(ctx)
|
||||||
|
@ -234,6 +238,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||||
reconnectInterval: fetchReconnectInterval(v),
|
reconnectInterval: fetchReconnectInterval(v),
|
||||||
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
||||||
|
dialerSource: getDialerSource(log.logger, v),
|
||||||
}
|
}
|
||||||
|
|
||||||
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
||||||
|
@ -652,7 +657,15 @@ func newMaxClients(cfg *viper.Viper) maxClientsConfig {
|
||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
|
||||||
|
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg))
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
|
||||||
|
}
|
||||||
|
return source
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
||||||
var prm pool.InitParameters
|
var prm pool.InitParameters
|
||||||
var prmTree treepool.InitParameters
|
var prmTree treepool.InitParameters
|
||||||
|
|
||||||
|
@ -700,6 +713,7 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
|
||||||
interceptors := []grpc.DialOption{
|
interceptors := []grpc.DialOption{
|
||||||
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
||||||
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
||||||
|
grpc.WithContextDialer(dialSource.GrpcContextDialer()),
|
||||||
}
|
}
|
||||||
prm.SetGRPCDialOptions(interceptors...)
|
prm.SetGRPCDialOptions(interceptors...)
|
||||||
prmTree.SetGRPCDialOptions(interceptors...)
|
prmTree.SetGRPCDialOptions(interceptors...)
|
||||||
|
@ -878,6 +892,10 @@ func (a *App) updateSettings() {
|
||||||
a.settings.logLevel.SetLevel(lvl)
|
a.settings.logLevel.SetLevel(lvl)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg)); err != nil {
|
||||||
|
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
a.settings.update(a.cfg, a.log)
|
a.settings.update(a.cfg, a.log)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
internalnet "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/net"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
@ -62,6 +63,8 @@ const (
|
||||||
defaultVHSHeader = "X-Frostfs-S3-VHS"
|
defaultVHSHeader = "X-Frostfs-S3-VHS"
|
||||||
defaultServernameHeader = "X-Frostfs-Servername"
|
defaultServernameHeader = "X-Frostfs-Servername"
|
||||||
|
|
||||||
|
defaultMultinetFallbackDelay = 300 * time.Millisecond
|
||||||
|
|
||||||
defaultConstraintName = "default"
|
defaultConstraintName = "default"
|
||||||
|
|
||||||
defaultNamespace = ""
|
defaultNamespace = ""
|
||||||
|
@ -210,6 +213,13 @@ const ( // Settings.
|
||||||
cfgContainersLifecycle = "containers.lifecycle"
|
cfgContainersLifecycle = "containers.lifecycle"
|
||||||
cfgContainersAccessBox = "containers.accessbox"
|
cfgContainersAccessBox = "containers.accessbox"
|
||||||
|
|
||||||
|
// Multinet.
|
||||||
|
cfgMultinetEnabled = "multinet.enabled"
|
||||||
|
cfgMultinetBalancer = "multinet.balancer"
|
||||||
|
cfgMultinetRestrict = "multinet.restrict"
|
||||||
|
cfgMultinetFallbackDelay = "multinet.fallback_delay"
|
||||||
|
cfgMultinetSubnets = "multinet.subnets"
|
||||||
|
|
||||||
// Command line args.
|
// Command line args.
|
||||||
cmdHelp = "help"
|
cmdHelp = "help"
|
||||||
cmdVersion = "version"
|
cmdVersion = "version"
|
||||||
|
@ -746,6 +756,28 @@ func fetchVHSNamespaces(v *viper.Viper, log *zap.Logger) map[string]bool {
|
||||||
return vhsNamespacesEnabled
|
return vhsNamespacesEnabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fetchMultinetConfig(v *viper.Viper) (cfg internalnet.Config) {
|
||||||
|
cfg.Enabled = v.GetBool(cfgMultinetEnabled)
|
||||||
|
cfg.Balancer = v.GetString(cfgMultinetBalancer)
|
||||||
|
cfg.Restrict = v.GetBool(cfgMultinetRestrict)
|
||||||
|
cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay)
|
||||||
|
cfg.Subnets = make([]internalnet.Subnet, 0, 5)
|
||||||
|
|
||||||
|
for i := 0; ; i++ {
|
||||||
|
key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "."
|
||||||
|
subnet := internalnet.Subnet{}
|
||||||
|
|
||||||
|
subnet.Prefix = v.GetString(key + "mask")
|
||||||
|
if subnet.Prefix == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
subnet.SourceIPs = v.GetStringSlice(key + "source_ips")
|
||||||
|
cfg.Subnets = append(cfg.Subnets, subnet)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func newSettings() *viper.Viper {
|
func newSettings() *viper.Viper {
|
||||||
v := viper.New()
|
v := viper.New()
|
||||||
|
|
||||||
|
@ -848,6 +880,9 @@ func newSettings() *viper.Viper {
|
||||||
v.SetDefault(cfgVHSHeader, defaultVHSHeader)
|
v.SetDefault(cfgVHSHeader, defaultVHSHeader)
|
||||||
v.SetDefault(cfgServernameHeader, defaultServernameHeader)
|
v.SetDefault(cfgServernameHeader, defaultServernameHeader)
|
||||||
|
|
||||||
|
// multinet
|
||||||
|
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
||||||
|
|
||||||
// Bind flags
|
// Bind flags
|
||||||
if err := bindFlags(v, flags); err != nil {
|
if err := bindFlags(v, flags); err != nil {
|
||||||
panic(fmt.Errorf("bind flags: %w", err))
|
panic(fmt.Errorf("bind flags: %w", err))
|
||||||
|
|
|
@ -173,4 +173,6 @@ const (
|
||||||
CouldntCacheNetworkInfo = "couldn't cache network info"
|
CouldntCacheNetworkInfo = "couldn't cache network info"
|
||||||
NotSupported = "not supported"
|
NotSupported = "not supported"
|
||||||
CheckCustomAccessKeyIDUniqueness = "check custom access key id uniqueness"
|
CheckCustomAccessKeyIDUniqueness = "check custom access key id uniqueness"
|
||||||
|
FailedToLoadMultinetConfig = "failed to load multinet config"
|
||||||
|
MultinetConfigWontBeUpdated = "multinet config won't be updated"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue