// frostfs-s3-lifecycler/cmd/s3-lifecycler/app.go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/resolver"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/spf13/viper"
	"go.uber.org/zap"
)
type (
	// App aggregates the lifecycler's runtime state: logging, configuration,
	// metric services, and the new-epoch notificator.
	App struct {
		log         *zap.Logger
		logLevel    zap.AtomicLevel
		cfg         *viper.Viper
		done        chan struct{}
		appServices []*metrics.Service
		appMetrics  *metrics.AppMetrics
		notificator *notificator.Notificator
	}
)
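// Health statuses reported through the application metrics.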
const (
	HealthStatusUndefined    int32 = 0
	HealthStatusStarting     int32 = 1
	HealthStatusReady        int32 = 2
	HealthStatusShuttingDown int32 = 3
)
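// newApp creates the application, marks it as starting in the metrics,
// and runs initialization.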
func newApp(ctx context.Context, cfg *viper.Viper, log *Logger) *App {
	a := &App{
		log:        log.logger,
		logLevel:   log.lvl,
		cfg:        cfg,
		done:       make(chan struct{}),
		appMetrics: metrics.NewAppMetrics(),
	}

	a.appMetrics.SetHealth(HealthStatusStarting)
	a.init(ctx)

	return a
}
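// init loads the private key, resolves the netmap contract address, and
// constructs the new-epoch notificator. Any failure at this stage is fatal.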
func (a *App) init(ctx context.Context) {
	key, err := fetchKey(a.cfg)
	if err != nil {
		a.log.Fatal(logs.FailedToLoadPrivateKey, zap.Error(err))
	}

	endpoints := fetchMorphEndpoints(a.cfg, a.log)
	newListenerFunc := a.getNewListenerFunction(ctx, key, endpoints)
	handler := a.getNewEpochHandler()

	netmapContract, err := resolver.ResolveContractHash(a.cfg.GetString(cfgMorphContractNetmap), endpoints[0].Address)
	if err != nil {
		a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
	}

	cfg := notificator.Config{
		Handler:                  handler,
		Logger:                   a.log,
		NewListener:              newListenerFunc,
		NetmapContract:           netmapContract,
		ReconnectClientsInterval: 30 * time.Second,
	}

	if a.notificator, err = notificator.New(ctx, cfg); err != nil {
		a.log.Fatal(logs.InitNotificator, zap.Error(err))
	}
}
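// getNewListenerFunction returns a factory the notificator invokes whenever it
// needs a fresh chain connection: it dials the morph endpoints, subscribes
// starting from the current block, and wraps the subscription in an event
// listener. The connection-lost callback lets the notificator trigger a reconnect.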
func (a *App) getNewListenerFunction(ctx context.Context, key *keys.PrivateKey, endpoints []client.Endpoint) notificator.ListenerCreationFunc {
	morphLogger := &logger.Logger{Logger: a.log}

	clientOptions := []client.Option{
		client.WithLogger(morphLogger),
		client.WithEndpoints(endpoints...),
	}

	return func(connectionLostCb func()) (event.Listener, error) {
		options := append([]client.Option{client.WithConnLostCallback(connectionLostCb)}, clientOptions...)

		cli, err := client.New(ctx, key, options...)
		if err != nil {
			return nil, fmt.Errorf("create new client: %w", err)
		}

		currentBlock, err := cli.BlockCount()
		if err != nil {
			return nil, fmt.Errorf("get block count: %w", err)
		}

		subs, err := subscriber.New(ctx, &subscriber.Params{
			Log:            morphLogger,
			StartFromBlock: currentBlock,
			Client:         cli,
		})
		if err != nil {
			return nil, fmt.Errorf("create subscriber: %w", err)
		}

		return event.NewListener(event.ListenerParams{
			Logger:             morphLogger,
			Subscriber:         subs,
			WorkerPoolCapacity: 0, // 0 means "infinite"
		})
	}
}
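// getNewEpochHandler returns a stub handler for new-epoch events; the real
// lifecycle job executor is tracked in TrueCloudLab/frostfs-s3-lifecycler#3.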
func (a *App) getNewEpochHandler() notificator.NewEpochHandler {
	return func(_ context.Context, ee notificator.NewEpochEvent) {
		// TODO (d.kirillov): use real job executor here, TrueCloudLab/frostfs-s3-lifecycler#3.
		fmt.Println("start handler", ee.Epoch)
		time.Sleep(30 * time.Second)
		fmt.Println("end handler", ee.Epoch)
	}
}
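// Wait reports the application as ready and blocks until Serve signals
// completion via the done channel.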
func (a *App) Wait() {
	a.log.Info(logs.ApplicationStarted,
		zap.String("app_name", "frostfs-s3-lifecycler"),
		zap.String("version", Version))

	a.appMetrics.SetHealth(HealthStatusReady)
	a.appMetrics.SetVersion(Version)

	<-a.done

	a.log.Info(logs.ApplicationStopped)
}
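// Serve starts the auxiliary services and the notificator, then loops
// handling SIGHUP config reloads until the context is canceled.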
func (a *App) Serve(ctx context.Context) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGHUP)

	a.startAppServices()

	go a.notificator.Start(ctx)

loop:
	for {
		select {
		case <-ctx.Done():
			break loop
		case <-sigs:
			a.configReload()
		}
	}

	a.log.Info(logs.StoppingApplication)
	a.appMetrics.SetHealth(HealthStatusShuttingDown)
	a.stopAppServices()
	close(a.done)
}
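// configReload re-reads the configuration on SIGHUP, updates the log level,
// and restarts the app services so address or enablement changes take effect.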
func (a *App) configReload() {
	a.log.Info(logs.SIGHUPConfigReloadStarted)

	if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
		a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
		return
	}
	if err := readInConfig(a.cfg); err != nil {
		a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
		return
	}

	if lvl, err := getLogLevel(a.cfg.GetString(cfgLoggerLevel)); err != nil {
		a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
	} else {
		a.logLevel.SetLevel(lvl)
	}

	a.stopAppServices()
	a.startAppServices()

	a.log.Info(logs.SIGHUPConfigReloadCompleted)
}
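// startAppServices launches the pprof and Prometheus services according to
// the current configuration, resetting the service list first.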
func (a *App) startAppServices() {
	a.appServices = a.appServices[:0]

	pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
	pprofService := metrics.NewPprofService(a.log, pprofConfig)
	a.appServices = append(a.appServices, pprofService)
	go pprofService.Start()

	prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
	prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
	a.appServices = append(a.appServices, prometheusService)
	go prometheusService.Start()
}
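// stopAppServices shuts down all running services, giving them up to
// defaultShutdownTimeout to finish.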
func (a *App) stopAppServices() {
	ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
	defer cancel()

	for _, svc := range a.appServices {
		svc.ShutDown(ctx)
	}
}