systemd notify protocol #810
12 changed files with 144 additions and 6 deletions
|
@ -62,6 +62,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -361,6 +362,8 @@ type internals struct {
|
|||
healthStatus *atomic.Int32
|
||||
// is node under maintenance
|
||||
isMaintenance atomic.Bool
|
||||
|
||||
sdNotify bool
|
||||
}
|
||||
|
||||
// starts node's maintenance.
|
||||
|
@ -632,9 +635,18 @@ func initInternals(appCfg *config.Config, log *logger.Logger) internals {
|
|||
log: log,
|
||||
apiVersion: version.Current(),
|
||||
healthStatus: &healthStatus,
|
||||
sdNotify: initSdNotify(appCfg),
|
||||
}
|
||||
}
|
||||
|
||||
func initSdNotify(appCfg *config.Config) bool {
|
||||
if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
|
||||
fatalOnErr(sdnotify.InitSocket())
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
|
||||
var netAddr network.AddressGroup
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
||||
|
@ -9,6 +10,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
@ -83,12 +85,14 @@ func (c *cfg) NetmapStatus() control.NetmapStatus {
|
|||
}
|
||||
|
||||
func (c *cfg) setHealthStatus(st control.HealthStatus) {
|
||||
c.notifySystemd(st)
|
||||
c.healthStatus.Store(int32(st))
|
||||
c.metricsCollector.State().SetHealth(int32(st))
|
||||
}
|
||||
|
||||
func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
|
||||
if swapped = c.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)); swapped {
|
||||
c.notifySystemd(newSt)
|
||||
c.metricsCollector.State().SetHealth(int32(newSt))
|
||||
}
|
||||
return
|
||||
|
@ -96,6 +100,7 @@ func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swa
|
|||
|
||||
func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatus) {
|
||||
old = control.HealthStatus(c.healthStatus.Swap(int32(st)))
|
||||
c.notifySystemd(st)
|
||||
c.metricsCollector.State().SetHealth(int32(st))
|
||||
return
|
||||
}
|
||||
|
@ -103,3 +108,23 @@ func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatu
|
|||
func (c *cfg) HealthStatus() control.HealthStatus {
|
||||
return control.HealthStatus(c.healthStatus.Load())
|
||||
}
|
||||
|
||||
func (c *cfg) notifySystemd(st control.HealthStatus) {
|
||||
if !c.sdNotify {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
switch st {
|
||||
case control.HealthStatus_READY:
|
||||
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
|
||||
case control.HealthStatus_SHUTTING_DOWN:
|
||||
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
|
||||
case control.HealthStatus_RECONFIGURING:
|
||||
err = sdnotify.FlagAndStatus(sdnotify.ReloadingEnabled)
|
||||
default:
|
||||
err = sdnotify.Status(fmt.Sprintf("%v", st))
|
||||
}
|
||||
if err != nil {
|
||||
c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,8 @@ var _ engine.LocalOverrideEngine = (*accessPolicyEngine)(nil)
|
|||
|
||||
func newAccessPolicyEngine(
|
||||
morphChainStorage engine.MorphRuleChainStorage,
|
||||
localOverrideDatabase chainbase.LocalOverrideDatabase) *accessPolicyEngine {
|
||||
localOverrideDatabase chainbase.LocalOverrideDatabase,
|
||||
) *accessPolicyEngine {
|
||||
return &accessPolicyEngine{
|
||||
chainRouter: engine.NewDefaultChainRouterWithLocalOverrides(
|
||||
morphChainStorage,
|
||||
|
|
|
@ -119,3 +119,6 @@ prometheus:
|
|||
enabled: true
|
||||
address: localhost:9090 # Endpoint for application prometheus metrics; disabled by default
|
||||
shutdown_timeout: 30s # Timeout for metrics HTTP server graceful shutdown
|
||||
|
||||
systemdnotify:
|
||||
enabled: true
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
logger:
|
||||
level: debug # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
|
||||
|
||||
systemdnotify:
|
||||
enabled: true
|
||||
|
||||
pprof:
|
||||
enabled: true
|
||||
address: localhost:6060 # endpoint for Node profiling
|
||||
|
|
3
debian/frostfs-ir.service
vendored
3
debian/frostfs-ir.service
vendored
|
@ -3,7 +3,8 @@ Description=FrostFS InnerRing node
|
|||
Requires=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
Type=notify
|
||||
NotifyAccess=all
|
||||
ExecStart=/usr/bin/frostfs-ir --config /etc/frostfs/ir/config.yml
|
||||
User=frostfs-ir
|
||||
Group=frostfs-ir
|
||||
|
|
3
debian/frostfs-storage.service
vendored
3
debian/frostfs-storage.service
vendored
|
@ -3,7 +3,8 @@ Description=FrostFS Storage node
|
|||
Requires=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
Type=notify
|
||||
NotifyAccess=all
|
||||
ExecStart=/usr/bin/frostfs-node --config /etc/frostfs/storage/config.yml
|
||||
User=frostfs-storage
|
||||
Group=frostfs-storage
|
||||
|
|
|
@ -554,4 +554,5 @@ const (
|
|||
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
|
||||
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
||||
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
|
||||
FailedToReportStatusToSystemd = "failed to report status to systemd"
|
||||
)
|
||||
|
|
|
@ -22,9 +22,7 @@ type boltLocalOverrideStorage struct {
|
|||
db *bbolt.DB
|
||||
}
|
||||
|
||||
var (
|
||||
chainBucket = []byte{0}
|
||||
)
|
||||
var chainBucket = []byte{0}
|
||||
|
||||
var (
|
||||
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||
|
@ -73,6 +74,7 @@ type (
|
|||
predefinedValidators keys.PublicKeys
|
||||
initialEpochTickDelta uint32
|
||||
withoutMainNet bool
|
||||
sdNotify bool
|
||||
|
||||
// runtime processors
|
||||
netmapProcessor *netmap.Processor
|
||||
|
@ -336,6 +338,11 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
irMetrics: metrics,
|
||||
}
|
||||
|
||||
server.sdNotify, err = server.initSdNotify(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server.setHealthStatus(control.HealthStatus_HEALTH_STATUS_UNDEFINED)
|
||||
|
||||
// parse notary support
|
||||
|
@ -404,6 +411,13 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
|||
return server, nil
|
||||
}
|
||||
|
||||
func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
|
||||
if cfg.GetBool("systemdnotify.enabled") {
|
||||
return true, sdnotify.InitSocket()
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
|
||||
var (
|
||||
sub subscriber.Subscriber
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -154,6 +155,7 @@ func (s *Server) ResetEpochTimer(h uint32) error {
|
|||
|
||||
func (s *Server) setHealthStatus(hs control.HealthStatus) {
|
||||
s.healthStatus.Store(int32(hs))
|
||||
s.notifySystemd(hs)
|
||||
if s.irMetrics != nil {
|
||||
s.irMetrics.SetHealth(int32(hs))
|
||||
}
|
||||
|
@ -173,3 +175,21 @@ func initPersistentStateStorage(cfg *viper.Viper) (*state.PersistentStorage, err
|
|||
|
||||
return persistStorage, nil
|
||||
}
|
||||
|
||||
func (s *Server) notifySystemd(st control.HealthStatus) {
|
||||
if !s.sdNotify {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
switch st {
|
||||
case control.HealthStatus_READY:
|
||||
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
|
||||
case control.HealthStatus_SHUTTING_DOWN:
|
||||
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
|
||||
default:
|
||||
err = sdnotify.Status(fmt.Sprintf("%v", st))
|
||||
}
|
||||
if err != nil {
|
||||
s.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
|
59
pkg/util/sdnotify/sdnotify.go
Normal file
59
pkg/util/sdnotify/sdnotify.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package sdnotify
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
ReadyEnabled = "READY=1"
|
||||
StoppingEnabled = "STOPPING=1"
|
||||
ReloadingEnabled = "RELOADING=1"
|
||||
)
|
||||
|
||||
var socket *net.UnixAddr
|
||||
|
||||
// Initializes socket with provided name of
|
||||
// environment variable.
|
||||
func InitSocket() error {
|
||||
notifySocket := os.Getenv("NOTIFY_SOCKET")
|
||||
if notifySocket == "" {
|
||||
return fmt.Errorf("\"NOTIFY_SOCKET\" environment variable is not present")
|
||||
}
|
||||
socket = &net.UnixAddr{
|
||||
Name: notifySocket,
|
||||
Net: "unixgram",
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FlagAndStatus sends systemd a combination of a
|
||||
// well-known status and STATUS=%s{status}, separated by newline.
|
||||
func FlagAndStatus(status string) error {
|
||||
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
|
||||
return Send(status)
|
||||
}
|
||||
|
||||
// Status sends systemd notify STATUS=%s{status}.
|
||||
func Status(status string) error {
|
||||
return Send(fmt.Sprintf("STATUS=%s", status))
|
||||
}
|
||||
|
||||
// Send state through the notify socket if any.
|
||||
// If the notify socket was not detected, it returns an error.
|
||||
func Send(state string) error {
|
||||
if socket == nil {
|
||||
return fmt.Errorf("socket is not initialized")
|
||||
}
|
||||
conn, err := net.DialUnix(socket.Net, nil, socket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open unix socket: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
if _, err = conn.Write([]byte(state)); err != nil {
|
||||
return fmt.Errorf("can't write into the unix socket: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Add table
Reference in a new issue