forked from TrueCloudLab/frostfs-s3-gw
[#702] Refactor app initialization
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
1fa28dfacd
commit
3a7ed8220e
1 changed files with 136 additions and 140 deletions
276
cmd/s3-gw/app.go
276
cmd/s3-gw/app.go
|
@ -39,20 +39,20 @@ type (
|
|||
log *zap.Logger
|
||||
cfg *viper.Viper
|
||||
pool *pool.Pool
|
||||
key *keys.PrivateKey
|
||||
nc *notifications.Controller
|
||||
obj layer.Client
|
||||
api api.Handler
|
||||
|
||||
metrics *appMetrics
|
||||
bucketResolver *resolver.BucketResolver
|
||||
tlsProvider *certProvider
|
||||
|
||||
maxClients api.MaxClients
|
||||
services []*Service
|
||||
settings *appSettings
|
||||
maxClients api.MaxClients
|
||||
|
||||
webDone chan struct{}
|
||||
wrkDone chan struct{}
|
||||
|
||||
services []*Service
|
||||
settings *appSettings
|
||||
}
|
||||
|
||||
appSettings struct {
|
||||
|
@ -87,159 +87,89 @@ type (
|
|||
)
|
||||
|
||||
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
||||
l := log.logger
|
||||
|
||||
var (
|
||||
key *keys.PrivateKey
|
||||
err error
|
||||
caller api.Handler
|
||||
ctr auth.Center
|
||||
obj layer.Client
|
||||
nc *notifications.Controller
|
||||
|
||||
prmPool pool.InitParameters
|
||||
|
||||
reBalance = defaultRebalanceInterval
|
||||
conTimeout = defaultConnectTimeout
|
||||
hckTimeout = defaultHealthcheckTimeout
|
||||
|
||||
maxClientsCount = defaultMaxClientsCount
|
||||
maxClientsDeadline = defaultMaxClientsDeadline
|
||||
poolErrorThreshold = defaultPoolErrorThreshold
|
||||
)
|
||||
|
||||
if v := v.GetDuration(cfgConnectTimeout); v > 0 {
|
||||
conTimeout = v
|
||||
}
|
||||
|
||||
if v := v.GetDuration(cfgHealthcheckTimeout); v > 0 {
|
||||
hckTimeout = v
|
||||
}
|
||||
|
||||
if v := v.GetInt(cfgMaxClientsCount); v > 0 {
|
||||
maxClientsCount = v
|
||||
}
|
||||
|
||||
if v := v.GetDuration(cfgMaxClientsDeadline); v > 0 {
|
||||
maxClientsDeadline = v
|
||||
}
|
||||
|
||||
if v := v.GetDuration(cfgRebalanceInterval); v > 0 {
|
||||
reBalance = v
|
||||
}
|
||||
|
||||
if v := v.GetUint32(cfgPoolErrorThreshold); v > 0 {
|
||||
poolErrorThreshold = v
|
||||
}
|
||||
|
||||
password := wallet.GetPassword(v, cfgWalletPassphrase)
|
||||
if key, err = wallet.GetKeyFromPath(v.GetString(cfgWalletPath), v.GetString(cfgWalletAddress), password); err != nil {
|
||||
l.Fatal("could not load NeoFS private key", zap.Error(err))
|
||||
}
|
||||
|
||||
l.Info("using credentials", zap.String("NeoFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
||||
|
||||
prmPool.SetKey(&key.PrivateKey)
|
||||
prmPool.SetNodeDialTimeout(conTimeout)
|
||||
prmPool.SetHealthcheckTimeout(hckTimeout)
|
||||
prmPool.SetErrorThreshold(poolErrorThreshold)
|
||||
prmPool.SetClientRebalanceInterval(reBalance)
|
||||
for _, peer := range fetchPeers(l, v) {
|
||||
prmPool.AddNode(peer)
|
||||
}
|
||||
|
||||
conns, err := pool.NewPool(prmPool)
|
||||
if err != nil {
|
||||
l.Fatal("failed to create connection pool", zap.Error(err))
|
||||
}
|
||||
|
||||
if err = conns.Dial(ctx); err != nil {
|
||||
l.Fatal("failed to dial connection pool", zap.Error(err))
|
||||
}
|
||||
|
||||
// prepare random key for anonymous requests
|
||||
randomKey, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
l.Fatal("couldn't generate random key", zap.Error(err))
|
||||
}
|
||||
|
||||
resolveCfg := &resolver.Config{
|
||||
NeoFS: neofs.NewResolverNeoFS(conns),
|
||||
RPCAddress: v.GetString(cfgRPCEndpoint),
|
||||
}
|
||||
|
||||
order := v.GetStringSlice(cfgResolveOrder)
|
||||
if resolveCfg.RPCAddress == "" {
|
||||
order = remove(order, resolver.NNSResolver)
|
||||
l.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint))
|
||||
}
|
||||
|
||||
bucketResolver, err := resolver.NewBucketResolver(order, resolveCfg)
|
||||
if err != nil {
|
||||
l.Fatal("failed to form resolver", zap.Error(err))
|
||||
}
|
||||
|
||||
treeServiceEndpoint := v.GetString(cfgTreeServiceEndpoint)
|
||||
treeService, err := neofs.NewTreeClient(treeServiceEndpoint, key)
|
||||
if err != nil {
|
||||
l.Fatal("failed to create tree service", zap.Error(err))
|
||||
}
|
||||
l.Info("init tree service", zap.String("endpoint", treeServiceEndpoint))
|
||||
|
||||
layerCfg := &layer.Config{
|
||||
Caches: getCacheOptions(v, l),
|
||||
AnonKey: layer.AnonymousKey{
|
||||
Key: randomKey,
|
||||
},
|
||||
Resolver: bucketResolver,
|
||||
TreeService: treeService,
|
||||
}
|
||||
|
||||
// prepare object layer
|
||||
obj = layer.NewLayer(l, neofs.NewNeoFS(conns), layerCfg)
|
||||
|
||||
if v.GetBool(cfgEnableNATS) {
|
||||
nopts := getNotificationsOptions(v, l)
|
||||
nc, err = notifications.NewController(nopts, l)
|
||||
if err != nil {
|
||||
l.Fatal("failed to enable notifications", zap.Error(err))
|
||||
}
|
||||
|
||||
if err = obj.Initialize(ctx, nc); err != nil {
|
||||
l.Fatal("couldn't initialize layer", zap.Error(err))
|
||||
}
|
||||
}
|
||||
conns, key := getPool(ctx, log.logger, v)
|
||||
|
||||
// prepare auth center
|
||||
ctr = auth.New(neofs.NewAuthmateNeoFS(conns), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, l))
|
||||
handlerOptions := getHandlerOptions(v, l)
|
||||
|
||||
if caller, err = handler.New(l, obj, nc, handlerOptions); err != nil {
|
||||
l.Fatal("could not initialize API handler", zap.Error(err))
|
||||
}
|
||||
ctr := auth.New(neofs.NewAuthmateNeoFS(conns), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger))
|
||||
|
||||
app := &App{
|
||||
ctr: ctr,
|
||||
log: l,
|
||||
log: log.logger,
|
||||
cfg: v,
|
||||
pool: conns,
|
||||
obj: obj,
|
||||
api: caller,
|
||||
key: key,
|
||||
|
||||
webDone: make(chan struct{}, 1),
|
||||
wrkDone: make(chan struct{}, 1),
|
||||
|
||||
maxClients: api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline),
|
||||
maxClients: newMaxClients(v),
|
||||
settings: &appSettings{LogLevel: log.lvl},
|
||||
}
|
||||
|
||||
app.initMetrics()
|
||||
app.initResolver()
|
||||
app.initTLSProvider()
|
||||
app.init(ctx)
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
func (a *App) init(ctx context.Context) {
|
||||
a.initHandlers(ctx)
|
||||
a.initMetrics()
|
||||
a.initTLSProvider()
|
||||
}
|
||||
|
||||
func (a *App) initLayer(ctx context.Context) {
|
||||
a.initResolver()
|
||||
|
||||
treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint)
|
||||
treeService, err := neofs.NewTreeClient(treeServiceEndpoint, a.key)
|
||||
if err != nil {
|
||||
a.log.Fatal("failed to create tree service", zap.Error(err))
|
||||
}
|
||||
a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint))
|
||||
|
||||
// prepare random key for anonymous requests
|
||||
randomKey, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
a.log.Fatal("couldn't generate random key", zap.Error(err))
|
||||
}
|
||||
|
||||
layerCfg := &layer.Config{
|
||||
Caches: getCacheOptions(a.cfg, a.log),
|
||||
AnonKey: layer.AnonymousKey{
|
||||
Key: randomKey,
|
||||
},
|
||||
Resolver: a.bucketResolver,
|
||||
TreeService: treeService,
|
||||
}
|
||||
|
||||
// prepare object layer
|
||||
a.obj = layer.NewLayer(a.log, neofs.NewNeoFS(a.pool), layerCfg)
|
||||
|
||||
if a.cfg.GetBool(cfgEnableNATS) {
|
||||
nopts := getNotificationsOptions(a.cfg, a.log)
|
||||
a.nc, err = notifications.NewController(nopts, a.log)
|
||||
if err != nil {
|
||||
a.log.Fatal("failed to enable notifications", zap.Error(err))
|
||||
}
|
||||
|
||||
if err = a.obj.Initialize(ctx, a.nc); err != nil {
|
||||
a.log.Fatal("couldn't initialize layer", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) initHandlers(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
|
||||
var err error
|
||||
handlerOptions := getHandlerOptions(a.cfg, a.log)
|
||||
|
||||
a.api, err = handler.New(a.log, a.obj, a.nc, handlerOptions)
|
||||
if err != nil {
|
||||
a.log.Fatal("could not initialize API handler", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) initMetrics() {
|
||||
gateMetricsProvider := newGateMetrics(neofs.NewPoolStatistic(a.pool))
|
||||
a.metrics = newAppMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
|
||||
|
@ -278,6 +208,72 @@ func (a *App) getResolverConfig() ([]string, *resolver.Config) {
|
|||
return order, resolveCfg
|
||||
}
|
||||
|
||||
func newMaxClients(cfg *viper.Viper) api.MaxClients {
|
||||
maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
|
||||
if maxClientsCount <= 0 {
|
||||
maxClientsCount = defaultMaxClientsCount
|
||||
}
|
||||
|
||||
maxClientsDeadline := cfg.GetDuration(cfgMaxClientsDeadline)
|
||||
if maxClientsDeadline <= 0 {
|
||||
maxClientsDeadline = defaultMaxClientsDeadline
|
||||
}
|
||||
|
||||
return api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline)
|
||||
}
|
||||
|
||||
func getPool(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *keys.PrivateKey) {
|
||||
var prm pool.InitParameters
|
||||
|
||||
password := wallet.GetPassword(cfg, cfgWalletPassphrase)
|
||||
key, err := wallet.GetKeyFromPath(cfg.GetString(cfgWalletPath), cfg.GetString(cfgWalletAddress), password)
|
||||
if err != nil {
|
||||
logger.Fatal("could not load NeoFS private key", zap.Error(err))
|
||||
}
|
||||
|
||||
prm.SetKey(&key.PrivateKey)
|
||||
logger.Info("using credentials", zap.String("NeoFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
||||
|
||||
for _, peer := range fetchPeers(logger, cfg) {
|
||||
prm.AddNode(peer)
|
||||
}
|
||||
|
||||
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
||||
if connTimeout <= 0 {
|
||||
connTimeout = defaultConnectTimeout
|
||||
}
|
||||
prm.SetNodeDialTimeout(connTimeout)
|
||||
|
||||
healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
|
||||
if healthCheckTimeout <= 0 {
|
||||
healthCheckTimeout = defaultHealthcheckTimeout
|
||||
}
|
||||
prm.SetNodeDialTimeout(healthCheckTimeout)
|
||||
|
||||
rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
|
||||
if rebalanceInterval <= 0 {
|
||||
rebalanceInterval = defaultRebalanceInterval
|
||||
}
|
||||
prm.SetClientRebalanceInterval(rebalanceInterval)
|
||||
|
||||
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
||||
if errorThreshold <= 0 {
|
||||
errorThreshold = defaultPoolErrorThreshold
|
||||
}
|
||||
prm.SetErrorThreshold(errorThreshold)
|
||||
|
||||
p, err := pool.NewPool(prm)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create connection pool", zap.Error(err))
|
||||
}
|
||||
|
||||
if err = p.Dial(ctx); err != nil {
|
||||
logger.Fatal("failed to dial connection pool", zap.Error(err))
|
||||
}
|
||||
|
||||
return p, key
|
||||
}
|
||||
|
||||
func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics {
|
||||
if !enabled {
|
||||
logger.Warn("metrics are disabled")
|
||||
|
|
Loading…
Reference in a new issue