systemd notify protocol #810
12 changed files with 144 additions and 6 deletions
|
@ -62,6 +62,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -361,6 +362,8 @@ type internals struct {
|
||||||
healthStatus *atomic.Int32
|
healthStatus *atomic.Int32
|
||||||
// is node under maintenance
|
// is node under maintenance
|
||||||
isMaintenance atomic.Bool
|
isMaintenance atomic.Bool
|
||||||
|
|
||||||
|
sdNotify bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// starts node's maintenance.
|
// starts node's maintenance.
|
||||||
|
@ -632,9 +635,18 @@ func initInternals(appCfg *config.Config, log *logger.Logger) internals {
|
||||||
log: log,
|
log: log,
|
||||||
apiVersion: version.Current(),
|
apiVersion: version.Current(),
|
||||||
healthStatus: &healthStatus,
|
healthStatus: &healthStatus,
|
||||||
|
sdNotify: initSdNotify(appCfg),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func initSdNotify(appCfg *config.Config) bool {
|
||||||
|
if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
|
||||||
|
fatalOnErr(sdnotify.InitSocket())
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
|
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
|
||||||
var netAddr network.AddressGroup
|
var netAddr network.AddressGroup
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
||||||
|
@ -9,6 +10,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
@ -83,12 +85,14 @@ func (c *cfg) NetmapStatus() control.NetmapStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) setHealthStatus(st control.HealthStatus) {
|
func (c *cfg) setHealthStatus(st control.HealthStatus) {
|
||||||
|
c.notifySystemd(st)
|
||||||
c.healthStatus.Store(int32(st))
|
c.healthStatus.Store(int32(st))
|
||||||
c.metricsCollector.State().SetHealth(int32(st))
|
c.metricsCollector.State().SetHealth(int32(st))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
|
func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
|
||||||
if swapped = c.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)); swapped {
|
if swapped = c.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)); swapped {
|
||||||
|
c.notifySystemd(newSt)
|
||||||
c.metricsCollector.State().SetHealth(int32(newSt))
|
c.metricsCollector.State().SetHealth(int32(newSt))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -96,6 +100,7 @@ func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swa
|
||||||
|
|
||||||
func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatus) {
|
func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatus) {
|
||||||
old = control.HealthStatus(c.healthStatus.Swap(int32(st)))
|
old = control.HealthStatus(c.healthStatus.Swap(int32(st)))
|
||||||
|
c.notifySystemd(st)
|
||||||
c.metricsCollector.State().SetHealth(int32(st))
|
c.metricsCollector.State().SetHealth(int32(st))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -103,3 +108,23 @@ func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatu
|
||||||
func (c *cfg) HealthStatus() control.HealthStatus {
|
func (c *cfg) HealthStatus() control.HealthStatus {
|
||||||
return control.HealthStatus(c.healthStatus.Load())
|
return control.HealthStatus(c.healthStatus.Load())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cfg) notifySystemd(st control.HealthStatus) {
|
||||||
|
if !c.sdNotify {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
switch st {
|
||||||
|
case control.HealthStatus_READY:
|
||||||
|
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
|
||||||
|
case control.HealthStatus_SHUTTING_DOWN:
|
||||||
|
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
|
||||||
|
case control.HealthStatus_RECONFIGURING:
|
||||||
|
err = sdnotify.FlagAndStatus(sdnotify.ReloadingEnabled)
|
||||||
|
default:
|
||||||
|
err = sdnotify.Status(fmt.Sprintf("%v", st))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -23,7 +23,8 @@ var _ engine.LocalOverrideEngine = (*accessPolicyEngine)(nil)
|
||||||
|
|
||||||
func newAccessPolicyEngine(
|
func newAccessPolicyEngine(
|
||||||
morphChainStorage engine.MorphRuleChainStorage,
|
morphChainStorage engine.MorphRuleChainStorage,
|
||||||
localOverrideDatabase chainbase.LocalOverrideDatabase) *accessPolicyEngine {
|
localOverrideDatabase chainbase.LocalOverrideDatabase,
|
||||||
|
) *accessPolicyEngine {
|
||||||
return &accessPolicyEngine{
|
return &accessPolicyEngine{
|
||||||
chainRouter: engine.NewDefaultChainRouterWithLocalOverrides(
|
chainRouter: engine.NewDefaultChainRouterWithLocalOverrides(
|
||||||
morphChainStorage,
|
morphChainStorage,
|
||||||
|
|
|
@ -119,3 +119,6 @@ prometheus:
|
||||||
enabled: true
|
enabled: true
|
||||||
address: localhost:9090 # Endpoint for application prometheus metrics; disabled by default
|
address: localhost:9090 # Endpoint for application prometheus metrics; disabled by default
|
||||||
shutdown_timeout: 30s # Timeout for metrics HTTP server graceful shutdown
|
shutdown_timeout: 30s # Timeout for metrics HTTP server graceful shutdown
|
||||||
|
|
||||||
|
systemdnotify:
|
||||||
|
enabled: true
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
logger:
|
logger:
|
||||||
level: debug # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
|
level: debug # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
|
||||||
|
|
||||||
|
systemdnotify:
|
||||||
|
enabled: true
|
||||||
|
|
||||||
pprof:
|
pprof:
|
||||||
enabled: true
|
enabled: true
|
||||||
address: localhost:6060 # endpoint for Node profiling
|
address: localhost:6060 # endpoint for Node profiling
|
||||||
|
|
3
debian/frostfs-ir.service
vendored
3
debian/frostfs-ir.service
vendored
|
@ -3,7 +3,8 @@ Description=FrostFS InnerRing node
|
||||||
Requires=network.target
|
Requires=network.target
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
Type=simple
|
Type=notify
|
||||||
|
NotifyAccess=all
|
||||||
ExecStart=/usr/bin/frostfs-ir --config /etc/frostfs/ir/config.yml
|
ExecStart=/usr/bin/frostfs-ir --config /etc/frostfs/ir/config.yml
|
||||||
User=frostfs-ir
|
User=frostfs-ir
|
||||||
Group=frostfs-ir
|
Group=frostfs-ir
|
||||||
|
|
3
debian/frostfs-storage.service
vendored
3
debian/frostfs-storage.service
vendored
|
@ -3,7 +3,8 @@ Description=FrostFS Storage node
|
||||||
Requires=network.target
|
Requires=network.target
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
Type=simple
|
Type=notify
|
||||||
|
NotifyAccess=all
|
||||||
ExecStart=/usr/bin/frostfs-node --config /etc/frostfs/storage/config.yml
|
ExecStart=/usr/bin/frostfs-node --config /etc/frostfs/storage/config.yml
|
||||||
User=frostfs-storage
|
User=frostfs-storage
|
||||||
Group=frostfs-storage
|
Group=frostfs-storage
|
||||||
|
|
|
@ -554,4 +554,5 @@ const (
|
||||||
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
|
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
|
||||||
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
||||||
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
|
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
|
||||||
|
FailedToReportStatusToSystemd = "failed to report status to systemd"
|
||||||
)
|
)
|
||||||
|
|
|
@ -22,9 +22,7 @@ type boltLocalOverrideStorage struct {
|
||||||
db *bbolt.DB
|
db *bbolt.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var chainBucket = []byte{0}
|
||||||
chainBucket = []byte{0}
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
|
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
|
@ -73,6 +74,7 @@ type (
|
||||||
predefinedValidators keys.PublicKeys
|
predefinedValidators keys.PublicKeys
|
||||||
initialEpochTickDelta uint32
|
initialEpochTickDelta uint32
|
||||||
withoutMainNet bool
|
withoutMainNet bool
|
||||||
|
sdNotify bool
|
||||||
|
|
||||||
// runtime processors
|
// runtime processors
|
||||||
netmapProcessor *netmap.Processor
|
netmapProcessor *netmap.Processor
|
||||||
|
@ -336,6 +338,11 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
||||||
irMetrics: metrics,
|
irMetrics: metrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
server.sdNotify, err = server.initSdNotify(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
server.setHealthStatus(control.HealthStatus_HEALTH_STATUS_UNDEFINED)
|
server.setHealthStatus(control.HealthStatus_HEALTH_STATUS_UNDEFINED)
|
||||||
|
|
||||||
// parse notary support
|
// parse notary support
|
||||||
|
@ -404,6 +411,13 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
||||||
return server, nil
|
return server, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
|
||||||
|
if cfg.GetBool("systemdnotify.enabled") {
|
||||||
|
return true, sdnotify.InitSocket()
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
|
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
|
||||||
var (
|
var (
|
||||||
sub subscriber.Subscriber
|
sub subscriber.Subscriber
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
@ -154,6 +155,7 @@ func (s *Server) ResetEpochTimer(h uint32) error {
|
||||||
|
|
||||||
func (s *Server) setHealthStatus(hs control.HealthStatus) {
|
func (s *Server) setHealthStatus(hs control.HealthStatus) {
|
||||||
s.healthStatus.Store(int32(hs))
|
s.healthStatus.Store(int32(hs))
|
||||||
|
s.notifySystemd(hs)
|
||||||
if s.irMetrics != nil {
|
if s.irMetrics != nil {
|
||||||
s.irMetrics.SetHealth(int32(hs))
|
s.irMetrics.SetHealth(int32(hs))
|
||||||
}
|
}
|
||||||
|
@ -173,3 +175,21 @@ func initPersistentStateStorage(cfg *viper.Viper) (*state.PersistentStorage, err
|
||||||
|
|
||||||
return persistStorage, nil
|
return persistStorage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) notifySystemd(st control.HealthStatus) {
|
||||||
|
if !s.sdNotify {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
switch st {
|
||||||
|
case control.HealthStatus_READY:
|
||||||
|
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
|
||||||
|
case control.HealthStatus_SHUTTING_DOWN:
|
||||||
|
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
|
||||||
|
default:
|
||||||
|
err = sdnotify.Status(fmt.Sprintf("%v", st))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
59
pkg/util/sdnotify/sdnotify.go
Normal file
59
pkg/util/sdnotify/sdnotify.go
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
package sdnotify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
ReadyEnabled = "READY=1"
|
||||||
|
StoppingEnabled = "STOPPING=1"
|
||||||
|
ReloadingEnabled = "RELOADING=1"
|
||||||
|
)
|
||||||
|
|
||||||
|
var socket *net.UnixAddr
|
||||||
|
|
||||||
|
// Initializes socket with provided name of
|
||||||
|
// environment variable.
|
||||||
|
func InitSocket() error {
|
||||||
|
notifySocket := os.Getenv("NOTIFY_SOCKET")
|
||||||
|
if notifySocket == "" {
|
||||||
|
return fmt.Errorf("\"NOTIFY_SOCKET\" environment variable is not present")
|
||||||
|
}
|
||||||
|
socket = &net.UnixAddr{
|
||||||
|
Name: notifySocket,
|
||||||
|
Net: "unixgram",
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlagAndStatus sends systemd a combination of a
|
||||||
|
// well-known status and STATUS=%s{status}, separated by newline.
|
||||||
|
func FlagAndStatus(status string) error {
|
||||||
|
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
|
||||||
|
return Send(status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status sends systemd notify STATUS=%s{status}.
|
||||||
|
func Status(status string) error {
|
||||||
|
return Send(fmt.Sprintf("STATUS=%s", status))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send state through the notify socket if any.
|
||||||
|
// If the notify socket was not detected, it returns an error.
|
||||||
|
func Send(state string) error {
|
||||||
|
if socket == nil {
|
||||||
|
return fmt.Errorf("socket is not initialized")
|
||||||
|
}
|
||||||
|
conn, err := net.DialUnix(socket.Net, nil, socket)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't open unix socket: %v", err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
if _, err = conn.Write([]byte(state)); err != nil {
|
||||||
|
return fmt.Errorf("can't write into the unix socket: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue