From 8bc64ce5e997b169b9c146b3ee291b86c3fdab62 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 29 Oct 2024 14:41:49 +0300 Subject: [PATCH] [#160] Use source dialer for gRPC connection to storage Signed-off-by: Alex Vanin --- cmd/http-gw/app.go | 12 +++++++++-- cmd/http-gw/settings.go | 47 ++++++++++++++++++++++++++++++++++++++++- internal/logs/logs.go | 2 ++ 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go index fa93a0e..84379d4 100644 --- a/cmd/http-gw/app.go +++ b/cmd/http-gw/app.go @@ -21,6 +21,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" @@ -87,6 +88,7 @@ type ( // appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex. appSettings struct { reconnectInterval time.Duration + dialerSource *internalnet.DialerSource mu sync.RWMutex defaultTimestamp bool @@ -148,6 +150,8 @@ func newApp(ctx context.Context, opt ...Option) App { opt[i](a) } + a.initAppSettings() + // -- setup FastHTTP server -- a.webServer.Name = "frost-http-gw" a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize) @@ -161,7 +165,7 @@ func newApp(ctx context.Context, opt ...Option) App { a.webServer.DisablePreParseMultipartForm = true a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) // -- -- -- -- -- -- -- -- -- -- -- -- -- -- - a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg) + a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg, a.settings.dialerSource) var owner user.ID user.IDFromKey(&owner, a.key.PrivateKey.PublicKey) @@ -169,7 +173,6 @@ func newApp(ctx context.Context, opt ...Option) App { a.setRuntimeParameters() - a.initAppSettings() a.initResolver() a.initMetrics() a.initTracing(ctx) @@ -180,6 +183,7 @@ func newApp(ctx context.Context, opt ...Option) App { func (a *app) initAppSettings() { a.settings = &appSettings{ reconnectInterval: fetchReconnectInterval(a.cfg), + dialerSource: getDialerSource(a.log, a.cfg), } a.settings.update(a.cfg, a.log) } @@ -559,6 +563,10 @@ func (a *app) configReload(ctx context.Context) { a.logLevel.SetLevel(lvl) } + if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil { + a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err)) + } + if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil { a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err)) } diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go index 5fb628d..f464fbc 100644 --- a/cmd/http-gw/settings.go +++ b/cmd/http-gw/settings.go @@ -16,6 +16,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" @@ -58,6 +59,8 @@ const ( defaultCORSMaxAge = 600 // seconds + defaultMultinetFallbackDelay = 300 * time.Millisecond + cfgServer = "server" cfgTLSEnabled = "tls.enabled" cfgTLSCertFile = "tls.cert_file" @@ -151,6 +154,13 @@ const ( cfgCORSAllowCredentials = "cors.allow_credentials" cfgCORSMaxAge = "cors.max_age" + // Multinet. + cfgMultinetEnabled = "multinet.enabled" + cfgMultinetBalancer = "multinet.balancer" + cfgMultinetRestrict = "multinet.restrict" + cfgMultinetFallbackDelay = "multinet.fallback_delay" + cfgMultinetSubnets = "multinet.subnets" + // Command line args. cmdHelp = "help" cmdVersion = "version" @@ -245,6 +255,9 @@ func settings() *viper.Viper { v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader) v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"}) + // multinet + v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay) + // Binding flags if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil { panic(err) @@ -584,7 +597,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo { return servers } -func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) { +func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) { key, err := getFrostFSKey(cfg, logger) if err != nil { logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err)) @@ -643,6 +656,7 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool. interceptors := []grpc.DialOption{ grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()), grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()), + grpc.WithContextDialer(dialSource.GrpcContextDialer()), } prm.SetGRPCDialOptions(interceptors...) prmTree.SetGRPCDialOptions(interceptors...) @@ -745,3 +759,34 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue return defaultValue } + +func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource { + source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger)) + if err != nil { + logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err)) + } + return source +} + +func fetchMultinetConfig(v *viper.Viper, l *zap.Logger) (cfg internalnet.Config) { + cfg.Enabled = v.GetBool(cfgMultinetEnabled) + cfg.Balancer = v.GetString(cfgMultinetBalancer) + cfg.Restrict = v.GetBool(cfgMultinetRestrict) + cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay) + cfg.Subnets = make([]internalnet.Subnet, 0, 5) + cfg.EventHandler = internalnet.NewLogEventHandler(l) + + for i := 0; ; i++ { + key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "." + subnet := internalnet.Subnet{} + + subnet.Prefix = v.GetString(key + "mask") + if subnet.Prefix == "" { + break + } + subnet.SourceIPs = v.GetStringSlice(key + "source_ips") + cfg.Subnets = append(cfg.Subnets, subnet) + } + + return +} diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 7b7ddc1..409f87d 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -81,4 +81,6 @@ const ( WarnDuplicateAddress = "duplicate address" MultinetDialSuccess = "multinet dial successful" MultinetDialFail = "multinet dial failed" + FailedToLoadMultinetConfig = "failed to load multinet config" + MultinetConfigWontBeUpdated = "multinet config won't be updated" )