frostfs-node/cmd/frostfs-node/main.go
Airat Arifullin 542d3adcb2 [#1105] apemanager: Implement apemanager service
* Introduce grpc server for apemanager service and
  its implementation in `pkg/services/apemanager`.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-05-27 09:34:21 +00:00

180 lines
4.6 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"go.uber.org/zap"
)
// SuccessReturnCode is the process exit code used when the application
// terminates normally, i.e. without a panic.
const SuccessReturnCode = 0
// fatalOnErr does nothing when err is nil. Otherwise it prints err to the
// standard logger and terminates the process via log.Fatal (exit status 1).
func fatalOnErr(err error) {
	if err == nil {
		return
	}

	log.Fatal(err)
}
// fatalOnErrDetails does nothing when err is nil. Otherwise it wraps err
// with the details prefix ("<details>: <err>"), prints the result to the
// standard logger and terminates the process via log.Fatal (exit status 1).
func fatalOnErrDetails(details string, err error) {
	if err == nil {
		return
	}

	wrapped := fmt.Errorf("%s: %w", details, err)
	log.Fatal(wrapped)
}
// main is the entry point of the storage node binary. It parses the
// command-line flags, loads and validates the configuration, and then
// initializes, boots up and runs the application until wait returns.
func main() {
	var (
		configFile  = flag.String("config", "", "path to config")
		configDir   = flag.String("config-dir", "", "path to config directory")
		versionFlag = flag.Bool("version", false, "frostfs node version")
		dryRunFlag  = flag.Bool("check", false, "validate configuration and exit")
	)
	flag.Parse()

	// -version: print the build information and exit right away.
	if *versionFlag {
		fmt.Print(misc.BuildInfo("FrostFS Storage node"))
		os.Exit(SuccessReturnCode)
	}

	appCfg := config.New(*configFile, *configDir, config.EnvPrefix)
	fatalOnErr(validateConfig(appCfg))

	// -check: validation has passed at this point, nothing else to do.
	if *dryRunFlag {
		return
	}

	c := initCfg(appCfg)

	// The cancel function is kept on the config structure.
	ctx, cancel := context.WithCancel(context.Background())
	c.ctxCancel = cancel

	c.setHealthStatus(control.HealthStatus_STARTING)

	initApp(ctx, c)
	bootUp(ctx, c)

	c.compareAndSwapHealthStatus(control.HealthStatus_STARTING, control.HealthStatus_READY)

	wait(c)
}
// initAndLog runs initializer with c, writing an info-level log message
// mentioning name before the call and another one after it returns.
func initAndLog(c *cfg, name string, initializer func(*cfg)) {
	startMsg := fmt.Sprintf("initializing %s service...", name)
	c.log.Info(startMsg)

	initializer(c)

	doneMsg := name + " service has been successfully initialized"
	c.log.Info(doneMsg)
}
// initApp initializes the application components in a fixed order: it starts
// the signal watcher goroutine, applies the runtime parameters and then runs
// the initializers of the individual services one after another.
//
// A failure to open or initialize the storage engine or the local override
// database of the access policy engine terminates the process via fatalOnErr.
func initApp(ctx context.Context, c *cfg) {
// The signal watcher runs in the background and is tracked by c.wg.
c.wg.Add(1)
go func() {
c.signalWatcher(ctx)
c.wg.Done()
}()
setRuntimeParameters(c)
// Only the component itself is used here; the second result is discarded.
metrics, _ := metricsComponent(c)
initAndLog(c, "profiler", initProfilerService)
initAndLog(c, metrics.name, metrics.init)
initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) })
// The local storage is set up first, then opened and initialized below.
initLocalStorage(ctx, c)
initAndLog(c, "storage engine", func(c *cfg) {
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open(ctx))
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
})
initAndLog(c, "gRPC", initGRPC)
initAndLog(c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
// Same pattern for the access policy engine: set up, then open and
// initialize its local override database.
initAccessPolicyEngine(ctx, c)
initAndLog(c, "access policy engine", func(c *cfg) {
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Open(ctx))
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Init())
})
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
initAndLog(c, "session", initSessionService)
initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
initAndLog(c, "object", initObjectService)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "apemanager", initAPEManagerService)
initAndLog(c, "control", initControlService)
initAndLog(c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) })
}
// runAndLog logs that the named service is starting and calls starter with
// ctx and c. When logSuccess is true, a second info-level message is written
// after starter returns; otherwise nothing more is logged.
func runAndLog(ctx context.Context, c *cfg, name string, logSuccess bool, starter func(context.Context, *cfg)) {
	c.log.Info(fmt.Sprintf("starting %s service...", name))

	starter(ctx, c)

	if !logSuccess {
		return
	}

	c.log.Info(name + " service started successfully")
}
// stopAndLog calls stopper and reports the shutdown of the named service at
// debug level. An error returned by stopper is logged and otherwise ignored;
// the final "has been stopped" message is written in either case.
func stopAndLog(c *cfg, name string, stopper func() error) {
	c.log.Debug(fmt.Sprintf("shutting down %s service", name))

	if stopErr := stopper(); stopErr != nil {
		c.log.Debug(fmt.Sprintf("could not shutdown %s server", name),
			zap.String("error", stopErr.Error()),
		)
	}

	c.log.Debug(name + " service has been stopped")
}
// bootUp starts the NATS, gRPC and notary steps through runAndLog, in that
// order, then bootstraps the node and starts the workers.
func bootUp(ctx context.Context, c *cfg) {
	// serveGRPC takes no context, so it is adapted to the starter signature.
	grpcStarter := func(_ context.Context, n *cfg) { serveGRPC(n) }

	runAndLog(ctx, c, "NATS", true, connectNats)
	runAndLog(ctx, c, "gRPC", false, grpcStarter)
	runAndLog(ctx, c, "notary", true, makeAndWaitNotaryDeposit)

	bootstrapNode(c)
	startWorkers(ctx, c)
}
// wait logs that the application has started and blocks until c.done is
// signaled. It then logs every error received from c.internalErr as a
// warning while waiting for all goroutines tracked by c.wg to finish.
// After that it closes c.internalErr and waits for the logging goroutine
// to exit.
func wait(c *cfg) {
	c.log.Info(logs.CommonApplicationStarted, zap.String("version", misc.Version))

	<-c.done // graceful shutdown

	// Keep reading internal errors during shutdown.
	// NOTE(review): presumably this prevents goroutines that still report
	// errors from blocking on the channel - confirm against the senders.
	var drained sync.WaitGroup
	drained.Add(1)
	go func() {
		defer drained.Done()

		for internalErr := range c.internalErr {
			c.log.Warn(logs.FrostFSNodeInternalApplicationError,
				zap.String("message", internalErr.Error()))
		}
	}()

	c.log.Debug(logs.FrostFSNodeWaitingForAllProcessesToStop)
	c.wg.Wait()

	// Closing the channel ends the range loop above.
	close(c.internalErr)
	drained.Wait()
}
// onShutdown appends f to c.closers, wrapped in a closer whose first field
// is the empty string (presumably the closer's name - confirm against the
// closer type definition).
func (c *cfg) onShutdown(f func()) {
	unnamed := closer{"", f}
	c.closers = append(c.closers, unnamed)
}