[#xxx] Use source dialer for gRPC connection to storage

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2024-10-29 14:41:49 +03:00
parent 0a9214d0b7
commit e4ff1fb2f4
3 changed files with 58 additions and 3 deletions

View file

@ -21,6 +21,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
@ -87,6 +88,7 @@ type (
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex. // appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
appSettings struct { appSettings struct {
reconnectInterval time.Duration reconnectInterval time.Duration
dialerSource *internalnet.DialerSource
mu sync.RWMutex mu sync.RWMutex
defaultTimestamp bool defaultTimestamp bool
@ -148,6 +150,8 @@ func newApp(ctx context.Context, opt ...Option) App {
opt[i](a) opt[i](a)
} }
a.initAppSettings()
// -- setup FastHTTP server -- // -- setup FastHTTP server --
a.webServer.Name = "frost-http-gw" a.webServer.Name = "frost-http-gw"
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize) a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
@ -161,7 +165,7 @@ func newApp(ctx context.Context, opt ...Option) App {
a.webServer.DisablePreParseMultipartForm = true a.webServer.DisablePreParseMultipartForm = true
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
// -- -- -- -- -- -- -- -- -- -- -- -- -- -- // -- -- -- -- -- -- -- -- -- -- -- -- -- --
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg) a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg, a.settings.dialerSource)
var owner user.ID var owner user.ID
user.IDFromKey(&owner, a.key.PrivateKey.PublicKey) user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)
@ -169,7 +173,6 @@ func newApp(ctx context.Context, opt ...Option) App {
a.setRuntimeParameters() a.setRuntimeParameters()
a.initAppSettings()
a.initResolver() a.initResolver()
a.initMetrics() a.initMetrics()
a.initTracing(ctx) a.initTracing(ctx)
@ -180,6 +183,7 @@ func newApp(ctx context.Context, opt ...Option) App {
func (a *app) initAppSettings() { func (a *app) initAppSettings() {
a.settings = &appSettings{ a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg), reconnectInterval: fetchReconnectInterval(a.cfg),
dialerSource: getDialerSource(a.log, a.cfg),
} }
a.settings.update(a.cfg, a.log) a.settings.update(a.cfg, a.log)
} }
@ -559,6 +563,10 @@ func (a *app) configReload(ctx context.Context) {
a.logLevel.SetLevel(lvl) a.logLevel.SetLevel(lvl)
} }
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
}
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil { if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err)) a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
} }

View file

@ -16,6 +16,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
@ -58,6 +59,8 @@ const (
defaultCORSMaxAge = 600 // seconds defaultCORSMaxAge = 600 // seconds
defaultMultinetFallbackDelay = 300 * time.Millisecond
cfgServer = "server" cfgServer = "server"
cfgTLSEnabled = "tls.enabled" cfgTLSEnabled = "tls.enabled"
cfgTLSCertFile = "tls.cert_file" cfgTLSCertFile = "tls.cert_file"
@ -151,6 +154,13 @@ const (
cfgCORSAllowCredentials = "cors.allow_credentials" cfgCORSAllowCredentials = "cors.allow_credentials"
cfgCORSMaxAge = "cors.max_age" cfgCORSMaxAge = "cors.max_age"
// Multinet.
cfgMultinetEnabled = "multinet.enabled"
cfgMultinetBalancer = "multinet.balancer"
cfgMultinetRestrict = "multinet.restrict"
cfgMultinetFallbackDelay = "multinet.fallback_delay"
cfgMultinetSubnets = "multinet.subnets"
// Command line args. // Command line args.
cmdHelp = "help" cmdHelp = "help"
cmdVersion = "version" cmdVersion = "version"
@ -245,6 +255,9 @@ func settings() *viper.Viper {
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader) v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"}) v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
// multinet
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
// Binding flags // Binding flags
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil { if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
panic(err) panic(err)
@ -584,7 +597,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
return servers return servers
} }
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) { func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
key, err := getFrostFSKey(cfg, logger) key, err := getFrostFSKey(cfg, logger)
if err != nil { if err != nil {
logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err)) logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
@ -643,6 +656,7 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
interceptors := []grpc.DialOption{ interceptors := []grpc.DialOption{
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()), grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()), grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
grpc.WithContextDialer(dialSource.GrpcContextDialer()),
} }
prm.SetGRPCDialOptions(interceptors...) prm.SetGRPCDialOptions(interceptors...)
prmTree.SetGRPCDialOptions(interceptors...) prmTree.SetGRPCDialOptions(interceptors...)
@ -745,3 +759,34 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
return defaultValue return defaultValue
} }
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
if err != nil {
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
}
return source
}
func fetchMultinetConfig(v *viper.Viper, l *zap.Logger) (cfg internalnet.Config) {
cfg.Enabled = v.GetBool(cfgMultinetEnabled)
cfg.Balancer = v.GetString(cfgMultinetBalancer)
cfg.Restrict = v.GetBool(cfgMultinetRestrict)
cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay)
cfg.Subnets = make([]internalnet.Subnet, 0, 5)
cfg.EventHandler = internalnet.NewLogEventHandler(l)
for i := 0; ; i++ {
key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "."
subnet := internalnet.Subnet{}
subnet.Prefix = v.GetString(key + "mask")
if subnet.Prefix == "" {
break
}
subnet.SourceIPs = v.GetStringSlice(key + "source_ips")
cfg.Subnets = append(cfg.Subnets, subnet)
}
return
}

View file

@ -81,4 +81,6 @@ const (
WarnDuplicateAddress = "duplicate address" WarnDuplicateAddress = "duplicate address"
MultinetDialSuccess = "multinet dial successful" MultinetDialSuccess = "multinet dial successful"
MultinetDialFail = "multinet dial failed" MultinetDialFail = "multinet dial failed"
FailedToLoadMultinetConfig = "failed to load multinet config"
MultinetConfigWontBeUpdated = "multinet config won't be updated"
) )