frostfs-node/pkg/services/notificator/service.go

89 lines
2.3 KiB
Go

package notificator
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// Prm groups Notificator constructor's
// parameters. All are required.
type Prm struct {
writer NotificationWriter
notificationSource NotificationSource
logger *logger.Logger
}
// SetLogger sets a logger.
func (prm *Prm) SetLogger(v *logger.Logger) *Prm {
prm.logger = v
return prm
}
// SetWriter sets notification writer.
func (prm *Prm) SetWriter(v NotificationWriter) *Prm {
prm.writer = v
return prm
}
// SetNotificationSource sets notification source.
func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm {
prm.notificationSource = v
return prm
}
// Notificator is a notification producer that handles
// objects with defined notification epoch.
//
// Working client must be created via constructor New.
// Using the Client that has been created with new(Client)
// expression (or just declaring a Client variable) is unsafe
// and can lead to panic.
type Notificator struct {
w NotificationWriter
ns NotificationSource
l *logger.Logger
}
// New creates, initializes and returns the Notificator instance.
//
// Panics if any field of the passed Prm structure is not set/set
// to nil.
func New(prm *Prm) *Notificator {
panicOnNil := func(v any, name string) {
if v == nil {
panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name))
}
}
panicOnNil(prm.writer, "NotificationWriter")
panicOnNil(prm.notificationSource, "NotificationSource")
panicOnNil(prm.logger, "Logger")
return &Notificator{
w: prm.writer,
ns: prm.notificationSource,
l: prm.logger,
}
}
// ProcessEpoch looks for all objects with defined epoch in the storage
// and passes their addresses to the NotificationWriter.
func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) {
logger := n.l.With(zap.Uint64("epoch", epoch))
logger.Debug(logs.NotificatorNotificatorStartProcessingObjectNotifications)
n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) {
n.l.Debug(logs.NotificatorNotificatorProcessingObjectNotification,
zap.String("topic", topic),
zap.Stringer("address", addr),
)
n.w.Notify(topic, addr)
})
}