package main import ( "context" "encoding/hex" "fmt" nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator/nats" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) type notificationSource struct { e *engine.StorageEngine l *logger.Logger defaultTopic string } func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) { log := n.l.With(zap.Uint64("epoch", epoch)) listRes, err := n.e.ListContainers(engine.ListContainersPrm{}) if err != nil { log.Error(logs.FrostFSNodeNotificatorCouldNotListContainers, zap.Error(err)) return } filters := objectSDK.NewSearchFilters() filters.AddNotificationEpochFilter(epoch) var selectPrm engine.SelectPrm selectPrm.WithFilters(filters) for _, c := range listRes.Containers() { selectPrm.WithContainerID(c) selectRes, err := n.e.Select(ctx, selectPrm) if err != nil { log.Error(logs.FrostFSNodeNotificatorCouldNotSelectObjectsFromContainer, zap.Stringer("cid", c), zap.Error(err), ) continue } for _, a := range selectRes.AddressList() { err = n.processAddress(ctx, a, handler) if err != nil { log.Error(logs.FrostFSNodeNotificatorCouldNotProcessObject, zap.Stringer("address", a), zap.Error(err), ) continue } } } log.Debug(logs.FrostFSNodeNotificatorFinishedProcessingObjectNotifications) } func (n *notificationSource) processAddress( ctx context.Context, a oid.Address, h func(topic string, addr oid.Address), ) error { var prm engine.HeadPrm prm.WithAddress(a) res, err := n.e.Head(ctx, prm) if err != nil { return err } ni, err := res.Header().NotificationInfo() if err != nil { return fmt.Errorf("could not retrieve notification topic from object: %w", err) } topic := ni.Topic() if topic == "" { topic = n.defaultTopic } h(topic, a) return nil } type notificationWriter struct { l *logger.Logger w *nats.Writer } func (n notificationWriter) Notify(topic string, address oid.Address) { if err := n.w.Notify(topic, address); err != nil { n.l.Warn(logs.FrostFSNodeCouldNotWriteObjectNotification, zap.Stringer("address", address), zap.String("topic", topic), zap.Error(err), ) } } func initNotifications(ctx context.Context, c *cfg) { if nodeconfig.Notification(c.appCfg).Enabled() { topic := nodeconfig.Notification(c.appCfg).DefaultTopic() pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey()) if topic == "" { topic = pubKey } natsSvc := nats.New( nats.WithConnectionName("FrostFS Storage Node: "+pubKey), // connection name is used in the server side logs nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()), nats.WithClientCert( nodeconfig.Notification(c.appCfg).CertPath(), nodeconfig.Notification(c.appCfg).KeyPath(), ), nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()), nats.WithLogger(c.log), ) c.cfgNotifications = cfgNotifications{ enabled: true, nw: notificationWriter{ l: c.log, w: natsSvc, }, defaultTopic: topic, } n := notificator.New(new(notificator.Prm). SetLogger(c.log). SetNotificationSource( ¬ificationSource{ e: c.cfgObject.cfgLocalStorage.localStorage, l: c.log, defaultTopic: topic, }). SetWriter(c.cfgNotifications.nw), ) addNewEpochAsyncNotificationHandler(c, func(e event.Event) { ev := e.(netmap.NewEpoch) n.ProcessEpoch(ctx, ev.EpochNumber()) }) } } func connectNats(ctx context.Context, c *cfg) { if !c.cfgNotifications.enabled { return } endpoint := nodeconfig.Notification(c.appCfg).Endpoint() err := c.cfgNotifications.nw.w.Connect(ctx, endpoint) if err != nil { panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err)) } }