From 90ff0669407d8dd109fab22d969c8846859b45cb Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 17 Feb 2022 15:42:07 +0300 Subject: [PATCH] [#1183] node/notificator: Implement notificator Signed-off-by: Pavel Karpy --- pkg/services/notificator/deps.go | 18 ++++++ pkg/services/notificator/service.go | 85 +++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 pkg/services/notificator/deps.go create mode 100644 pkg/services/notificator/service.go diff --git a/pkg/services/notificator/deps.go b/pkg/services/notificator/deps.go new file mode 100644 index 00000000..db0c2ded --- /dev/null +++ b/pkg/services/notificator/deps.go @@ -0,0 +1,18 @@ +package notificator + +import objectSDKAddress "github.com/nspcc-dev/neofs-sdk-go/object/address" + +// NotificationSource is a source of object notifications. +type NotificationSource interface { + // Iterate must iterate over all notifications for the + // provided epoch and call handler for all of them. + Iterate(epoch uint64, handler func(topic string, addr *objectSDKAddress.Address)) +} + +// NotificationWriter notifies all the subscribers +// about new object notifications. +type NotificationWriter interface { + // Notify must send string representation of the object + // address into the specified topic. + Notify(topic string, address *objectSDKAddress.Address) +} diff --git a/pkg/services/notificator/service.go b/pkg/services/notificator/service.go new file mode 100644 index 00000000..6994f8b3 --- /dev/null +++ b/pkg/services/notificator/service.go @@ -0,0 +1,85 @@ +package notificator + +import ( + "fmt" + + objectSDKAddress "github.com/nspcc-dev/neofs-sdk-go/object/address" + "go.uber.org/zap" +) + +// Prm groups Notificator constructor's +// parameters. All are required. +type Prm struct { + writer NotificationWriter + notificationSource NotificationSource + logger *zap.Logger +} + +// SetLogger sets a logger. +func (prm *Prm) SetLogger(v *zap.Logger) *Prm { + prm.logger = v + return prm +} + +// SetWriter sets notification writer. +func (prm *Prm) SetWriter(v NotificationWriter) *Prm { + prm.writer = v + return prm +} + +// SetNotificationSource sets notification source. +func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm { + prm.notificationSource = v + return prm +} + +// Notificator is a notification producer that handles +// objects with defined notification epoch. +// +// Working client must be created via constructor New. +// Using the Client that has been created with new(Client) +// expression (or just declaring a Client variable) is unsafe +// and can lead to panic. +type Notificator struct { + w NotificationWriter + ns NotificationSource + l *zap.Logger +} + +// New creates, initializes and returns the Notificator instance. +// +// Panics if any field of the passed Prm structure is not set/set +// to nil. +func New(prm *Prm) *Notificator { + panicOnNil := func(v interface{}, name string) { + if v == nil { + panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name)) + } + } + + panicOnNil(prm.writer, "NotificationWriter") + panicOnNil(prm.notificationSource, "NotificationSource") + panicOnNil(prm.logger, "Logger") + + return &Notificator{ + w: prm.writer, + ns: prm.notificationSource, + l: prm.logger, + } +} + +// ProcessEpoch looks for all objects with defined epoch in the storage +// and passes their addresses to the NotificationWriter. +func (n *Notificator) ProcessEpoch(epoch uint64) { + logger := n.l.With(zap.Uint64("epoch", epoch)) + logger.Debug("notificator: start processing object notifications") + + n.ns.Iterate(epoch, func(topic string, addr *objectSDKAddress.Address) { + n.l.Debug("notificator: processing object notification", + zap.String("topic", topic), + zap.Stringer("address", addr), + ) + + n.w.Notify(topic, addr) + }) +}