package main import ( "context" "os" "os/signal" "sync" "syscall" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/lifecycle" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph/contract" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/resolver" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/spf13/viper" "go.uber.org/zap" ) type ( App struct { log *zap.Logger logLevel zap.AtomicLevel key *keys.PrivateKey cfg *viper.Viper done chan struct{} appServices []*metrics.Service appMetrics *metrics.AppMetrics notificator *notificator.Notificator settings *appSettings } appSettings struct { mu sync.RWMutex serviceKeys []*keys.PublicKey } ) const ( HealthStatusUndefined int32 = 0 HealthStatusStarting int32 = 1 HealthStatusReady int32 = 2 HealthStatusShuttingDown int32 = 3 ) func newApp(ctx context.Context, cfg *viper.Viper) *App { appMetrics := metrics.NewAppMetrics() log := pickLogger(cfg, appMetrics) a := &App{ log: log.logger, logLevel: log.lvl, cfg: cfg, done: make(chan struct{}), appMetrics: appMetrics, settings: newAppSettings(cfg, log), } a.appMetrics.SetHealth(HealthStatusStarting) a.init(ctx) return a } func (a *App) init(ctx context.Context) { var err error a.key, err = fetchKey(a.cfg) if err != nil { a.log.Fatal(logs.FailedToLoadPrivateKey, zap.Error(err)) } endpoints := fetchMorphEndpoints(a.cfg, a.log) reconnectInterval := fetchMorphReconnectClientsInterval(a.cfg) clientCfg := morph.Config{ Logger: a.log, Endpoints: endpoints, Key: a.key, ReconnectInterval: reconnectInterval, DialTimeout: fetchMorphDialTimeout(a.cfg), } cli, err := morph.New(ctx, clientCfg) if err != nil { a.log.Fatal(logs.FailedToInitMorphClient, zap.Error(err)) } credSource := fetchCredentialSource(a.cfg, a.log) frostfsidContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractFrostfsID)) if err != nil { a.log.Fatal(logs.ResolveFrostfsIDContract, zap.Error(err)) } ffsidCfg := contract.FrostFSIDConfig{ Client: cli, ContractHash: frostfsidContract, } containerContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractContainer)) if err != nil { a.log.Fatal(logs.ResolveContainerContract, zap.Error(err)) } containerCfg := contract.ContainerConfig{ Client: cli, ContractHash: containerContract, Log: a.log, } objPool, treePool := getPools(ctx, a.cfg, a.log, a.key) epochCh := make(chan uint64) go func() { <-a.done close(epochCh) }() ffs := frostfs.NewFrostFS(objPool, a.log) tr := tree.NewTree(frostfs.NewTreePoolWrapper(treePool), a.log) lifecycleCfg := lifecycle.Config{ UserFetcher: contract.NewFrostFSID(ffsidCfg), ContainerFetcher: contract.NewContainer(containerCfg), FrostFSFetcher: ffs, CredentialSource: credSource, Settings: a.settings, CurrentLifecycler: a.key, Logger: a.log, TreeFetcher: tr, BufferSize: fetchJobFetcherBuffer(a.cfg), EpochChannel: epochCh, } jobProvider := lifecycle.NewJobProvider(ctx, lifecycleCfg) executorCfg := lifecycle.ExecutorConfig{ Logger: a.log, Jobs: jobProvider.Jobs(), WorkerPoolSize: fetchExecutorPoolSize(a.cfg), TreeFetcher: tr, FrostFSFetcher: ffs, } executor, err := lifecycle.NewExecutor(ctx, executorCfg) if err != nil { a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err)) } _ = executor // todo consider run with separate method netmapContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractNetmap)) if err != nil { a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err)) } notificatorCfg := notificator.Config{ Handler: func(ctx context.Context, ee notificator.NewEpochEvent) { a.log.Info(logs.HandlerTriggered, zap.Uint64("epoch", ee.Epoch)) select { case <-ctx.Done(): a.log.Debug(logs.HandlerContextCanceled, zap.Error(ctx.Err())) case epochCh <- ee.Epoch: } }, Logger: a.log, NewListenerFn: func(config notificator.ListenerConfig) (notificator.Listener, error) { lnCfg := notificator.ConfigListener{ Client: cli, Logger: a.log, ReconnectInterval: reconnectInterval, Parser: config.Parser, Handler: config.Handler, } return notificator.NewListener(ctx, lnCfg) }, NetmapContract: netmapContract, } if a.notificator, err = notificator.New(ctx, notificatorCfg); err != nil { a.log.Fatal(logs.InitNotificator, zap.Error(err)) } } func newAppSettings(v *viper.Viper, log *Logger) *appSettings { s := &appSettings{} s.update(v, log.logger) return s } func (s *appSettings) update(cfg *viper.Viper, log *zap.Logger) { svcKeys, svcKeyErr := fetchLifecycleServices(cfg) if svcKeyErr != nil { log.Warn(logs.FailedToFetchServicesKeys, zap.Error(svcKeyErr)) } s.mu.Lock() defer s.mu.Unlock() if svcKeyErr == nil { s.serviceKeys = svcKeys } } func (s *appSettings) ServicesKeys() keys.PublicKeys { s.mu.RLock() defer s.mu.RUnlock() return s.serviceKeys } func (a *App) Wait() { a.log.Info(logs.ApplicationStarted, zap.String("app_name", "frostfs-s3-lifecycler"), zap.String("version", Version)) a.appMetrics.SetHealth(HealthStatusReady) a.appMetrics.SetVersion(Version) <-a.done a.log.Info(logs.ApplicationStopped) } func (a *App) Serve(ctx context.Context) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP) a.startAppServices() go a.notificator.Start(ctx) loop: for { select { case <-ctx.Done(): break loop case <-sigs: a.configReload() } } a.log.Info(logs.StoppingApplication) a.appMetrics.SetHealth(HealthStatusShuttingDown) a.stopAppServices() close(a.done) } func (a *App) configReload() { a.log.Info(logs.SIGHUPConfigReloadStarted) if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed) return } if err := readInConfig(a.cfg); err != nil { a.log.Warn(logs.FailedToReloadConfig, zap.Error(err)) return } if lvl, err := getLogLevel(a.cfg.GetString(cfgLoggerLevel)); err != nil { a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err)) } else { a.logLevel.SetLevel(lvl) } a.stopAppServices() a.startAppServices() a.settings.update(a.cfg, a.log) a.log.Info(logs.SIGHUPConfigReloadCompleted) } func (a *App) startAppServices() { a.appServices = a.appServices[:0] pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)} pprofService := metrics.NewPprofService(a.log, pprofConfig) a.appServices = append(a.appServices, pprofService) go pprofService.Start() prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)} prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig) a.appServices = append(a.appServices, prometheusService) go prometheusService.Start() } func (a *App) stopAppServices() { ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) defer cancel() for _, svc := range a.appServices { svc.ShutDown(ctx) } } func getPools(ctx context.Context, cfg *viper.Viper, logger *zap.Logger, key *keys.PrivateKey) (*pool.Pool, *treepool.Pool) { var prm pool.InitParameters var prmTree treepool.InitParameters prm.SetKey(&key.PrivateKey) prmTree.SetKey(key) for _, peer := range fetchPeers(cfg, logger) { prm.AddNode(peer) prmTree.AddNode(peer) } connTimeout := fetchConnectTimeout(cfg) prm.SetNodeDialTimeout(connTimeout) prmTree.SetNodeDialTimeout(connTimeout) streamTimeout := fetchStreamTimeout(cfg) prm.SetNodeStreamTimeout(streamTimeout) prmTree.SetNodeStreamTimeout(streamTimeout) healthCheckTimeout := fetchHealthCheckTimeout(cfg) prm.SetHealthcheckTimeout(healthCheckTimeout) prmTree.SetHealthcheckTimeout(healthCheckTimeout) rebalanceInterval := fetchRebalanceInterval(cfg) prm.SetClientRebalanceInterval(rebalanceInterval) prmTree.SetClientRebalanceInterval(rebalanceInterval) errorThreshold := fetchErrorThreshold(cfg) prm.SetErrorThreshold(errorThreshold) prm.SetLogger(logger) prmTree.SetLogger(logger) prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgFrostFSTreePoolMaxAttempts)) p, err := pool.NewPool(prm) if err != nil { logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err)) } if err = p.Dial(ctx); err != nil { logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err)) } treePool, err := treepool.NewPool(prmTree) if err != nil { logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err)) } if err = treePool.Dial(ctx); err != nil { logger.Fatal(logs.FailedToDialTreePool, zap.Error(err)) } return p, treePool }