systemd notify protocol #810

Merged
fyrchik merged 5 commits from elebedeva/frostfs-node:sd-notify into master 2024-09-04 19:51:04 +00:00
12 changed files with 144 additions and 6 deletions

View file

@ -62,6 +62,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -361,6 +362,8 @@ type internals struct {
healthStatus *atomic.Int32
// is node under maintenance
isMaintenance atomic.Bool
sdNotify bool
}
// starts node's maintenance.
@ -632,9 +635,18 @@ func initInternals(appCfg *config.Config, log *logger.Logger) internals {
log: log,
apiVersion: version.Current(),
healthStatus: &healthStatus,
sdNotify: initSdNotify(appCfg),
}
}
func initSdNotify(appCfg *config.Config) bool {
if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
fatalOnErr(sdnotify.InitSocket())
return true
}
return false
}
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
var netAddr network.AddressGroup

View file

@ -2,6 +2,7 @@ package main
import (
"context"
"fmt"
"net"
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
@ -9,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -83,12 +85,14 @@ func (c *cfg) NetmapStatus() control.NetmapStatus {
}
func (c *cfg) setHealthStatus(st control.HealthStatus) {
c.notifySystemd(st)
c.healthStatus.Store(int32(st))
c.metricsCollector.State().SetHealth(int32(st))
}
func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
if swapped = c.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)); swapped {
c.notifySystemd(newSt)
c.metricsCollector.State().SetHealth(int32(newSt))
}
return
@ -96,6 +100,7 @@ func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swa
func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatus) {
old = control.HealthStatus(c.healthStatus.Swap(int32(st)))
c.notifySystemd(st)
c.metricsCollector.State().SetHealth(int32(st))
return
}
@ -103,3 +108,23 @@ func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatu
func (c *cfg) HealthStatus() control.HealthStatus {
return control.HealthStatus(c.healthStatus.Load())
}
func (c *cfg) notifySystemd(st control.HealthStatus) {
if !c.sdNotify {
return
}
var err error
switch st {
case control.HealthStatus_READY:
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
case control.HealthStatus_SHUTTING_DOWN:
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
case control.HealthStatus_RECONFIGURING:
err = sdnotify.FlagAndStatus(sdnotify.ReloadingEnabled)
default:
err = sdnotify.Status(fmt.Sprintf("%v", st))
}
if err != nil {
c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
}
}

View file

@ -23,7 +23,8 @@ var _ engine.LocalOverrideEngine = (*accessPolicyEngine)(nil)
func newAccessPolicyEngine(
morphChainStorage engine.MorphRuleChainStorage,
localOverrideDatabase chainbase.LocalOverrideDatabase) *accessPolicyEngine {
localOverrideDatabase chainbase.LocalOverrideDatabase,
) *accessPolicyEngine {
return &accessPolicyEngine{
chainRouter: engine.NewDefaultChainRouterWithLocalOverrides(
morphChainStorage,

View file

@ -119,3 +119,6 @@ prometheus:
enabled: true
address: localhost:9090 # Endpoint for application prometheus metrics; disabled by default
shutdown_timeout: 30s # Timeout for metrics HTTP server graceful shutdown
systemdnotify:
enabled: true

View file

@ -1,6 +1,9 @@
logger:
level: debug # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
systemdnotify:
enabled: true
pprof:
enabled: true
address: localhost:6060 # endpoint for Node profiling

View file

@ -3,7 +3,8 @@ Description=FrostFS InnerRing node
Requires=network.target
[Service]
Type=simple
Type=notify
NotifyAccess=all
ExecStart=/usr/bin/frostfs-ir --config /etc/frostfs/ir/config.yml
User=frostfs-ir
Group=frostfs-ir

View file

@ -3,7 +3,8 @@ Description=FrostFS Storage node
Requires=network.target
[Service]
Type=simple
Type=notify
NotifyAccess=all
ExecStart=/usr/bin/frostfs-node --config /etc/frostfs/storage/config.yml
User=frostfs-storage
Group=frostfs-storage

View file

@ -554,4 +554,5 @@ const (
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
FailedToReportStatusToSystemd = "failed to report status to systemd"
)

View file

@ -22,9 +22,7 @@ type boltLocalOverrideStorage struct {
db *bbolt.DB
}
var (
chainBucket = []byte{0}
)
var chainBucket = []byte{0}
var (
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")

View file

@ -24,6 +24,7 @@ import (
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -73,6 +74,7 @@ type (
predefinedValidators keys.PublicKeys
initialEpochTickDelta uint32
withoutMainNet bool
sdNotify bool
// runtime processors
netmapProcessor *netmap.Processor
@ -336,6 +338,11 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
irMetrics: metrics,
}
server.sdNotify, err = server.initSdNotify(cfg)
if err != nil {
return nil, err
}
server.setHealthStatus(control.HealthStatus_HEALTH_STATUS_UNDEFINED)
// parse notary support
@ -404,6 +411,13 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return server, nil
}
func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
if cfg.GetBool("systemdnotify.enabled") {
return true, sdnotify.InitSocket()
}
return false, nil
}
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
var (
sub subscriber.Subscriber

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/spf13/viper"
@ -154,6 +155,7 @@ func (s *Server) ResetEpochTimer(h uint32) error {
func (s *Server) setHealthStatus(hs control.HealthStatus) {
s.healthStatus.Store(int32(hs))
s.notifySystemd(hs)
if s.irMetrics != nil {
s.irMetrics.SetHealth(int32(hs))
}
@ -173,3 +175,21 @@ func initPersistentStateStorage(cfg *viper.Viper) (*state.PersistentStorage, err
return persistStorage, nil
}
func (s *Server) notifySystemd(st control.HealthStatus) {
if !s.sdNotify {
return
}
var err error
switch st {
case control.HealthStatus_READY:
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
case control.HealthStatus_SHUTTING_DOWN:
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
default:
err = sdnotify.Status(fmt.Sprintf("%v", st))
}
if err != nil {
s.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
}
}

View file

@ -0,0 +1,59 @@
package sdnotify
import (
"fmt"
"net"
"os"
"strings"
)
const (
ReadyEnabled = "READY=1"
StoppingEnabled = "STOPPING=1"
ReloadingEnabled = "RELOADING=1"
)
var socket *net.UnixAddr
// Initializes socket with provided name of
// environment variable.
func InitSocket() error {
notifySocket := os.Getenv("NOTIFY_SOCKET")
if notifySocket == "" {
return fmt.Errorf("\"NOTIFY_SOCKET\" environment variable is not present")
}
socket = &net.UnixAddr{
Name: notifySocket,
Net: "unixgram",
}
return nil
}
// FlagAndStatus sends systemd a combination of a
// well-known status and STATUS=%s{status}, separated by newline.
func FlagAndStatus(status string) error {
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
return Send(status)
}
// Status sends systemd notify STATUS=%s{status}.
func Status(status string) error {
return Send(fmt.Sprintf("STATUS=%s", status))
}
// Send state through the notify socket if any.
// If the notify socket was not detected, it returns an error.
func Send(state string) error {
if socket == nil {
return fmt.Errorf("socket is not initialized")
}
conn, err := net.DialUnix(socket.Net, nil, socket)
if err != nil {
return fmt.Errorf("can't open unix socket: %v", err)
}
defer conn.Close()
if _, err = conn.Write([]byte(state)); err != nil {
return fmt.Errorf("can't write into the unix socket: %v", err)
}
return nil
}