From 52e742bac1bbe20cfa1d499ab97ad5bd3a079aaf Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 15 Mar 2022 22:46:43 +0300 Subject: [PATCH] [#1243] node: Add "hot" notifications Wrap engine with notifications writer (if configured so) to allow sending notifications right after the object is saved in the local storage. "Hot" notifications are sent for objects with the following tick epoch values: 1. 0; 2. *current epoch*. Signed-off-by: Pavel Karpy --- cmd/neofs-node/object.go | 54 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index e8e5332b79..a054f3ea93 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -40,6 +40,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/util/signature" @@ -250,11 +251,25 @@ func initObjectService(c *cfg) { c.workers = append(c.workers, pol) + var os putsvc.ObjectStorage + if c.cfgNotifications.enabled { + os = engineWithNotifications{ + e: ls, + nw: c.cfgNotifications.nw, + ns: c.cfgNetmap.state, + defaultTopic: c.cfgNotifications.defaultTopic, + } + } else { + os = engineWithoutNotifications{ + e: ls, + } + } + sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(coreConstructor), putsvc.WithMaxSizeSource(c), - putsvc.WithLocalStorage(ls), + putsvc.WithObjectStorage(os), putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithNetworkMapSource(c.cfgObject.netMapSource), putsvc.WithNetmapKeys(c), @@ -531,3 +546,40 @@ func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient. return cl, nil } + +type engineWithNotifications struct { + e *engine.StorageEngine + nw notificationWriter + ns netmap.State + + defaultTopic string +} + +func (e engineWithNotifications) Put(o *objectSDK.Object) error { + if err := engine.Put(e.e, o); err != nil { + return err + } + + ni, err := o.NotificationInfo() + if err == nil { + if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() { + topic := ni.Topic() + + if topic == "" { + topic = e.defaultTopic + } + + e.nw.Notify(topic, objectCore.AddressOf(o)) + } + } + + return nil +} + +type engineWithoutNotifications struct { + e *engine.StorageEngine +} + +func (e engineWithoutNotifications) Put(o *objectSDK.Object) error { + return engine.Put(e.e, o) +}