[#xxx] Use source dialer for gRPC connection to storage
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
parent
a76e751848
commit
bdd64b6ed1
4 changed files with 56 additions and 4 deletions
|
@ -21,6 +21,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||
|
@ -87,6 +88,7 @@ type (
|
|||
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
||||
appSettings struct {
|
||||
reconnectInterval time.Duration
|
||||
dialerSource *internalnet.DialerSource
|
||||
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
|
@ -148,6 +150,8 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
opt[i](a)
|
||||
}
|
||||
|
||||
a.initAppSettings()
|
||||
|
||||
// -- setup FastHTTP server --
|
||||
a.webServer.Name = "frost-http-gw"
|
||||
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
|
||||
|
@ -161,7 +165,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
a.webServer.DisablePreParseMultipartForm = true
|
||||
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
|
||||
// -- -- -- -- -- -- -- -- -- -- -- -- -- --
|
||||
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg)
|
||||
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg, a.settings.dialerSource)
|
||||
|
||||
var owner user.ID
|
||||
user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)
|
||||
|
@ -169,7 +173,6 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
|
||||
a.setRuntimeParameters()
|
||||
|
||||
a.initAppSettings()
|
||||
a.initResolver()
|
||||
a.initMetrics()
|
||||
a.initTracing(ctx)
|
||||
|
@ -180,6 +183,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
func (a *app) initAppSettings() {
|
||||
a.settings = &appSettings{
|
||||
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||
dialerSource: getDialerSource(a.log, a.cfg),
|
||||
}
|
||||
a.settings.update(a.cfg, a.log)
|
||||
}
|
||||
|
@ -559,6 +563,10 @@ func (a *app) configReload(ctx context.Context) {
|
|||
a.logLevel.SetLevel(lvl)
|
||||
}
|
||||
|
||||
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
|
||||
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
||||
}
|
||||
|
||||
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
|
||||
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
|
@ -151,6 +152,13 @@ const (
|
|||
cfgCORSAllowCredentials = "cors.allow_credentials"
|
||||
cfgCORSMaxAge = "cors.max_age"
|
||||
|
||||
// Multinet.
|
||||
cfgMultinetEnabled = "multinet.enabled"
|
||||
cfgMultinetBalancer = "multinet.balancer"
|
||||
cfgMultinetRestrict = "multinet.restrict"
|
||||
cfgMultinetFallbackDelay = "multinet.fallback_delay"
|
||||
cfgMultinetSubnets = "multinet.subnets"
|
||||
|
||||
// Command line args.
|
||||
cmdHelp = "help"
|
||||
cmdVersion = "version"
|
||||
|
@ -584,7 +592,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
|||
return servers
|
||||
}
|
||||
|
||||
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
||||
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
||||
key, err := getFrostFSKey(cfg, logger)
|
||||
if err != nil {
|
||||
logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
||||
|
@ -643,6 +651,7 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
|
|||
interceptors := []grpc.DialOption{
|
||||
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
||||
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
||||
grpc.WithContextDialer(dialSource.GrpcContextDialer()),
|
||||
}
|
||||
prm.SetGRPCDialOptions(interceptors...)
|
||||
prmTree.SetGRPCDialOptions(interceptors...)
|
||||
|
@ -745,3 +754,34 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
|
|||
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
|
||||
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
|
||||
if err != nil {
|
||||
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
|
||||
}
|
||||
return source
|
||||
}
|
||||
|
||||
func fetchMultinetConfig(v *viper.Viper, l *zap.Logger) (cfg internalnet.Config) {
|
||||
cfg.Enabled = v.GetBool(cfgMultinetEnabled)
|
||||
cfg.Balancer = v.GetString(cfgMultinetBalancer)
|
||||
cfg.Restrict = v.GetBool(cfgMultinetRestrict)
|
||||
cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay)
|
||||
cfg.Subnets = make([]internalnet.Subnet, 0, 5)
|
||||
cfg.EventHandler = internalnet.NewLogEventHandler(l)
|
||||
|
||||
for i := 0; ; i++ {
|
||||
key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "."
|
||||
subnet := internalnet.Subnet{}
|
||||
|
||||
subnet.Prefix = v.GetString(key + "mask")
|
||||
if subnet.Prefix == "" {
|
||||
break
|
||||
}
|
||||
subnet.SourceIPs = v.GetStringSlice(key + "source_ips")
|
||||
cfg.Subnets = append(cfg.Subnets, subnet)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -79,4 +79,8 @@ const (
|
|||
ServerReconnectedSuccessfully = "server reconnected successfully"
|
||||
ServerReconnectFailed = "failed to reconnect server"
|
||||
WarnDuplicateAddress = "duplicate address"
|
||||
FailedToLoadMultinetConfig = "failed to load multinet config"
|
||||
MultinetConfigWontBeUpdated = "multinet config won't be updated"
|
||||
MultinetDialSuccess = "multinet dial successful"
|
||||
MultinetDialFail = "multinet dial failed"
|
||||
)
|
||||
|
|
|
@ -3,7 +3,7 @@ package net
|
|||
import (
|
||||
"net"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue