Denis Kirillov
8b6d93c94d
Add new metric frostfs_s3_lifecycler_statistic_dropped_logs

Also, a configuration sampling interval is added.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
359 lines · 9.3 KiB · Go
package main

import (
	"context"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/frostfs"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/lifecycle"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph/contract"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/resolver"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/spf13/viper"
	"go.uber.org/zap"
)

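// App wires together the long-lived components of the lifecycler:
// logger, metrics, the epoch notificator and the runtime settings
// that can be updated on config reload.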
type (
	App struct {
		log         *zap.Logger
		logLevel    zap.AtomicLevel
		key         *keys.PrivateKey
		cfg         *viper.Viper
		done        chan struct{}
		appServices []*metrics.Service
		appMetrics  *metrics.AppMetrics
		notificator *notificator.Notificator
		settings    *appSettings
	}

	appSettings struct {
		mu          sync.RWMutex
		serviceKeys []*keys.PublicKey
	}
)

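// Health statuses reported through the application health metric;
// the app moves Starting -> Ready -> ShuttingDown over its lifetime
// (see newApp, Wait and Serve below).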
const (
	HealthStatusUndefined    int32 = 0
	HealthStatusStarting     int32 = 1
	HealthStatusReady        int32 = 2
	HealthStatusShuttingDown int32 = 3
)

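// newApp constructs the application: it creates the metrics registry,
// picks a logger based on the config and immediately runs init, so the
// returned App is ready to Serve.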
func newApp(ctx context.Context, cfg *viper.Viper) *App {
	appMetrics := metrics.NewAppMetrics()
	log := pickLogger(cfg, appMetrics)

	a := &App{
		log:        log.logger,
		logLevel:   log.lvl,
		cfg:        cfg,
		done:       make(chan struct{}),
		appMetrics: appMetrics,
		settings:   newAppSettings(cfg, log),
	}
	a.appMetrics.SetHealth(HealthStatusStarting)

	a.init(ctx)

	return a
}

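// init loads the private key, connects to the morph chain, resolves the
// contract hashes and assembles the lifecycle job pipeline; any failure
// here is fatal.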
func (a *App) init(ctx context.Context) {
	var err error
	a.key, err = fetchKey(a.cfg)
	if err != nil {
		a.log.Fatal(logs.FailedToLoadPrivateKey, zap.Error(err))
	}

	endpoints := fetchMorphEndpoints(a.cfg, a.log)
	reconnectInterval := fetchMorphReconnectClientsInterval(a.cfg)

	clientCfg := morph.Config{
		Logger:            a.log,
		Endpoints:         endpoints,
		Key:               a.key,
		ReconnectInterval: reconnectInterval,
		DialTimeout:       fetchMorphDialTimeout(a.cfg),
	}

	cli, err := morph.New(ctx, clientCfg)
	if err != nil {
		a.log.Fatal(logs.FailedToInitMorphClient, zap.Error(err))
	}

	credSource := fetchCredentialSource(a.cfg, a.log)

	frostfsidContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractFrostfsID))
	if err != nil {
		a.log.Fatal(logs.ResolveFrostfsIDContract, zap.Error(err))
	}

	ffsidCfg := contract.FrostFSIDConfig{
		Client:       cli,
		ContractHash: frostfsidContract,
	}

	containerContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractContainer))
	if err != nil {
		a.log.Fatal(logs.ResolveContainerContract, zap.Error(err))
	}

	containerCfg := contract.ContainerConfig{
		Client:       cli,
		ContractHash: containerContract,
		Log:          a.log,
	}

	objPool, treePool := getPools(ctx, a.cfg, a.log, a.key)

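	// epochCh delivers new epoch numbers from the notificator to the job
	// provider; closing a.done (see Serve) closes the channel and stops
	// job production.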
	epochCh := make(chan uint64)
	go func() {
		<-a.done
		close(epochCh)
	}()

	ffs := frostfs.NewFrostFS(objPool, a.log)
	tr := tree.NewTree(frostfs.NewTreePoolWrapper(treePool), a.log)

	lifecycleCfg := lifecycle.Config{
		UserFetcher:       contract.NewFrostFSID(ffsidCfg),
		ContainerFetcher:  contract.NewContainer(containerCfg),
		FrostFSFetcher:    ffs,
		CredentialSource:  credSource,
		Settings:          a.settings,
		CurrentLifecycler: a.key,
		Logger:            a.log,
		TreeFetcher:       tr,
		BufferSize:        fetchJobFetcherBuffer(a.cfg),
		EpochChannel:      epochCh,
	}

	jobProvider := lifecycle.NewJobProvider(ctx, lifecycleCfg)

	executorCfg := lifecycle.ExecutorConfig{
		Logger:         a.log,
		Jobs:           jobProvider.Jobs(),
		WorkerPoolSize: fetchExecutorPoolSize(a.cfg),
		TreeFetcher:    tr,
		FrostFSFetcher: ffs,
	}

	executor, err := lifecycle.NewExecutor(ctx, executorCfg)
	if err != nil {
		a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
	}
	_ = executor // TODO: consider running it via a separate method.

	netmapContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractNetmap))
	if err != nil {
		a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
	}

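	// The notificator listens for NewEpoch events of the netmap contract
	// and forwards each epoch number into epochCh unless the context is
	// canceled first; listeners are (re)created via NewListenerFn on top
	// of the shared morph client.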
	notificatorCfg := notificator.Config{
		Handler: func(ctx context.Context, ee notificator.NewEpochEvent) {
			a.log.Info(logs.HandlerTriggered, zap.Uint64("epoch", ee.Epoch))
			select {
			case <-ctx.Done():
				a.log.Debug(logs.HandlerContextCanceled, zap.Error(ctx.Err()))
			case epochCh <- ee.Epoch:
			}
		},
		Logger: a.log,
		NewListenerFn: func(config notificator.ListenerConfig) (notificator.Listener, error) {
			lnCfg := notificator.ConfigListener{
				Client:            cli,
				Logger:            a.log,
				ReconnectInterval: reconnectInterval,
				Parser:            config.Parser,
				Handler:           config.Handler,
			}

			return notificator.NewListener(ctx, lnCfg)
		},
		NetmapContract: netmapContract,
	}

	if a.notificator, err = notificator.New(ctx, notificatorCfg); err != nil {
		a.log.Fatal(logs.InitNotificator, zap.Error(err))
	}
}

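// newAppSettings builds the mutable runtime settings from the initial
// configuration.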
func newAppSettings(v *viper.Viper, log *Logger) *appSettings {
	s := &appSettings{}

	s.update(v, log.logger)

	return s
}

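// update re-reads the lifecycle service keys from the config. On a fetch
// error it logs a warning and keeps the previously stored keys.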
func (s *appSettings) update(cfg *viper.Viper, log *zap.Logger) {
	svcKeys, svcKeyErr := fetchLifecycleServices(cfg)
	if svcKeyErr != nil {
		log.Warn(logs.FailedToFetchServicesKeys, zap.Error(svcKeyErr))
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if svcKeyErr == nil {
		s.serviceKeys = svcKeys
	}
}

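// ServicesKeys returns the current set of lifecycle service public keys.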
func (s *appSettings) ServicesKeys() keys.PublicKeys {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.serviceKeys
}

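// Wait marks the application as ready and blocks until Serve closes the
// done channel.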
func (a *App) Wait() {
	a.log.Info(logs.ApplicationStarted,
		zap.String("app_name", "frostfs-s3-lifecycler"),
		zap.String("version", Version))

	a.appMetrics.SetHealth(HealthStatusReady)
	a.appMetrics.SetVersion(Version)

	<-a.done

	a.log.Info(logs.ApplicationStopped)
}

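// Serve starts the app services and the notificator, then loops until the
// context is canceled, reloading the configuration on each SIGHUP.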
func (a *App) Serve(ctx context.Context) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGHUP)

	a.startAppServices()

	go a.notificator.Start(ctx)

loop:
	for {
		select {
		case <-ctx.Done():
			break loop
		case <-sigs:
			a.configReload()
		}
	}

	a.log.Info(logs.StoppingApplication)

	a.appMetrics.SetHealth(HealthStatusShuttingDown)
	a.stopAppServices()

	close(a.done)
}

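// configReload re-reads the config file on SIGHUP and applies what can be
// changed at runtime: the log level, the pprof and Prometheus services and
// the lifecycle service keys.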
func (a *App) configReload() {
	a.log.Info(logs.SIGHUPConfigReloadStarted)
	if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
		a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
		return
	}
	if err := readInConfig(a.cfg); err != nil {
		a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
		return
	}

	if lvl, err := getLogLevel(a.cfg.GetString(cfgLoggerLevel)); err != nil {
		a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
	} else {
		a.logLevel.SetLevel(lvl)
	}

	a.stopAppServices()
	a.startAppServices()

	a.settings.update(a.cfg, a.log)

	a.log.Info(logs.SIGHUPConfigReloadCompleted)
}

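// startAppServices (re)starts the pprof and Prometheus HTTP servers
// according to the current configuration.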
func (a *App) startAppServices() {
	a.appServices = a.appServices[:0]

	pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
	pprofService := metrics.NewPprofService(a.log, pprofConfig)
	a.appServices = append(a.appServices, pprofService)
	go pprofService.Start()

	prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
	prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
	a.appServices = append(a.appServices, prometheusService)
	go prometheusService.Start()
}

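// stopAppServices shuts the registered services down within a shared
// defaultShutdownTimeout.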
func (a *App) stopAppServices() {
	ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
	defer cancel()

	for _, svc := range a.appServices {
		svc.ShutDown(ctx)
	}
}

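// getPools dials the object and tree pools used to access FrostFS storage;
// both pools share the same peers, key, timeouts and rebalance settings
// from the config.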
func getPools(ctx context.Context, cfg *viper.Viper, logger *zap.Logger, key *keys.PrivateKey) (*pool.Pool, *treepool.Pool) {
	var prm pool.InitParameters
	var prmTree treepool.InitParameters

	prm.SetKey(&key.PrivateKey)
	prmTree.SetKey(key)

	for _, peer := range fetchPeers(cfg, logger) {
		prm.AddNode(peer)
		prmTree.AddNode(peer)
	}

	connTimeout := fetchConnectTimeout(cfg)
	prm.SetNodeDialTimeout(connTimeout)
	prmTree.SetNodeDialTimeout(connTimeout)

	streamTimeout := fetchStreamTimeout(cfg)
	prm.SetNodeStreamTimeout(streamTimeout)
	prmTree.SetNodeStreamTimeout(streamTimeout)

	healthCheckTimeout := fetchHealthCheckTimeout(cfg)
	prm.SetHealthcheckTimeout(healthCheckTimeout)
	prmTree.SetHealthcheckTimeout(healthCheckTimeout)

	rebalanceInterval := fetchRebalanceInterval(cfg)
	prm.SetClientRebalanceInterval(rebalanceInterval)
	prmTree.SetClientRebalanceInterval(rebalanceInterval)

	errorThreshold := fetchErrorThreshold(cfg)
	prm.SetErrorThreshold(errorThreshold)
	prm.SetLogger(logger)
	prmTree.SetLogger(logger)

	prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgFrostFSTreePoolMaxAttempts))

	p, err := pool.NewPool(prm)
	if err != nil {
		logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
	}

	if err = p.Dial(ctx); err != nil {
		logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
	}

	treePool, err := treepool.NewPool(prmTree)
	if err != nil {
		logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
	}
	if err = treePool.Dial(ctx); err != nil {
		logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
	}

	return p, treePool
}