From 3a7ed8220e94dc15cdf821ac357d758d77b299f3 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 12 Sep 2022 17:30:41 +0300 Subject: [PATCH] [#702] Refactor app initialization Signed-off-by: Denis Kirillov --- cmd/s3-gw/app.go | 276 +++++++++++++++++++++++------------------------ 1 file changed, 136 insertions(+), 140 deletions(-) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index e7531d5..f65f49c 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -39,20 +39,20 @@ type ( log *zap.Logger cfg *viper.Viper pool *pool.Pool + key *keys.PrivateKey + nc *notifications.Controller obj layer.Client api api.Handler metrics *appMetrics bucketResolver *resolver.BucketResolver tlsProvider *certProvider - - maxClients api.MaxClients + services []*Service + settings *appSettings + maxClients api.MaxClients webDone chan struct{} wrkDone chan struct{} - - services []*Service - settings *appSettings } appSettings struct { @@ -87,159 +87,89 @@ type ( ) func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { - l := log.logger - - var ( - key *keys.PrivateKey - err error - caller api.Handler - ctr auth.Center - obj layer.Client - nc *notifications.Controller - - prmPool pool.InitParameters - - reBalance = defaultRebalanceInterval - conTimeout = defaultConnectTimeout - hckTimeout = defaultHealthcheckTimeout - - maxClientsCount = defaultMaxClientsCount - maxClientsDeadline = defaultMaxClientsDeadline - poolErrorThreshold = defaultPoolErrorThreshold - ) - - if v := v.GetDuration(cfgConnectTimeout); v > 0 { - conTimeout = v - } - - if v := v.GetDuration(cfgHealthcheckTimeout); v > 0 { - hckTimeout = v - } - - if v := v.GetInt(cfgMaxClientsCount); v > 0 { - maxClientsCount = v - } - - if v := v.GetDuration(cfgMaxClientsDeadline); v > 0 { - maxClientsDeadline = v - } - - if v := v.GetDuration(cfgRebalanceInterval); v > 0 { - reBalance = v - } - - if v := v.GetUint32(cfgPoolErrorThreshold); v > 0 { - poolErrorThreshold = v - } - - password := wallet.GetPassword(v, cfgWalletPassphrase) - if key, err = wallet.GetKeyFromPath(v.GetString(cfgWalletPath), v.GetString(cfgWalletAddress), password); err != nil { - l.Fatal("could not load NeoFS private key", zap.Error(err)) - } - - l.Info("using credentials", zap.String("NeoFS", hex.EncodeToString(key.PublicKey().Bytes()))) - - prmPool.SetKey(&key.PrivateKey) - prmPool.SetNodeDialTimeout(conTimeout) - prmPool.SetHealthcheckTimeout(hckTimeout) - prmPool.SetErrorThreshold(poolErrorThreshold) - prmPool.SetClientRebalanceInterval(reBalance) - for _, peer := range fetchPeers(l, v) { - prmPool.AddNode(peer) - } - - conns, err := pool.NewPool(prmPool) - if err != nil { - l.Fatal("failed to create connection pool", zap.Error(err)) - } - - if err = conns.Dial(ctx); err != nil { - l.Fatal("failed to dial connection pool", zap.Error(err)) - } - - // prepare random key for anonymous requests - randomKey, err := keys.NewPrivateKey() - if err != nil { - l.Fatal("couldn't generate random key", zap.Error(err)) - } - - resolveCfg := &resolver.Config{ - NeoFS: neofs.NewResolverNeoFS(conns), - RPCAddress: v.GetString(cfgRPCEndpoint), - } - - order := v.GetStringSlice(cfgResolveOrder) - if resolveCfg.RPCAddress == "" { - order = remove(order, resolver.NNSResolver) - l.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint)) - } - - bucketResolver, err := resolver.NewBucketResolver(order, resolveCfg) - if err != nil { - l.Fatal("failed to form resolver", zap.Error(err)) - } - - treeServiceEndpoint := v.GetString(cfgTreeServiceEndpoint) - treeService, err := neofs.NewTreeClient(treeServiceEndpoint, key) - if err != nil { - l.Fatal("failed to create tree service", zap.Error(err)) - } - l.Info("init tree service", zap.String("endpoint", treeServiceEndpoint)) - - layerCfg := &layer.Config{ - Caches: getCacheOptions(v, l), - AnonKey: layer.AnonymousKey{ - Key: randomKey, - }, - Resolver: bucketResolver, - TreeService: treeService, - } - - // prepare object layer - obj = layer.NewLayer(l, neofs.NewNeoFS(conns), layerCfg) - - if v.GetBool(cfgEnableNATS) { - nopts := getNotificationsOptions(v, l) - nc, err = notifications.NewController(nopts, l) - if err != nil { - l.Fatal("failed to enable notifications", zap.Error(err)) - } - - if err = obj.Initialize(ctx, nc); err != nil { - l.Fatal("couldn't initialize layer", zap.Error(err)) - } - } + conns, key := getPool(ctx, log.logger, v) // prepare auth center - ctr = auth.New(neofs.NewAuthmateNeoFS(conns), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, l)) - handlerOptions := getHandlerOptions(v, l) - - if caller, err = handler.New(l, obj, nc, handlerOptions); err != nil { - l.Fatal("could not initialize API handler", zap.Error(err)) - } + ctr := auth.New(neofs.NewAuthmateNeoFS(conns), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger)) app := &App{ ctr: ctr, - log: l, + log: log.logger, cfg: v, pool: conns, - obj: obj, - api: caller, + key: key, webDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1), - maxClients: api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline), + maxClients: newMaxClients(v), settings: &appSettings{LogLevel: log.lvl}, } - app.initMetrics() - app.initResolver() - app.initTLSProvider() + app.init(ctx) return app } +func (a *App) init(ctx context.Context) { + a.initHandlers(ctx) + a.initMetrics() + a.initTLSProvider() +} + +func (a *App) initLayer(ctx context.Context) { + a.initResolver() + + treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint) + treeService, err := neofs.NewTreeClient(treeServiceEndpoint, a.key) + if err != nil { + a.log.Fatal("failed to create tree service", zap.Error(err)) + } + a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint)) + + // prepare random key for anonymous requests + randomKey, err := keys.NewPrivateKey() + if err != nil { + a.log.Fatal("couldn't generate random key", zap.Error(err)) + } + + layerCfg := &layer.Config{ + Caches: getCacheOptions(a.cfg, a.log), + AnonKey: layer.AnonymousKey{ + Key: randomKey, + }, + Resolver: a.bucketResolver, + TreeService: treeService, + } + + // prepare object layer + a.obj = layer.NewLayer(a.log, neofs.NewNeoFS(a.pool), layerCfg) + + if a.cfg.GetBool(cfgEnableNATS) { + nopts := getNotificationsOptions(a.cfg, a.log) + a.nc, err = notifications.NewController(nopts, a.log) + if err != nil { + a.log.Fatal("failed to enable notifications", zap.Error(err)) + } + + if err = a.obj.Initialize(ctx, a.nc); err != nil { + a.log.Fatal("couldn't initialize layer", zap.Error(err)) + } + } +} + +func (a *App) initHandlers(ctx context.Context) { + a.initLayer(ctx) + + var err error + handlerOptions := getHandlerOptions(a.cfg, a.log) + + a.api, err = handler.New(a.log, a.obj, a.nc, handlerOptions) + if err != nil { + a.log.Fatal("could not initialize API handler", zap.Error(err)) + } +} + func (a *App) initMetrics() { gateMetricsProvider := newGateMetrics(neofs.NewPoolStatistic(a.pool)) a.metrics = newAppMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled)) @@ -278,6 +208,72 @@ func (a *App) getResolverConfig() ([]string, *resolver.Config) { return order, resolveCfg } +func newMaxClients(cfg *viper.Viper) api.MaxClients { + maxClientsCount := cfg.GetInt(cfgMaxClientsCount) + if maxClientsCount <= 0 { + maxClientsCount = defaultMaxClientsCount + } + + maxClientsDeadline := cfg.GetDuration(cfgMaxClientsDeadline) + if maxClientsDeadline <= 0 { + maxClientsDeadline = defaultMaxClientsDeadline + } + + return api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline) +} + +func getPool(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *keys.PrivateKey) { + var prm pool.InitParameters + + password := wallet.GetPassword(cfg, cfgWalletPassphrase) + key, err := wallet.GetKeyFromPath(cfg.GetString(cfgWalletPath), cfg.GetString(cfgWalletAddress), password) + if err != nil { + logger.Fatal("could not load NeoFS private key", zap.Error(err)) + } + + prm.SetKey(&key.PrivateKey) + logger.Info("using credentials", zap.String("NeoFS", hex.EncodeToString(key.PublicKey().Bytes()))) + + for _, peer := range fetchPeers(logger, cfg) { + prm.AddNode(peer) + } + + connTimeout := cfg.GetDuration(cfgConnectTimeout) + if connTimeout <= 0 { + connTimeout = defaultConnectTimeout + } + prm.SetNodeDialTimeout(connTimeout) + + healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout) + if healthCheckTimeout <= 0 { + healthCheckTimeout = defaultHealthcheckTimeout + } + prm.SetNodeDialTimeout(healthCheckTimeout) + + rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval) + if rebalanceInterval <= 0 { + rebalanceInterval = defaultRebalanceInterval + } + prm.SetClientRebalanceInterval(rebalanceInterval) + + errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold) + if errorThreshold <= 0 { + errorThreshold = defaultPoolErrorThreshold + } + prm.SetErrorThreshold(errorThreshold) + + p, err := pool.NewPool(prm) + if err != nil { + logger.Fatal("failed to create connection pool", zap.Error(err)) + } + + if err = p.Dial(ctx); err != nil { + logger.Fatal("failed to dial connection pool", zap.Error(err)) + } + + return p, key +} + func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics { if !enabled { logger.Warn("metrics are disabled")