From 5e90d85020d4a78463fe890ace4276d99d57c570 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Sat, 5 Mar 2022 14:51:45 +0300 Subject: [PATCH] [#1183] node: Use NATS client as notification writer Signed-off-by: Pavel Karpy --- cmd/neofs-node/notificator.go | 51 ++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/cmd/neofs-node/notificator.go b/cmd/neofs-node/notificator.go index 1be2e597c..9525d5704 100644 --- a/cmd/neofs-node/notificator.go +++ b/cmd/neofs-node/notificator.go @@ -9,22 +9,12 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/services/notificator" + "github.com/nspcc-dev/neofs-node/pkg/services/notificator/nats" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" "go.uber.org/zap" ) -type simpleNotificationWriter struct { - l *zap.Logger -} - -func (s simpleNotificationWriter) Notify(topic string, address *addressSDK.Address) { - s.l.Debug("got notification to write", - zap.String("topic", topic), - zap.Stringer("address", address), - ) -} - type notificationSource struct { e *engine.StorageEngine l *zap.Logger @@ -101,12 +91,44 @@ func (n *notificationSource) processAddress( return nil } +type notificationWriter struct { + l *zap.Logger + w *nats.Writer +} + +func (n notificationWriter) Notify(topic string, address *addressSDK.Address) { + if err := n.w.Notify(topic, address); err != nil { + n.l.Warn("could not write object notification", + zap.Stringer("address", address), + zap.String("topic", topic), + zap.Error(err), + ) + } +} + func initNotification(c *cfg) { if nodeconfig.Notification(c.appCfg).Enabled() { topic := nodeconfig.Notification(c.appCfg).DefaultTopic() + pubKey := base58.Encode(c.cfgNodeInfo.localInfo.PublicKey()) if topic == "" { - topic = base58.Encode(c.cfgNodeInfo.localInfo.PublicKey()) + topic = pubKey + } + + natsSvc, err := nats.New( + c.ctx, + nodeconfig.Notification(c.appCfg).Endpoint(), + nats.WithConnectionName("NeoFS Storage Node: "+pubKey), // connection name is used in the server side logs + nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()), + nats.WithClientCert( + nodeconfig.Notification(c.appCfg).CertPath(), + nodeconfig.Notification(c.appCfg).KeyPath(), + ), + nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()), + nats.WithLogger(c.log), + ) + if err != nil { + panic("could not created object notificator: " + err.Error()) } n := notificator.New(new(notificator.Prm). @@ -117,7 +139,10 @@ func initNotification(c *cfg) { l: c.log, defaultTopic: topic, }). - SetWriter(simpleNotificationWriter{l: c.log}), + SetWriter(notificationWriter{ + l: c.log, + w: natsSvc, + }), ) addNewEpochAsyncNotificationHandler(c, func(e event.Event) {