// frostfs-s3-lifecycler/cmd/s3-lifecycler/app.go

package main

import (
	"context"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/frostfs"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/lifecycle"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph/contract"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/resolver"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/spf13/viper"
	"go.uber.org/zap"
)
type (
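	// App is the lifecycler application. It wires together logging, the
	// lifecycler private key, configuration, metrics, the auxiliary
	// pprof/Prometheus services, the new-epoch notificator, and reloadable settings.
	// The done channel is closed when the application stops serving.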
App struct {
log *zap.Logger
logLevel zap.AtomicLevel
key *keys.PrivateKey
cfg *viper.Viper
done chan struct{}
appServices []*metrics.Service
appMetrics *metrics.AppMetrics
notificator *notificator.Notificator
settings *appSettings
}
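
	// appSettings holds configuration values that can be reloaded on SIGHUP.
	// Access is synchronized with mu.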
appSettings struct {
mu sync.RWMutex
serviceKeys []*keys.PublicKey
}
)
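
// Health statuses reported through the application metrics.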
const (
HealthStatusUndefined int32 = 0
HealthStatusStarting int32 = 1
HealthStatusReady int32 = 2
HealthStatusShuttingDown int32 = 3
)
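
// newApp creates the application, reports the starting health status in the
// metrics, and initializes all of its components. Initialization errors are fatal.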
func newApp(ctx context.Context, cfg *viper.Viper) *App {
appMetrics := metrics.NewAppMetrics()
log := pickLogger(cfg, appMetrics)
a := &App{
log: log.logger,
logLevel: log.lvl,
cfg: cfg,
done: make(chan struct{}),
appMetrics: appMetrics,
settings: newAppSettings(cfg, log),
}
a.appMetrics.SetHealth(HealthStatusStarting)
a.init(ctx)
return a
}
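
// init loads the private key, connects to the morph chain endpoints, resolves
// the frostfs-id, container and netmap contracts, builds the object and tree
// pools, and wires up the lifecycle job provider, executor and new-epoch
// notificator. Any failure here is fatal.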
func (a *App) init(ctx context.Context) {
var err error
a.key, err = fetchKey(a.cfg)
if err != nil {
a.log.Fatal(logs.FailedToLoadPrivateKey, zap.Error(err))
}
endpoints := fetchMorphEndpoints(a.cfg, a.log)
reconnectInterval := fetchMorphReconnectClientsInterval(a.cfg)
clientCfg := morph.Config{
Logger: a.log,
Endpoints: endpoints,
Key: a.key,
ReconnectInterval: reconnectInterval,
DialTimeout: fetchMorphDialTimeout(a.cfg),
}
cli, err := morph.New(ctx, clientCfg)
if err != nil {
a.log.Fatal(logs.FailedToInitMorphClient, zap.Error(err))
}
credSource := fetchCredentialSource(a.cfg, a.log)
frostfsidContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractFrostfsID))
if err != nil {
a.log.Fatal(logs.ResolveFrostfsIDContract, zap.Error(err))
}
ffsidCfg := contract.FrostFSIDConfig{
Client: cli,
ContractHash: frostfsidContract,
}
containerContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractContainer))
if err != nil {
a.log.Fatal(logs.ResolveContainerContract, zap.Error(err))
}
containerCfg := contract.ContainerConfig{
Client: cli,
ContractHash: containerContract,
Log: a.log,
}
objPool, treePool := getPools(ctx, a.cfg, a.log, a.key)
epochCh := make(chan uint64)
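	// Close the epoch channel on shutdown so consumers of epoch events stop.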
go func() {
<-a.done
close(epochCh)
}()
ffs := frostfs.NewFrostFS(objPool, a.log)
tr := tree.NewTree(frostfs.NewTreePoolWrapper(treePool), a.log)
lifecycleCfg := lifecycle.Config{
UserFetcher: contract.NewFrostFSID(ffsidCfg),
ContainerFetcher: contract.NewContainer(containerCfg),
FrostFSFetcher: ffs,
CredentialSource: credSource,
Settings: a.settings,
CurrentLifecycler: a.key,
Logger: a.log,
TreeFetcher: tr,
BufferSize: fetchJobFetcherBuffer(a.cfg),
EpochChannel: epochCh,
}
jobProvider := lifecycle.NewJobProvider(ctx, lifecycleCfg)
executorCfg := lifecycle.ExecutorConfig{
Logger: a.log,
Jobs: jobProvider.Jobs(),
WorkerPoolSize: fetchExecutorPoolSize(a.cfg),
TreeFetcher: tr,
FrostFSFetcher: ffs,
}
executor, err := lifecycle.NewExecutor(ctx, executorCfg)
if err != nil {
a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
}
	_ = executor // TODO: consider running the executor via a separate method
netmapContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractNetmap))
if err != nil {
a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
}
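	// The notificator subscribes to new-epoch notifications from the netmap
	// contract and forwards each epoch number to epochCh.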
notificatorCfg := notificator.Config{
Handler: func(ctx context.Context, ee notificator.NewEpochEvent) {
a.log.Info(logs.HandlerTriggered, zap.Uint64("epoch", ee.Epoch))
select {
case <-ctx.Done():
a.log.Debug(logs.HandlerContextCanceled, zap.Error(ctx.Err()))
case epochCh <- ee.Epoch:
}
},
Logger: a.log,
NewListenerFn: func(config notificator.ListenerConfig) (notificator.Listener, error) {
lnCfg := notificator.ConfigListener{
Client: cli,
Logger: a.log,
ReconnectInterval: reconnectInterval,
Parser: config.Parser,
Handler: config.Handler,
}
return notificator.NewListener(ctx, lnCfg)
},
NetmapContract: netmapContract,
}
if a.notificator, err = notificator.New(ctx, notificatorCfg); err != nil {
a.log.Fatal(logs.InitNotificator, zap.Error(err))
}
}
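
// newAppSettings creates settings populated from the given configuration.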
func newAppSettings(v *viper.Viper, log *Logger) *appSettings {
s := &appSettings{}
s.update(v, log.logger)
return s
}
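
// update re-reads the lifecycle service keys from the configuration.
// If fetching fails, a warning is logged and the previous keys are kept.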
func (s *appSettings) update(cfg *viper.Viper, log *zap.Logger) {
svcKeys, svcKeyErr := fetchLifecycleServices(cfg)
if svcKeyErr != nil {
log.Warn(logs.FailedToFetchServicesKeys, zap.Error(svcKeyErr))
}
s.mu.Lock()
defer s.mu.Unlock()
if svcKeyErr == nil {
s.serviceKeys = svcKeys
}
}
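
// ServicesKeys returns the current public keys of the lifecycle services.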
func (s *appSettings) ServicesKeys() keys.PublicKeys {
s.mu.RLock()
defer s.mu.RUnlock()
return s.serviceKeys
}
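
// Wait logs the application start, reports it as ready in the metrics,
// and blocks until Serve closes the done channel.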
func (a *App) Wait() {
a.log.Info(logs.ApplicationStarted,
zap.String("app_name", "frostfs-s3-lifecycler"),
zap.String("version", Version))
a.appMetrics.SetHealth(HealthStatusReady)
a.appMetrics.SetVersion(Version)
<-a.done
a.log.Info(logs.ApplicationStopped)
}
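
// Serve starts the auxiliary services and the notificator, then handles
// SIGHUP-triggered configuration reloads until the context is canceled,
// after which it shuts the services down and closes the done channel.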
func (a *App) Serve(ctx context.Context) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP)
a.startAppServices()
go a.notificator.Start(ctx)
loop:
for {
select {
case <-ctx.Done():
break loop
case <-sigs:
a.configReload()
}
}
a.log.Info(logs.StoppingApplication)
a.appMetrics.SetHealth(HealthStatusShuttingDown)
a.stopAppServices()
close(a.done)
}
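
// configReload re-reads the configuration file, updates the log level,
// restarts the pprof/Prometheus services, and refreshes the lifecycle
// service keys.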
func (a *App) configReload() {
a.log.Info(logs.SIGHUPConfigReloadStarted)
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
return
}
if err := readInConfig(a.cfg); err != nil {
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
return
}
if lvl, err := getLogLevel(a.cfg.GetString(cfgLoggerLevel)); err != nil {
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
} else {
a.logLevel.SetLevel(lvl)
}
a.stopAppServices()
a.startAppServices()
a.settings.update(a.cfg, a.log)
a.log.Info(logs.SIGHUPConfigReloadCompleted)
}
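
// startAppServices launches the pprof and Prometheus services according to
// the configuration.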
func (a *App) startAppServices() {
a.appServices = a.appServices[:0]
pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
pprofService := metrics.NewPprofService(a.log, pprofConfig)
a.appServices = append(a.appServices, pprofService)
go pprofService.Start()
prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
a.appServices = append(a.appServices, prometheusService)
go prometheusService.Start()
}
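
// stopAppServices shuts down the started services within the default
// shutdown timeout.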
func (a *App) stopAppServices() {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
for _, svc := range a.appServices {
svc.ShutDown(ctx)
}
}
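
// getPools builds and dials the FrostFS object pool and tree pool using the
// peers, timeouts and thresholds from the configuration. Failures are fatal.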
func getPools(ctx context.Context, cfg *viper.Viper, logger *zap.Logger, key *keys.PrivateKey) (*pool.Pool, *treepool.Pool) {
var prm pool.InitParameters
var prmTree treepool.InitParameters
prm.SetKey(&key.PrivateKey)
prmTree.SetKey(key)
for _, peer := range fetchPeers(cfg, logger) {
prm.AddNode(peer)
prmTree.AddNode(peer)
}
connTimeout := fetchConnectTimeout(cfg)
prm.SetNodeDialTimeout(connTimeout)
prmTree.SetNodeDialTimeout(connTimeout)
streamTimeout := fetchStreamTimeout(cfg)
prm.SetNodeStreamTimeout(streamTimeout)
prmTree.SetNodeStreamTimeout(streamTimeout)
healthCheckTimeout := fetchHealthCheckTimeout(cfg)
prm.SetHealthcheckTimeout(healthCheckTimeout)
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
rebalanceInterval := fetchRebalanceInterval(cfg)
prm.SetClientRebalanceInterval(rebalanceInterval)
prmTree.SetClientRebalanceInterval(rebalanceInterval)
errorThreshold := fetchErrorThreshold(cfg)
prm.SetErrorThreshold(errorThreshold)
prm.SetLogger(logger)
prmTree.SetLogger(logger)
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgFrostFSTreePoolMaxAttempts))
p, err := pool.NewPool(prm)
if err != nil {
logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
}
if err = p.Dial(ctx); err != nil {
logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
}
treePool, err := treepool.NewPool(prmTree)
if err != nil {
logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
}
if err = treePool.Dial(ctx); err != nil {
logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
}
return p, treePool
}