[#1183] node: Use NATS client as notification writer

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-03-05 14:51:45 +03:00 committed by Alex Vanin
parent 1e96f62294
commit 5e90d85020

View file

@ -9,22 +9,12 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/services/notificator" "github.com/nspcc-dev/neofs-node/pkg/services/notificator"
"github.com/nspcc-dev/neofs-node/pkg/services/notificator/nats"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
"go.uber.org/zap" "go.uber.org/zap"
) )
type simpleNotificationWriter struct {
l *zap.Logger
}
func (s simpleNotificationWriter) Notify(topic string, address *addressSDK.Address) {
s.l.Debug("got notification to write",
zap.String("topic", topic),
zap.Stringer("address", address),
)
}
type notificationSource struct { type notificationSource struct {
e *engine.StorageEngine e *engine.StorageEngine
l *zap.Logger l *zap.Logger
@ -101,12 +91,44 @@ func (n *notificationSource) processAddress(
return nil return nil
} }
type notificationWriter struct {
l *zap.Logger
w *nats.Writer
}
func (n notificationWriter) Notify(topic string, address *addressSDK.Address) {
if err := n.w.Notify(topic, address); err != nil {
n.l.Warn("could not write object notification",
zap.Stringer("address", address),
zap.String("topic", topic),
zap.Error(err),
)
}
}
func initNotification(c *cfg) { func initNotification(c *cfg) {
if nodeconfig.Notification(c.appCfg).Enabled() { if nodeconfig.Notification(c.appCfg).Enabled() {
topic := nodeconfig.Notification(c.appCfg).DefaultTopic() topic := nodeconfig.Notification(c.appCfg).DefaultTopic()
pubKey := base58.Encode(c.cfgNodeInfo.localInfo.PublicKey())
if topic == "" { if topic == "" {
topic = base58.Encode(c.cfgNodeInfo.localInfo.PublicKey()) topic = pubKey
}
natsSvc, err := nats.New(
c.ctx,
nodeconfig.Notification(c.appCfg).Endpoint(),
nats.WithConnectionName("NeoFS Storage Node: "+pubKey), // connection name is used in the server side logs
nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()),
nats.WithClientCert(
nodeconfig.Notification(c.appCfg).CertPath(),
nodeconfig.Notification(c.appCfg).KeyPath(),
),
nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()),
nats.WithLogger(c.log),
)
if err != nil {
panic("could not created object notificator: " + err.Error())
} }
n := notificator.New(new(notificator.Prm). n := notificator.New(new(notificator.Prm).
@ -117,7 +139,10 @@ func initNotification(c *cfg) {
l: c.log, l: c.log,
defaultTopic: topic, defaultTopic: topic,
}). }).
SetWriter(simpleNotificationWriter{l: c.log}), SetWriter(notificationWriter{
l: c.log,
w: natsSvc,
}),
) )
addNewEpochAsyncNotificationHandler(c, func(e event.Event) { addNewEpochAsyncNotificationHandler(c, func(e event.Event) {