node: Remove notification functionality #1161

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:remove-nats into master 2024-06-07 12:10:57 +00:00
17 changed files with 0 additions and 672 deletions
Showing only changes of commit 95d9455d30 - Show all commits

View file

@ -474,7 +474,6 @@ type cfg struct {
cfgNetmap cfgNetmap
cfgControlService cfgControlService
cfgObject cfgObject
cfgNotifications cfgNotifications
}
// ReadCurrentNetMap reads network map which has been cached at the
@ -633,12 +632,6 @@ type cfgObject struct {
skipSessionTokenIssuerVerification bool
}
type cfgNotifications struct {
enabled bool
nw notificationWriter
defaultTopic string
}
type cfgLocalStorage struct {
localStorage *engine.StorageEngine
}

View file

@ -5,7 +5,6 @@ import (
"io/fs"
"os"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -25,12 +24,6 @@ type PersistentStateConfig struct {
cfg *config.Config
}
// NotificationConfig is a wrapper over "notification" config section
// which provides access to object notification configuration of node.
type NotificationConfig struct {
cfg *config.Config
}
// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section
// which provides access to persistent policy rules storage configuration of node.
type PersistentPolicyRulesConfig struct {
@ -41,16 +34,12 @@ const (
subsection = "node"
persistentSessionsSubsection = "persistent_sessions"
persistentStateSubsection = "persistent_state"
notificationSubsection = "notification"
persistentPolicyRulesSubsection = "persistent_policy_rules"
attributePrefix = "attribute"
// PersistentStatePathDefault is a default path for persistent state file.
PersistentStatePathDefault = ".frostfs-storage-state"
// NotificationTimeoutDefault is a default timeout for object notification operation.
NotificationTimeoutDefault = 5 * time.Second
)
// Key returns the value of "key" config parameter
@ -185,75 +174,6 @@ func (p PersistentStateConfig) Path() string {
return PersistentStatePathDefault
}
// Notification returns structure that provides access to "notification"
// subsection of "node" section.
func Notification(c *config.Config) NotificationConfig {
return NotificationConfig{
c.Sub(subsection).Sub(notificationSubsection),
}
}
// Enabled returns the value of "enabled" config parameter from "notification"
// subsection of "node" section.
//
// Returns false if the value is not presented.
func (n NotificationConfig) Enabled() bool {
return config.BoolSafe(n.cfg, "enabled")
}
// DefaultTopic returns the value of "default_topic" config parameter from
// "notification" subsection of "node" section.
//
// Returns empty string if the value is not presented.
func (n NotificationConfig) DefaultTopic() string {
return config.StringSafe(n.cfg, "default_topic")
}
// Endpoint returns the value of "endpoint" config parameter from "notification"
// subsection of "node" section.
//
// Returns empty string if the value is not presented.
func (n NotificationConfig) Endpoint() string {
return config.StringSafe(n.cfg, "endpoint")
}
// Timeout returns the value of "timeout" config parameter from "notification"
// subsection of "node" section.
//
// Returns NotificationTimeoutDefault if the value is not positive.
func (n NotificationConfig) Timeout() time.Duration {
v := config.DurationSafe(n.cfg, "timeout")
if v > 0 {
return v
}
return NotificationTimeoutDefault
}
// CertPath returns the value of "certificate_path" config parameter from "notification"
// subsection of "node" section.
//
// Returns empty string if the value is not presented.
func (n NotificationConfig) CertPath() string {
return config.StringSafe(n.cfg, "certificate")
}
// KeyPath returns the value of "key_path" config parameter from
// "notification" subsection of "node" section.
//
// Returns empty string if the value is not presented.
func (n NotificationConfig) KeyPath() string {
return config.StringSafe(n.cfg, "key")
}
// CAPath returns the value of "ca_path" config parameter from
// "notification" subsection of "node" section.
//
// Returns empty string if the value is not presented.
func (n NotificationConfig) CAPath() string {
return config.StringSafe(n.cfg, "ca")
}
const (
// PermDefault is a default permission bits for local override storage file.
PermDefault = 0o644

View file

@ -2,7 +2,6 @@ package nodeconfig
import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
@ -33,25 +32,11 @@ func TestNodeSection(t *testing.T) {
relay := Relay(empty)
persisessionsPath := PersistentSessions(empty).Path()
persistatePath := PersistentState(empty).Path()
notificationDefaultEnabled := Notification(empty).Enabled()
notificationDefaultEndpoint := Notification(empty).Endpoint()
notificationDefaultTimeout := Notification(empty).Timeout()
notificationDefaultTopic := Notification(empty).DefaultTopic()
notificationDefaultCertPath := Notification(empty).CertPath()
notificationDefaultKeyPath := Notification(empty).KeyPath()
notificationDefaultCAPath := Notification(empty).CAPath()
require.Empty(t, attribute)
require.Equal(t, false, relay)
require.Equal(t, "", persisessionsPath)
require.Equal(t, PersistentStatePathDefault, persistatePath)
require.Equal(t, false, notificationDefaultEnabled)
require.Equal(t, "", notificationDefaultEndpoint)
require.Equal(t, NotificationTimeoutDefault, notificationDefaultTimeout)
require.Equal(t, "", notificationDefaultTopic)
require.Equal(t, "", notificationDefaultCertPath)
require.Equal(t, "", notificationDefaultKeyPath)
require.Equal(t, "", notificationDefaultCAPath)
})
const path = "../../../../config/example/node"
@ -64,13 +49,6 @@ func TestNodeSection(t *testing.T) {
wKey := Wallet(c)
persisessionsPath := PersistentSessions(c).Path()
persistatePath := PersistentState(c).Path()
notificationEnabled := Notification(c).Enabled()
notificationEndpoint := Notification(c).Endpoint()
notificationTimeout := Notification(c).Timeout()
notificationDefaultTopic := Notification(c).DefaultTopic()
notificationCertPath := Notification(c).CertPath()
notificationKeyPath := Notification(c).KeyPath()
notificationCAPath := Notification(c).CAPath()
expectedAddr := []struct {
str string
@ -122,13 +100,6 @@ func TestNodeSection(t *testing.T) {
require.Equal(t, "/sessions", persisessionsPath)
require.Equal(t, "/state", persistatePath)
require.Equal(t, true, notificationEnabled)
require.Equal(t, "tls://localhost:4222", notificationEndpoint)
require.Equal(t, 6*time.Second, notificationTimeout)
require.Equal(t, "topic", notificationDefaultTopic)
require.Equal(t, "/cert/path", notificationCertPath)
require.Equal(t, "/key/path", notificationKeyPath)
require.Equal(t, "/ca/path", notificationCAPath)
}
configtest.ForEachFileType(path, fileConfigTest)

View file

@ -111,7 +111,6 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
initAndLog(c, "session", initSessionService)
initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
initAndLog(c, "object", initObjectService)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "apemanager", initAPEManagerService)
@ -143,7 +142,6 @@ func stopAndLog(c *cfg, name string, stopper func() error) {
}
func bootUp(ctx context.Context, c *cfg) {
runAndLog(ctx, c, "NATS", true, connectNats)
runAndLog(ctx, c, "gRPC", false, func(_ context.Context, c *cfg) { serveGRPC(c) })
runAndLog(ctx, c, "notary", true, makeAndWaitNotaryDeposit)

View file

@ -1,172 +0,0 @@
package main
import (
"context"
"encoding/hex"
"fmt"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator/nats"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type notificationSource struct {
e *engine.StorageEngine
l *logger.Logger
defaultTopic string
}
func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) {
log := n.l.With(zap.Uint64("epoch", epoch))
listRes, err := n.e.ListContainers(ctx, engine.ListContainersPrm{})
if err != nil {
log.Error(logs.FrostFSNodeNotificatorCouldNotListContainers, zap.Error(err))
return
}
filters := objectSDK.NewSearchFilters()
filters.AddNotificationEpochFilter(epoch)
var selectPrm engine.SelectPrm
selectPrm.WithFilters(filters)
for _, c := range listRes.Containers() {
selectPrm.WithContainerID(c)
selectRes, err := n.e.Select(ctx, selectPrm)
if err != nil {
log.Error(logs.FrostFSNodeNotificatorCouldNotSelectObjectsFromContainer,
zap.Stringer("cid", c),
zap.Error(err),
)
continue
}
for _, a := range selectRes.AddressList() {
err = n.processAddress(ctx, a, handler)
if err != nil {
log.Error(logs.FrostFSNodeNotificatorCouldNotProcessObject,
zap.Stringer("address", a),
zap.Error(err),
)
continue
}
}
}
log.Debug(logs.FrostFSNodeNotificatorFinishedProcessingObjectNotifications)
}
func (n *notificationSource) processAddress(
ctx context.Context,
a oid.Address,
h func(topic string, addr oid.Address),
) error {
var prm engine.HeadPrm
prm.WithAddress(a)
res, err := n.e.Head(ctx, prm)
if err != nil {
return err
}
ni, err := res.Header().NotificationInfo()
if err != nil {
return fmt.Errorf("could not retrieve notification topic from object: %w", err)
}
topic := ni.Topic()
if topic == "" {
topic = n.defaultTopic
}
h(topic, a)
return nil
}
type notificationWriter struct {
l *logger.Logger
w *nats.Writer
}
func (n notificationWriter) Notify(topic string, address oid.Address) {
if err := n.w.Notify(topic, address); err != nil {
n.l.Warn(logs.FrostFSNodeCouldNotWriteObjectNotification,
zap.Stringer("address", address),
zap.String("topic", topic),
zap.Error(err),
)
}
}
func initNotifications(ctx context.Context, c *cfg) {
if nodeconfig.Notification(c.appCfg).Enabled() {
topic := nodeconfig.Notification(c.appCfg).DefaultTopic()
pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey())
if topic == "" {
topic = pubKey
}
natsSvc := nats.New(
nats.WithConnectionName("FrostFS Storage Node: "+pubKey), // connection name is used in the server side logs
nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()),
nats.WithClientCert(
nodeconfig.Notification(c.appCfg).CertPath(),
nodeconfig.Notification(c.appCfg).KeyPath(),
),
nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()),
nats.WithLogger(c.log),
)
c.cfgNotifications = cfgNotifications{
enabled: true,
nw: notificationWriter{
l: c.log,
w: natsSvc,
},
defaultTopic: topic,
}
n := notificator.New(new(notificator.Prm).
SetLogger(c.log).
SetNotificationSource(
&notificationSource{
e: c.cfgObject.cfgLocalStorage.localStorage,
l: c.log,
defaultTopic: topic,
}).
SetWriter(c.cfgNotifications.nw),
)
addNewEpochAsyncNotificationHandler(c, func(e event.Event) {
ev := e.(netmap.NewEpoch)
n.ProcessEpoch(ctx, ev.EpochNumber())
})
}
}
func connectNats(ctx context.Context, c *cfg) {
if !c.cfgNotifications.enabled {
return
}
endpoint := nodeconfig.Notification(c.appCfg).Endpoint()
err := c.cfgNotifications.nw.w.Connect(ctx, endpoint)
if err != nil {
panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err))
}
c.log.Info(logs.NatsConnectedToEndpoint, zap.String("endpoint", endpoint))
}

View file

@ -14,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
@ -333,15 +332,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
engine: ls,
}
if c.cfgNotifications.enabled {
os = engineWithNotifications{
base: os,
nw: c.cfgNotifications.nw,
ns: c.cfgNetmap.state,
defaultTopic: c.cfgNotifications.defaultTopic,
}
}
return putsvc.NewService(
keyStorage,
c.putClientCache,
@ -503,47 +493,6 @@ func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
return eaclInfo, nil
}
type engineWithNotifications struct {
base putsvc.ObjectStorage
nw notificationWriter
ns netmap.State
defaultTopic string
}
func (e engineWithNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) {
return e.base.IsLocked(ctx, address)
}
func (e engineWithNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(ctx, tombstone, toDelete)
}
func (e engineWithNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
return e.base.Lock(ctx, locker, toLock)
}
func (e engineWithNotifications) Put(ctx context.Context, o *objectSDK.Object) error {
if err := e.base.Put(ctx, o); err != nil {
return err
}
ni, err := o.NotificationInfo()
if err == nil {
if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() {
topic := ni.Topic()
if topic == "" {
topic = e.defaultTopic
}
e.nw.Notify(topic, objectCore.AddressOf(o))
}
}
return nil
}
type engineWithoutNotifications struct {
engine *engine.StorageEngine
}

View file

@ -22,13 +22,6 @@ FROSTFS_NODE_ATTRIBUTE_1="UN-LOCODE:RU MSK"
FROSTFS_NODE_RELAY=true
FROSTFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions
FROSTFS_NODE_PERSISTENT_STATE_PATH=/state
FROSTFS_NODE_NOTIFICATION_ENABLED=true
FROSTFS_NODE_NOTIFICATION_ENDPOINT=tls://localhost:4222
FROSTFS_NODE_NOTIFICATION_TIMEOUT=6s
FROSTFS_NODE_NOTIFICATION_DEFAULT_TOPIC=topic
FROSTFS_NODE_NOTIFICATION_CERTIFICATE=/cert/path
FROSTFS_NODE_NOTIFICATION_KEY=/key/path
FROSTFS_NODE_NOTIFICATION_CA=/ca/path
# Tree service section
FROSTFS_TREE_ENABLED=true

View file

@ -36,15 +36,6 @@
},
"persistent_state": {
"path": "/state"
},
"notification": {
"enabled": true,
"endpoint": "tls://localhost:4222",
"timeout": "6s",
"default_topic": "topic",
"certificate": "/cert/path",
"key": "/key/path",
"ca": "/ca/path"
}
},
"grpc": {

View file

@ -35,14 +35,6 @@ node:
path: /sessions # path to persistent session tokens file of Storage node (default: in-memory sessions)
persistent_state:
path: /state # path to persistent state file of Storage node
notification:
enabled: true # turn on object notification service
endpoint: "tls://localhost:4222" # notification server endpoint
timeout: "6s" # timeout for object notification client connection
default_topic: "topic" # default topic for object notifications if not found in object's meta
certificate: "/cert/path" # path to TLS certificate
key: "/key/path" # path to TLS key
ca: "/ca/path" # path to optional CA certificate
grpc:
- endpoint: s01.frostfs.devenv:8080 # endpoint for gRPC server

View file

@ -322,14 +322,6 @@ node:
path: /sessions
persistent_state:
path: /state
notification:
enabled: true
endpoint: tls://localhost:4222
timeout: 6s
default_topic: topic
certificate: /path/to/cert.pem
key: /path/to/key.pem
ca: /path/to/ca.pem
```
| Parameter | Type | Default value | Description |
@ -341,8 +333,6 @@ node:
| `relay` | `bool` | | Enable relay mode. |
| `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. |
| `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. |
| `notification` | [Notification config](#notification-subsection) | | NATS configuration. |
## `wallet` subsection
N3 wallet configuration.
@ -369,19 +359,6 @@ It is used to correctly handle node restarts or crashes.
|-----------|----------|------------------------|------------------------|
| `path` | `string` | `.frostfs-storage-state` | Path to the database. |
## `notification` subsection
This is an advanced section, use with caution.
| Parameter | Type | Default value | Description |
|-----------------|------------|-------------------|-------------------------------------------------------------------|
| `enabled` | `bool` | `false` | Flag to enable the service. |
| `endpoint` | `string` | | NATS endpoint to connect to. |
| `timeout` | `duration` | `5s` | Timeout for the object notification operation. |
| `default_topic` | `string` | node's public key | Default topic to use if an object has no corresponding attribute. |
| `certificate` | `string` | | Path to the client certificate. |
| `key` | `string` | | Path to the client key. |
| `ca` | `string` | | Override root CA used to verify server certificates. |
# `apiclient` section
Configuration for the FrostFS API client used for communication with other FrostFS nodes.

3
go.mod
View file

@ -23,7 +23,6 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/nats-io/nats.go v1.32.0
github.com/nspcc-dev/neo-go v0.106.0
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0
@ -96,8 +95,6 @@ require (
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -132,10 +132,6 @@ const (
UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool"
V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
NatsNatsConnectionWasLost = "nats: connection was lost"
NatsNatsReconnectedToTheServer = "nats: reconnected to the server"
NatsNatsClosingConnectionAsTheContextIsDone = "nats: closing connection as the context is done"
NatsConnectedToEndpoint = "nats: successfully connected to endpoint"
ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"
ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch = "could not restore notification subscription after RPC switch"
ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch = "could not restore notary notification subscription after RPC switch"

View file

@ -1,22 +0,0 @@
package notificator
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// NotificationSource is a source of object notifications.
type NotificationSource interface {
// Iterate must iterate over all notifications for the
// provided epoch and call handler for all of them.
Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address))
}
// NotificationWriter notifies all the subscribers
// about new object notifications.
type NotificationWriter interface {
// Notify must notify about an event generated
// from an object with a specific topic.
Notify(topic string, address oid.Address)
}

View file

@ -1,38 +0,0 @@
package nats
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nats-io/nats.go"
)
func WithClientCert(certPath, keyPath string) Option {
return func(o *opts) {
o.nOpts = append(o.nOpts, nats.ClientCert(certPath, keyPath))
}
}
func WithRootCA(paths ...string) Option {
return func(o *opts) {
o.nOpts = append(o.nOpts, nats.RootCAs(paths...))
}
}
func WithTimeout(timeout time.Duration) Option {
return func(o *opts) {
o.nOpts = append(o.nOpts, nats.Timeout(timeout))
}
}
func WithConnectionName(name string) Option {
return func(o *opts) {
o.nOpts = append(o.nOpts, nats.Name(name))
}
}
func WithLogger(logger *logger.Logger) Option {
return func(o *opts) {
o.log = logger
}
}

View file

@ -1,129 +0,0 @@
package nats
import (
"context"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
)
// Writer is a NATS object notification writer.
// It handles NATS JetStream connections and allows
// sending string representation of the address to
// the NATS server.
//
// For correct operation must be created via New function.
// new(Writer) or Writer{} construction leads to undefined
// behaviour and is not safe.
type Writer struct {
js nats.JetStreamContext
nc *nats.Conn
m sync.RWMutex
createdStreams map[string]struct{}
opts
}
type opts struct {
log *logger.Logger
nOpts []nats.Option
}
type Option func(*opts)
var errConnIsClosed = errors.New("connection to the server is closed")
// Notify sends object address's string representation to the provided topic.
// Uses first 4 bytes of object ID as a message ID to support 'exactly once'
// message delivery.
//
// Returns error only if:
// 1. underlying connection was closed and has not been established again;
// 2. NATS server could not respond that it has saved the message.
func (n *Writer) Notify(topic string, address oid.Address) error {
if !n.nc.IsConnected() {
return errConnIsClosed
}
// use first 4 byte of the encoded string as
// message ID for the 'exactly once' delivery
messageID := address.Object().EncodeToString()[:4]
// check if the stream was previously created
n.m.RLock()
_, created := n.createdStreams[topic]
n.m.RUnlock()
if !created {
_, err := n.js.AddStream(&nats.StreamConfig{
Name: topic,
})
if err != nil {
return fmt.Errorf("could not add stream: %w", err)
}
n.m.Lock()
n.createdStreams[topic] = struct{}{}
n.m.Unlock()
}
_, err := n.js.Publish(topic, []byte(address.EncodeToString()), nats.MsgId(messageID))
return err
}
// New creates new Writer.
func New(oo ...Option) *Writer {
w := &Writer{
createdStreams: make(map[string]struct{}),
opts: opts{
log: &logger.Logger{Logger: zap.L()},
nOpts: make([]nats.Option, 0, len(oo)+3),
},
}
for _, o := range oo {
o(&w.opts)
}
w.opts.nOpts = append(w.opts.nOpts,
nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
w.log.Error(logs.NatsNatsConnectionWasLost, zap.Error(err))
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
w.log.Warn(logs.NatsNatsReconnectedToTheServer)
}),
)
return w
}
// Connect tries to connect to a specified NATS endpoint.
//
// Connection is closed when passed context is done.
func (n *Writer) Connect(ctx context.Context, endpoint string) error {
nc, err := nats.Connect(endpoint, n.opts.nOpts...)
if err != nil {
return fmt.Errorf("could not connect to server: %w", err)
}
n.nc = nc
// usage w/o options is error-free
n.js, _ = nc.JetStream()
go func() {
<-ctx.Done()
n.opts.log.Info(logs.NatsNatsClosingConnectionAsTheContextIsDone)
nc.Close()
}()
return nil
}

View file

@ -1,88 +0,0 @@
package notificator
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// Prm groups Notificator constructor's
// parameters. All are required.
type Prm struct {
writer NotificationWriter
notificationSource NotificationSource
logger *logger.Logger
}
// SetLogger sets a logger.
func (prm *Prm) SetLogger(v *logger.Logger) *Prm {
prm.logger = v
return prm
}
// SetWriter sets notification writer.
func (prm *Prm) SetWriter(v NotificationWriter) *Prm {
prm.writer = v
return prm
}
// SetNotificationSource sets notification source.
func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm {
prm.notificationSource = v
return prm
}
// Notificator is a notification producer that handles
// objects with defined notification epoch.
//
// Working client must be created via constructor New.
// Using the Client that has been created with new(Client)
// expression (or just declaring a Client variable) is unsafe
// and can lead to panic.
type Notificator struct {
w NotificationWriter
ns NotificationSource
l *logger.Logger
}
// New creates, initializes and returns the Notificator instance.
//
// Panics if any field of the passed Prm structure is not set/set
// to nil.
func New(prm *Prm) *Notificator {
panicOnNil := func(v any, name string) {
if v == nil {
panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name))
}
}
panicOnNil(prm.writer, "NotificationWriter")
panicOnNil(prm.notificationSource, "NotificationSource")
panicOnNil(prm.logger, "Logger")
return &Notificator{
w: prm.writer,
ns: prm.notificationSource,
l: prm.logger,
}
}
// ProcessEpoch looks for all objects with defined epoch in the storage
// and passes their addresses to the NotificationWriter.
func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) {
logger := n.l.With(zap.Uint64("epoch", epoch))
logger.Debug(logs.NotificatorNotificatorStartProcessingObjectNotifications)
n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) {
n.l.Debug(logs.NotificatorNotificatorProcessingObjectNotification,
zap.String("topic", topic),
zap.Stringer("address", addr),
)
n.w.Notify(topic, addr)
})
}