diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index e8e5332b..a054f3ea 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -40,6 +40,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/util/signature" @@ -250,11 +251,25 @@ func initObjectService(c *cfg) { c.workers = append(c.workers, pol) + var os putsvc.ObjectStorage + if c.cfgNotifications.enabled { + os = engineWithNotifications{ + e: ls, + nw: c.cfgNotifications.nw, + ns: c.cfgNetmap.state, + defaultTopic: c.cfgNotifications.defaultTopic, + } + } else { + os = engineWithoutNotifications{ + e: ls, + } + } + sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(coreConstructor), putsvc.WithMaxSizeSource(c), - putsvc.WithLocalStorage(ls), + putsvc.WithObjectStorage(os), putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithNetworkMapSource(c.cfgObject.netMapSource), putsvc.WithNetmapKeys(c), @@ -531,3 +546,40 @@ func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient. return cl, nil } + +type engineWithNotifications struct { + e *engine.StorageEngine + nw notificationWriter + ns netmap.State + + defaultTopic string +} + +func (e engineWithNotifications) Put(o *objectSDK.Object) error { + if err := engine.Put(e.e, o); err != nil { + return err + } + + ni, err := o.NotificationInfo() + if err == nil { + if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() { + topic := ni.Topic() + + if topic == "" { + topic = e.defaultTopic + } + + e.nw.Notify(topic, objectCore.AddressOf(o)) + } + } + + return nil +} + +type engineWithoutNotifications struct { + e *engine.StorageEngine +} + +func (e engineWithoutNotifications) Put(o *objectSDK.Object) error { + return engine.Put(e.e, o) +}