package main

import (
	"context"
	"fmt"
	"net"

	controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// serviceNameControl is the name under which the control service is run and
// reported in the "service" log field.
const serviceNameControl = "control"

// initControlService wires up the control gRPC service of the node.
//
// Nothing is done when the configured endpoint equals
// controlconfig.GRPCEndpointDefault. Otherwise the function builds the control
// server, opens a TCP listener on the endpoint, stores a fresh gRPC server in
// c.cfgControlService.server, registers a shutdown hook for it and appends a
// worker that serves the listener.
//
// A failure to listen is only logged; the function then returns without
// creating the gRPC server or the worker.
func initControlService(c *cfg) {
	addr := controlconfig.GRPC(c.appCfg).Endpoint()
	if addr == controlconfig.GRPCEndpointDefault {
		return
	}

	configured := controlconfig.AuthorizedKeys(c.appCfg)

	// The node's own public key is always authorized and goes first,
	// hence the one extra slot of capacity.
	allowed := make([][]byte, 0, len(configured)+1)
	allowed = append(allowed, c.key.PublicKey().Bytes())
	for idx := range configured {
		allowed = append(allowed, configured[idx].Bytes())
	}

	svc := controlSvc.New(
		controlSvc.WithKey(&c.key.PrivateKey),
		controlSvc.WithAuthorizedKeys(allowed),
		controlSvc.WithHealthChecker(c),
		controlSvc.WithNetMapSource(c.netMapSource),
		controlSvc.WithContainerSource(c.cfgObject.cnrSource),
		controlSvc.WithReplicator(c.replicator),
		controlSvc.WithNodeState(c),
		controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
		controlSvc.WithTreeService(c.treeService),
		controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
	)

	listener, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		c.log.Error(logs.FrostFSNodeCantListenGRPCEndpointControl, zap.Error(listenErr))
		return
	}

	grpcSrv := grpc.NewServer()
	c.cfgControlService.server = grpcSrv

	// The hook reads the server from the config at shutdown time rather than
	// capturing the local variable.
	c.onShutdown(func() {
		stopGRPC("FrostFS Control API", c.cfgControlService.server, c.log)
	})

	control.RegisterControlServiceServer(grpcSrv, svc)

	// serve announces the endpoint and then serves the listener; whatever
	// Serve returns is handed to fatalOnErr.
	serve := func(context.Context, *cfg) {
		c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
			zap.String("service", serviceNameControl),
			zap.String("endpoint", addr))
		fatalOnErr(c.cfgControlService.server.Serve(listener))
	}

	worker := newWorkerFromFunc(func(ctx context.Context) {
		runAndLog(ctx, c, serviceNameControl, false, serve)
	})
	c.workers = append(c.workers, worker)
}

// NetmapStatus returns the netmap status as reported by the netmap state of
// the node.
func (c *cfg) NetmapStatus() control.NetmapStatus {
	return c.cfgNetmap.state.controlNetmapStatus()
}

// setHealthStatus unconditionally sets the health status to st: systemd is
// notified first, then the value is stored and mirrored into the metrics.
func (c *cfg) setHealthStatus(st control.HealthStatus) {
	raw := int32(st)

	c.notifySystemd(st)
	c.healthStatus.Store(raw)
	c.metricsCollector.State().SetHealth(raw)
}

// compareAndSwapHealthStatus replaces the health status with newSt only if it
// currently equals oldSt and reports whether the replacement happened.
// systemd and the metrics are updated only on a successful swap.
func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
	if !c.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)) {
		return false
	}

	c.notifySystemd(newSt)
	c.metricsCollector.State().SetHealth(int32(newSt))

	return true
}

// swapHealthStatus sets the health status to st and returns the status that
// was stored before. systemd and the metrics are updated after the swap.
func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatus) {
	raw := int32(st)
	previous := c.healthStatus.Swap(raw)

	c.notifySystemd(st)
	c.metricsCollector.State().SetHealth(raw)

	return control.HealthStatus(previous)
}

// HealthStatus returns the currently stored health status.
func (c *cfg) HealthStatus() control.HealthStatus {
	return control.HealthStatus(c.healthStatus.Load())
}

// notifySystemd reports the health status st through the sdnotify package.
//
// It is a no-op when c.sdNotify is false. READY, SHUTTING_DOWN and
// RECONFIGURING are sent with their dedicated sdnotify flags; any other
// status is sent as a plain status string. A reporting failure is logged and
// otherwise ignored.
func (c *cfg) notifySystemd(st control.HealthStatus) {
	if !c.sdNotify {
		return
	}

	var notifyErr error
	switch st {
	case control.HealthStatus_READY:
		notifyErr = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
	case control.HealthStatus_SHUTTING_DOWN:
		notifyErr = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
	case control.HealthStatus_RECONFIGURING:
		notifyErr = sdnotify.FlagAndStatus(sdnotify.ReloadingEnabled)
	default:
		// fmt.Sprint formats a single operand with the default (%v) verb.
		notifyErr = sdnotify.Status(fmt.Sprint(st))
	}

	if notifyErr == nil {
		return
	}
	c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(notifyErr))
}