[#160] Use source dialer for gRPC connection to storage
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
parent
69b7761bd6
commit
8bc64ce5e9
3 changed files with 58 additions and 3 deletions
|
@ -21,6 +21,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||
|
@ -87,6 +88,7 @@ type (
|
|||
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
||||
appSettings struct {
|
||||
reconnectInterval time.Duration
|
||||
dialerSource *internalnet.DialerSource
|
||||
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
|
@ -148,6 +150,8 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
opt[i](a)
|
||||
}
|
||||
|
||||
a.initAppSettings()
|
||||
|
||||
// -- setup FastHTTP server --
|
||||
a.webServer.Name = "frost-http-gw"
|
||||
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
|
||||
|
@ -161,7 +165,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
a.webServer.DisablePreParseMultipartForm = true
|
||||
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
|
||||
// -- -- -- -- -- -- -- -- -- -- -- -- -- --
|
||||
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg)
|
||||
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg, a.settings.dialerSource)
|
||||
|
||||
var owner user.ID
|
||||
user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)
|
||||
|
@ -169,7 +173,6 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
|
||||
a.setRuntimeParameters()
|
||||
|
||||
a.initAppSettings()
|
||||
a.initResolver()
|
||||
a.initMetrics()
|
||||
a.initTracing(ctx)
|
||||
|
@ -180,6 +183,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
func (a *app) initAppSettings() {
|
||||
a.settings = &appSettings{
|
||||
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||
dialerSource: getDialerSource(a.log, a.cfg),
|
||||
}
|
||||
a.settings.update(a.cfg, a.log)
|
||||
}
|
||||
|
@ -559,6 +563,10 @@ func (a *app) configReload(ctx context.Context) {
|
|||
a.logLevel.SetLevel(lvl)
|
||||
}
|
||||
|
||||
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
|
||||
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
||||
}
|
||||
|
||||
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
|
||||
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
|
@ -58,6 +59,8 @@ const (
|
|||
|
||||
defaultCORSMaxAge = 600 // seconds
|
||||
|
||||
defaultMultinetFallbackDelay = 300 * time.Millisecond
|
||||
|
||||
cfgServer = "server"
|
||||
cfgTLSEnabled = "tls.enabled"
|
||||
cfgTLSCertFile = "tls.cert_file"
|
||||
|
@ -151,6 +154,13 @@ const (
|
|||
cfgCORSAllowCredentials = "cors.allow_credentials"
|
||||
cfgCORSMaxAge = "cors.max_age"
|
||||
|
||||
// Multinet.
|
||||
cfgMultinetEnabled = "multinet.enabled"
|
||||
cfgMultinetBalancer = "multinet.balancer"
|
||||
cfgMultinetRestrict = "multinet.restrict"
|
||||
cfgMultinetFallbackDelay = "multinet.fallback_delay"
|
||||
cfgMultinetSubnets = "multinet.subnets"
|
||||
|
||||
// Command line args.
|
||||
cmdHelp = "help"
|
||||
cmdVersion = "version"
|
||||
|
@ -245,6 +255,9 @@ func settings() *viper.Viper {
|
|||
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
||||
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
|
||||
|
||||
// multinet
|
||||
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
||||
|
||||
// Binding flags
|
||||
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
||||
panic(err)
|
||||
|
@ -584,7 +597,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
|||
return servers
|
||||
}
|
||||
|
||||
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
||||
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
||||
key, err := getFrostFSKey(cfg, logger)
|
||||
if err != nil {
|
||||
logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
||||
|
@ -643,6 +656,7 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
|
|||
interceptors := []grpc.DialOption{
|
||||
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
||||
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
||||
grpc.WithContextDialer(dialSource.GrpcContextDialer()),
|
||||
}
|
||||
prm.SetGRPCDialOptions(interceptors...)
|
||||
prmTree.SetGRPCDialOptions(interceptors...)
|
||||
|
@ -745,3 +759,34 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
|
|||
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
|
||||
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
|
||||
if err != nil {
|
||||
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
|
||||
}
|
||||
return source
|
||||
}
|
||||
|
||||
func fetchMultinetConfig(v *viper.Viper, l *zap.Logger) (cfg internalnet.Config) {
|
||||
cfg.Enabled = v.GetBool(cfgMultinetEnabled)
|
||||
cfg.Balancer = v.GetString(cfgMultinetBalancer)
|
||||
cfg.Restrict = v.GetBool(cfgMultinetRestrict)
|
||||
cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay)
|
||||
cfg.Subnets = make([]internalnet.Subnet, 0, 5)
|
||||
cfg.EventHandler = internalnet.NewLogEventHandler(l)
|
||||
|
||||
for i := 0; ; i++ {
|
||||
key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "."
|
||||
subnet := internalnet.Subnet{}
|
||||
|
||||
subnet.Prefix = v.GetString(key + "mask")
|
||||
if subnet.Prefix == "" {
|
||||
break
|
||||
}
|
||||
subnet.SourceIPs = v.GetStringSlice(key + "source_ips")
|
||||
cfg.Subnets = append(cfg.Subnets, subnet)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -81,4 +81,6 @@ const (
|
|||
WarnDuplicateAddress = "duplicate address"
|
||||
MultinetDialSuccess = "multinet dial successful"
|
||||
MultinetDialFail = "multinet dial failed"
|
||||
FailedToLoadMultinetConfig = "failed to load multinet config"
|
||||
MultinetConfigWontBeUpdated = "multinet config won't be updated"
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue