forked from TrueCloudLab/frostfs-node
[#1183] node/notificator: Implement notificator
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
7d0e3648a2
commit
90ff066940
2 changed files with 103 additions and 0 deletions
18
pkg/services/notificator/deps.go
Normal file
18
pkg/services/notificator/deps.go
Normal file
|
@ -0,0 +1,18 @@
|
|||
package notificator
|
||||
|
||||
import objectSDKAddress "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
|
||||
// NotificationSource is a source of object notifications.
|
||||
type NotificationSource interface {
|
||||
// Iterate must iterate over all notifications for the
|
||||
// provided epoch and call handler for all of them.
|
||||
Iterate(epoch uint64, handler func(topic string, addr *objectSDKAddress.Address))
|
||||
}
|
||||
|
||||
// NotificationWriter notifies all the subscribers
|
||||
// about new object notifications.
|
||||
type NotificationWriter interface {
|
||||
// Notify must send string representation of the object
|
||||
// address into the specified topic.
|
||||
Notify(topic string, address *objectSDKAddress.Address)
|
||||
}
|
85
pkg/services/notificator/service.go
Normal file
85
pkg/services/notificator/service.go
Normal file
|
@ -0,0 +1,85 @@
|
|||
package notificator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
objectSDKAddress "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Prm groups Notificator constructor's
|
||||
// parameters. All are required.
|
||||
type Prm struct {
|
||||
writer NotificationWriter
|
||||
notificationSource NotificationSource
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// SetLogger sets a logger.
|
||||
func (prm *Prm) SetLogger(v *zap.Logger) *Prm {
|
||||
prm.logger = v
|
||||
return prm
|
||||
}
|
||||
|
||||
// SetWriter sets notification writer.
|
||||
func (prm *Prm) SetWriter(v NotificationWriter) *Prm {
|
||||
prm.writer = v
|
||||
return prm
|
||||
}
|
||||
|
||||
// SetNotificationSource sets notification source.
|
||||
func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm {
|
||||
prm.notificationSource = v
|
||||
return prm
|
||||
}
|
||||
|
||||
// Notificator is a notification producer that handles
|
||||
// objects with defined notification epoch.
|
||||
//
|
||||
// Working client must be created via constructor New.
|
||||
// Using the Client that has been created with new(Client)
|
||||
// expression (or just declaring a Client variable) is unsafe
|
||||
// and can lead to panic.
|
||||
type Notificator struct {
|
||||
w NotificationWriter
|
||||
ns NotificationSource
|
||||
l *zap.Logger
|
||||
}
|
||||
|
||||
// New creates, initializes and returns the Notificator instance.
|
||||
//
|
||||
// Panics if any field of the passed Prm structure is not set/set
|
||||
// to nil.
|
||||
func New(prm *Prm) *Notificator {
|
||||
panicOnNil := func(v interface{}, name string) {
|
||||
if v == nil {
|
||||
panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name))
|
||||
}
|
||||
}
|
||||
|
||||
panicOnNil(prm.writer, "NotificationWriter")
|
||||
panicOnNil(prm.notificationSource, "NotificationSource")
|
||||
panicOnNil(prm.logger, "Logger")
|
||||
|
||||
return &Notificator{
|
||||
w: prm.writer,
|
||||
ns: prm.notificationSource,
|
||||
l: prm.logger,
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessEpoch looks for all objects with defined epoch in the storage
|
||||
// and passes their addresses to the NotificationWriter.
|
||||
func (n *Notificator) ProcessEpoch(epoch uint64) {
|
||||
logger := n.l.With(zap.Uint64("epoch", epoch))
|
||||
logger.Debug("notificator: start processing object notifications")
|
||||
|
||||
n.ns.Iterate(epoch, func(topic string, addr *objectSDKAddress.Address) {
|
||||
n.l.Debug("notificator: processing object notification",
|
||||
zap.String("topic", topic),
|
||||
zap.Stringer("address", addr),
|
||||
)
|
||||
|
||||
n.w.Notify(topic, addr)
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue