diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d713cf2fb..cdfa2118c 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -474,7 +474,6 @@ type cfg struct { cfgNetmap cfgNetmap cfgControlService cfgControlService cfgObject cfgObject - cfgNotifications cfgNotifications } // ReadCurrentNetMap reads network map which has been cached at the @@ -633,12 +632,6 @@ type cfgObject struct { skipSessionTokenIssuerVerification bool } -type cfgNotifications struct { - enabled bool - nw notificationWriter - defaultTopic string -} - type cfgLocalStorage struct { localStorage *engine.StorageEngine } diff --git a/cmd/frostfs-node/config/node/config.go b/cmd/frostfs-node/config/node/config.go index 90338556e..97aca274a 100644 --- a/cmd/frostfs-node/config/node/config.go +++ b/cmd/frostfs-node/config/node/config.go @@ -5,7 +5,6 @@ import ( "io/fs" "os" "strconv" - "time" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" @@ -25,12 +24,6 @@ type PersistentStateConfig struct { cfg *config.Config } -// NotificationConfig is a wrapper over "notification" config section -// which provides access to object notification configuration of node. -type NotificationConfig struct { - cfg *config.Config -} - // PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section // which provides access to persistent policy rules storage configuration of node. type PersistentPolicyRulesConfig struct { @@ -41,16 +34,12 @@ const ( subsection = "node" persistentSessionsSubsection = "persistent_sessions" persistentStateSubsection = "persistent_state" - notificationSubsection = "notification" persistentPolicyRulesSubsection = "persistent_policy_rules" attributePrefix = "attribute" // PersistentStatePathDefault is a default path for persistent state file. PersistentStatePathDefault = ".frostfs-storage-state" - - // NotificationTimeoutDefault is a default timeout for object notification operation. - NotificationTimeoutDefault = 5 * time.Second ) // Key returns the value of "key" config parameter @@ -185,75 +174,6 @@ func (p PersistentStateConfig) Path() string { return PersistentStatePathDefault } -// Notification returns structure that provides access to "notification" -// subsection of "node" section. -func Notification(c *config.Config) NotificationConfig { - return NotificationConfig{ - c.Sub(subsection).Sub(notificationSubsection), - } -} - -// Enabled returns the value of "enabled" config parameter from "notification" -// subsection of "node" section. -// -// Returns false if the value is not presented. -func (n NotificationConfig) Enabled() bool { - return config.BoolSafe(n.cfg, "enabled") -} - -// DefaultTopic returns the value of "default_topic" config parameter from -// "notification" subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) DefaultTopic() string { - return config.StringSafe(n.cfg, "default_topic") -} - -// Endpoint returns the value of "endpoint" config parameter from "notification" -// subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) Endpoint() string { - return config.StringSafe(n.cfg, "endpoint") -} - -// Timeout returns the value of "timeout" config parameter from "notification" -// subsection of "node" section. -// -// Returns NotificationTimeoutDefault if the value is not positive. -func (n NotificationConfig) Timeout() time.Duration { - v := config.DurationSafe(n.cfg, "timeout") - if v > 0 { - return v - } - - return NotificationTimeoutDefault -} - -// CertPath returns the value of "certificate_path" config parameter from "notification" -// subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) CertPath() string { - return config.StringSafe(n.cfg, "certificate") -} - -// KeyPath returns the value of "key_path" config parameter from -// "notification" subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) KeyPath() string { - return config.StringSafe(n.cfg, "key") -} - -// CAPath returns the value of "ca_path" config parameter from -// "notification" subsection of "node" section. -// -// Returns empty string if the value is not presented. -func (n NotificationConfig) CAPath() string { - return config.StringSafe(n.cfg, "ca") -} - const ( // PermDefault is a default permission bits for local override storage file. PermDefault = 0o644 diff --git a/cmd/frostfs-node/config/node/config_test.go b/cmd/frostfs-node/config/node/config_test.go index b0041c870..7b9adecf4 100644 --- a/cmd/frostfs-node/config/node/config_test.go +++ b/cmd/frostfs-node/config/node/config_test.go @@ -2,7 +2,6 @@ package nodeconfig import ( "testing" - "time" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test" @@ -33,25 +32,11 @@ func TestNodeSection(t *testing.T) { relay := Relay(empty) persisessionsPath := PersistentSessions(empty).Path() persistatePath := PersistentState(empty).Path() - notificationDefaultEnabled := Notification(empty).Enabled() - notificationDefaultEndpoint := Notification(empty).Endpoint() - notificationDefaultTimeout := Notification(empty).Timeout() - notificationDefaultTopic := Notification(empty).DefaultTopic() - notificationDefaultCertPath := Notification(empty).CertPath() - notificationDefaultKeyPath := Notification(empty).KeyPath() - notificationDefaultCAPath := Notification(empty).CAPath() require.Empty(t, attribute) require.Equal(t, false, relay) require.Equal(t, "", persisessionsPath) require.Equal(t, PersistentStatePathDefault, persistatePath) - require.Equal(t, false, notificationDefaultEnabled) - require.Equal(t, "", notificationDefaultEndpoint) - require.Equal(t, NotificationTimeoutDefault, notificationDefaultTimeout) - require.Equal(t, "", notificationDefaultTopic) - require.Equal(t, "", notificationDefaultCertPath) - require.Equal(t, "", notificationDefaultKeyPath) - require.Equal(t, "", notificationDefaultCAPath) }) const path = "../../../../config/example/node" @@ -64,13 +49,6 @@ func TestNodeSection(t *testing.T) { wKey := Wallet(c) persisessionsPath := PersistentSessions(c).Path() persistatePath := PersistentState(c).Path() - notificationEnabled := Notification(c).Enabled() - notificationEndpoint := Notification(c).Endpoint() - notificationTimeout := Notification(c).Timeout() - notificationDefaultTopic := Notification(c).DefaultTopic() - notificationCertPath := Notification(c).CertPath() - notificationKeyPath := Notification(c).KeyPath() - notificationCAPath := Notification(c).CAPath() expectedAddr := []struct { str string @@ -122,13 +100,6 @@ func TestNodeSection(t *testing.T) { require.Equal(t, "/sessions", persisessionsPath) require.Equal(t, "/state", persistatePath) - require.Equal(t, true, notificationEnabled) - require.Equal(t, "tls://localhost:4222", notificationEndpoint) - require.Equal(t, 6*time.Second, notificationTimeout) - require.Equal(t, "topic", notificationDefaultTopic) - require.Equal(t, "/cert/path", notificationCertPath) - require.Equal(t, "/key/path", notificationKeyPath) - require.Equal(t, "/ca/path", notificationCAPath) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index dbef1e494..e4f0a434c 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -111,7 +111,6 @@ func initApp(ctx context.Context, c *cfg) { initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) }) initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) }) initAndLog(c, "session", initSessionService) - initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) }) initAndLog(c, "object", initObjectService) initAndLog(c, "tree", initTreeService) initAndLog(c, "apemanager", initAPEManagerService) @@ -143,7 +142,6 @@ func stopAndLog(c *cfg, name string, stopper func() error) { } func bootUp(ctx context.Context, c *cfg) { - runAndLog(ctx, c, "NATS", true, connectNats) runAndLog(ctx, c, "gRPC", false, func(_ context.Context, c *cfg) { serveGRPC(c) }) runAndLog(ctx, c, "notary", true, makeAndWaitNotaryDeposit) diff --git a/cmd/frostfs-node/notificator.go b/cmd/frostfs-node/notificator.go deleted file mode 100644 index f9bb31fed..000000000 --- a/cmd/frostfs-node/notificator.go +++ /dev/null @@ -1,172 +0,0 @@ -package main - -import ( - "context" - "encoding/hex" - "fmt" - - nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator/nats" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/zap" -) - -type notificationSource struct { - e *engine.StorageEngine - l *logger.Logger - defaultTopic string -} - -func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) { - log := n.l.With(zap.Uint64("epoch", epoch)) - - listRes, err := n.e.ListContainers(ctx, engine.ListContainersPrm{}) - if err != nil { - log.Error(logs.FrostFSNodeNotificatorCouldNotListContainers, zap.Error(err)) - return - } - - filters := objectSDK.NewSearchFilters() - filters.AddNotificationEpochFilter(epoch) - - var selectPrm engine.SelectPrm - selectPrm.WithFilters(filters) - - for _, c := range listRes.Containers() { - selectPrm.WithContainerID(c) - - selectRes, err := n.e.Select(ctx, selectPrm) - if err != nil { - log.Error(logs.FrostFSNodeNotificatorCouldNotSelectObjectsFromContainer, - zap.Stringer("cid", c), - zap.Error(err), - ) - continue - } - - for _, a := range selectRes.AddressList() { - err = n.processAddress(ctx, a, handler) - if err != nil { - log.Error(logs.FrostFSNodeNotificatorCouldNotProcessObject, - zap.Stringer("address", a), - zap.Error(err), - ) - continue - } - } - } - - log.Debug(logs.FrostFSNodeNotificatorFinishedProcessingObjectNotifications) -} - -func (n *notificationSource) processAddress( - ctx context.Context, - a oid.Address, - h func(topic string, addr oid.Address), -) error { - var prm engine.HeadPrm - prm.WithAddress(a) - - res, err := n.e.Head(ctx, prm) - if err != nil { - return err - } - - ni, err := res.Header().NotificationInfo() - if err != nil { - return fmt.Errorf("could not retrieve notification topic from object: %w", err) - } - - topic := ni.Topic() - - if topic == "" { - topic = n.defaultTopic - } - - h(topic, a) - - return nil -} - -type notificationWriter struct { - l *logger.Logger - w *nats.Writer -} - -func (n notificationWriter) Notify(topic string, address oid.Address) { - if err := n.w.Notify(topic, address); err != nil { - n.l.Warn(logs.FrostFSNodeCouldNotWriteObjectNotification, - zap.Stringer("address", address), - zap.String("topic", topic), - zap.Error(err), - ) - } -} - -func initNotifications(ctx context.Context, c *cfg) { - if nodeconfig.Notification(c.appCfg).Enabled() { - topic := nodeconfig.Notification(c.appCfg).DefaultTopic() - pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey()) - - if topic == "" { - topic = pubKey - } - - natsSvc := nats.New( - nats.WithConnectionName("FrostFS Storage Node: "+pubKey), // connection name is used in the server side logs - nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()), - nats.WithClientCert( - nodeconfig.Notification(c.appCfg).CertPath(), - nodeconfig.Notification(c.appCfg).KeyPath(), - ), - nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()), - nats.WithLogger(c.log), - ) - - c.cfgNotifications = cfgNotifications{ - enabled: true, - nw: notificationWriter{ - l: c.log, - w: natsSvc, - }, - defaultTopic: topic, - } - - n := notificator.New(new(notificator.Prm). - SetLogger(c.log). - SetNotificationSource( - ¬ificationSource{ - e: c.cfgObject.cfgLocalStorage.localStorage, - l: c.log, - defaultTopic: topic, - }). - SetWriter(c.cfgNotifications.nw), - ) - - addNewEpochAsyncNotificationHandler(c, func(e event.Event) { - ev := e.(netmap.NewEpoch) - - n.ProcessEpoch(ctx, ev.EpochNumber()) - }) - } -} - -func connectNats(ctx context.Context, c *cfg) { - if !c.cfgNotifications.enabled { - return - } - - endpoint := nodeconfig.Notification(c.appCfg).Endpoint() - err := c.cfgNotifications.nw.w.Connect(ctx, endpoint) - if err != nil { - panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err)) - } - c.log.Info(logs.NatsConnectedToEndpoint, zap.String("endpoint", endpoint)) -} diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 9f34896bc..9d1e2218f 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -14,7 +14,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" @@ -333,15 +332,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche engine: ls, } - if c.cfgNotifications.enabled { - os = engineWithNotifications{ - base: os, - nw: c.cfgNotifications.nw, - ns: c.cfgNetmap.state, - defaultTopic: c.cfgNotifications.defaultTopic, - } - } - return putsvc.NewService( keyStorage, c.putClientCache, @@ -503,47 +493,6 @@ func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) { return eaclInfo, nil } -type engineWithNotifications struct { - base putsvc.ObjectStorage - nw notificationWriter - ns netmap.State - - defaultTopic string -} - -func (e engineWithNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) { - return e.base.IsLocked(ctx, address) -} - -func (e engineWithNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error { - return e.base.Delete(ctx, tombstone, toDelete) -} - -func (e engineWithNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error { - return e.base.Lock(ctx, locker, toLock) -} - -func (e engineWithNotifications) Put(ctx context.Context, o *objectSDK.Object) error { - if err := e.base.Put(ctx, o); err != nil { - return err - } - - ni, err := o.NotificationInfo() - if err == nil { - if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() { - topic := ni.Topic() - - if topic == "" { - topic = e.defaultTopic - } - - e.nw.Notify(topic, objectCore.AddressOf(o)) - } - } - - return nil -} - type engineWithoutNotifications struct { engine *engine.StorageEngine } diff --git a/config/example/node.env b/config/example/node.env index db31ae35a..9f15c4042 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -22,13 +22,6 @@ FROSTFS_NODE_ATTRIBUTE_1="UN-LOCODE:RU MSK" FROSTFS_NODE_RELAY=true FROSTFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions FROSTFS_NODE_PERSISTENT_STATE_PATH=/state -FROSTFS_NODE_NOTIFICATION_ENABLED=true -FROSTFS_NODE_NOTIFICATION_ENDPOINT=tls://localhost:4222 -FROSTFS_NODE_NOTIFICATION_TIMEOUT=6s -FROSTFS_NODE_NOTIFICATION_DEFAULT_TOPIC=topic -FROSTFS_NODE_NOTIFICATION_CERTIFICATE=/cert/path -FROSTFS_NODE_NOTIFICATION_KEY=/key/path -FROSTFS_NODE_NOTIFICATION_CA=/ca/path # Tree service section FROSTFS_TREE_ENABLED=true diff --git a/config/example/node.json b/config/example/node.json index 80140e5a2..79e6fe89a 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -36,15 +36,6 @@ }, "persistent_state": { "path": "/state" - }, - "notification": { - "enabled": true, - "endpoint": "tls://localhost:4222", - "timeout": "6s", - "default_topic": "topic", - "certificate": "/cert/path", - "key": "/key/path", - "ca": "/ca/path" } }, "grpc": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 13c1823d8..34e796ac4 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -35,14 +35,6 @@ node: path: /sessions # path to persistent session tokens file of Storage node (default: in-memory sessions) persistent_state: path: /state # path to persistent state file of Storage node - notification: - enabled: true # turn on object notification service - endpoint: "tls://localhost:4222" # notification server endpoint - timeout: "6s" # timeout for object notification client connection - default_topic: "topic" # default topic for object notifications if not found in object's meta - certificate: "/cert/path" # path to TLS certificate - key: "/key/path" # path to TLS key - ca: "/ca/path" # path to optional CA certificate grpc: - endpoint: s01.frostfs.devenv:8080 # endpoint for gRPC server diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index ce9065ac2..3a6969abd 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -322,14 +322,6 @@ node: path: /sessions persistent_state: path: /state - notification: - enabled: true - endpoint: tls://localhost:4222 - timeout: 6s - default_topic: topic - certificate: /path/to/cert.pem - key: /path/to/key.pem - ca: /path/to/ca.pem ``` | Parameter | Type | Default value | Description | @@ -341,8 +333,6 @@ node: | `relay` | `bool` | | Enable relay mode. | | `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. | | `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. | -| `notification` | [Notification config](#notification-subsection) | | NATS configuration. | - ## `wallet` subsection N3 wallet configuration. @@ -369,19 +359,6 @@ It is used to correctly handle node restarts or crashes. |-----------|----------|------------------------|------------------------| | `path` | `string` | `.frostfs-storage-state` | Path to the database. | -## `notification` subsection -This is an advanced section, use with caution. - -| Parameter | Type | Default value | Description | -|-----------------|------------|-------------------|-------------------------------------------------------------------| -| `enabled` | `bool` | `false` | Flag to enable the service. | -| `endpoint` | `string` | | NATS endpoint to connect to. | -| `timeout` | `duration` | `5s` | Timeout for the object notification operation. | -| `default_topic` | `string` | node's public key | Default topic to use if an object has no corresponding attribute. | -| `certificate` | `string` | | Path to the client certificate. | -| `key` | `string` | | Path to the client key. | -| `ca` | `string` | | Override root CA used to verify server certificates. | - # `apiclient` section Configuration for the FrostFS API client used for communication with other FrostFS nodes. diff --git a/go.mod b/go.mod index 093560953..42896ed8c 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.1 - github.com/nats-io/nats.go v1.32.0 github.com/nspcc-dev/neo-go v0.106.0 github.com/olekukonko/tablewriter v0.0.5 github.com/panjf2000/ants/v2 v2.9.0 @@ -96,8 +95,6 @@ require ( github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-varint v0.0.7 // indirect - github.com/nats-io/nkeys v0.4.7 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect github.com/nspcc-dev/rfc6979 v0.2.1 // indirect diff --git a/go.sum b/go.sum index 6f5034f00..07632a032 100644 --- a/go.sum +++ b/go.sum @@ -184,12 +184,6 @@ github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7B github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= -github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0= -github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 h1:mD9hU3v+zJcnHAVmHnZKt3I++tvn30gBj2rP2PocZMk= github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2/go.mod h1:U5VfmPNM88P4RORFb6KSUVBdJBDhlqggJZYGXGPxOcc= github.com/nspcc-dev/neo-go v0.106.0 h1:YiOdW/GcLmbVSvxMRfD5aytO6n3TDHrEA97VHMawy6g= diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 7dc63341d..8dae61c3d 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -132,10 +132,6 @@ const ( UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool" V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring" V2CantCheckIfRequestFromContainerNode = "can't check if request from container node" - NatsNatsConnectionWasLost = "nats: connection was lost" - NatsNatsReconnectedToTheServer = "nats: reconnected to the server" - NatsNatsClosingConnectionAsTheContextIsDone = "nats: closing connection as the context is done" - NatsConnectedToEndpoint = "nats: successfully connected to endpoint" ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch" ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch = "could not restore notification subscription after RPC switch" ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch = "could not restore notary notification subscription after RPC switch" diff --git a/pkg/services/notificator/deps.go b/pkg/services/notificator/deps.go deleted file mode 100644 index d6330f788..000000000 --- a/pkg/services/notificator/deps.go +++ /dev/null @@ -1,22 +0,0 @@ -package notificator - -import ( - "context" - - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -// NotificationSource is a source of object notifications. -type NotificationSource interface { - // Iterate must iterate over all notifications for the - // provided epoch and call handler for all of them. - Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) -} - -// NotificationWriter notifies all the subscribers -// about new object notifications. -type NotificationWriter interface { - // Notify must notify about an event generated - // from an object with a specific topic. - Notify(topic string, address oid.Address) -} diff --git a/pkg/services/notificator/nats/options.go b/pkg/services/notificator/nats/options.go deleted file mode 100644 index c9ba2ed26..000000000 --- a/pkg/services/notificator/nats/options.go +++ /dev/null @@ -1,38 +0,0 @@ -package nats - -import ( - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "github.com/nats-io/nats.go" -) - -func WithClientCert(certPath, keyPath string) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.ClientCert(certPath, keyPath)) - } -} - -func WithRootCA(paths ...string) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.RootCAs(paths...)) - } -} - -func WithTimeout(timeout time.Duration) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.Timeout(timeout)) - } -} - -func WithConnectionName(name string) Option { - return func(o *opts) { - o.nOpts = append(o.nOpts, nats.Name(name)) - } -} - -func WithLogger(logger *logger.Logger) Option { - return func(o *opts) { - o.log = logger - } -} diff --git a/pkg/services/notificator/nats/service.go b/pkg/services/notificator/nats/service.go deleted file mode 100644 index 9de538f2b..000000000 --- a/pkg/services/notificator/nats/service.go +++ /dev/null @@ -1,129 +0,0 @@ -package nats - -import ( - "context" - "errors" - "fmt" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/nats-io/nats.go" - "go.uber.org/zap" -) - -// Writer is a NATS object notification writer. -// It handles NATS JetStream connections and allows -// sending string representation of the address to -// the NATS server. -// -// For correct operation must be created via New function. -// new(Writer) or Writer{} construction leads to undefined -// behaviour and is not safe. -type Writer struct { - js nats.JetStreamContext - nc *nats.Conn - - m sync.RWMutex - createdStreams map[string]struct{} - opts -} - -type opts struct { - log *logger.Logger - nOpts []nats.Option -} - -type Option func(*opts) - -var errConnIsClosed = errors.New("connection to the server is closed") - -// Notify sends object address's string representation to the provided topic. -// Uses first 4 bytes of object ID as a message ID to support 'exactly once' -// message delivery. -// -// Returns error only if: -// 1. underlying connection was closed and has not been established again; -// 2. NATS server could not respond that it has saved the message. -func (n *Writer) Notify(topic string, address oid.Address) error { - if !n.nc.IsConnected() { - return errConnIsClosed - } - - // use first 4 byte of the encoded string as - // message ID for the 'exactly once' delivery - messageID := address.Object().EncodeToString()[:4] - - // check if the stream was previously created - n.m.RLock() - _, created := n.createdStreams[topic] - n.m.RUnlock() - - if !created { - _, err := n.js.AddStream(&nats.StreamConfig{ - Name: topic, - }) - if err != nil { - return fmt.Errorf("could not add stream: %w", err) - } - - n.m.Lock() - n.createdStreams[topic] = struct{}{} - n.m.Unlock() - } - - _, err := n.js.Publish(topic, []byte(address.EncodeToString()), nats.MsgId(messageID)) - return err -} - -// New creates new Writer. -func New(oo ...Option) *Writer { - w := &Writer{ - createdStreams: make(map[string]struct{}), - opts: opts{ - log: &logger.Logger{Logger: zap.L()}, - nOpts: make([]nats.Option, 0, len(oo)+3), - }, - } - - for _, o := range oo { - o(&w.opts) - } - - w.opts.nOpts = append(w.opts.nOpts, - nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop - nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { - w.log.Error(logs.NatsNatsConnectionWasLost, zap.Error(err)) - }), - nats.ReconnectHandler(func(_ *nats.Conn) { - w.log.Warn(logs.NatsNatsReconnectedToTheServer) - }), - ) - - return w -} - -// Connect tries to connect to a specified NATS endpoint. -// -// Connection is closed when passed context is done. -func (n *Writer) Connect(ctx context.Context, endpoint string) error { - nc, err := nats.Connect(endpoint, n.opts.nOpts...) - if err != nil { - return fmt.Errorf("could not connect to server: %w", err) - } - - n.nc = nc - - // usage w/o options is error-free - n.js, _ = nc.JetStream() - - go func() { - <-ctx.Done() - n.opts.log.Info(logs.NatsNatsClosingConnectionAsTheContextIsDone) - - nc.Close() - }() - - return nil -} diff --git a/pkg/services/notificator/service.go b/pkg/services/notificator/service.go deleted file mode 100644 index bbf4e4823..000000000 --- a/pkg/services/notificator/service.go +++ /dev/null @@ -1,88 +0,0 @@ -package notificator - -import ( - "context" - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/zap" -) - -// Prm groups Notificator constructor's -// parameters. All are required. -type Prm struct { - writer NotificationWriter - notificationSource NotificationSource - logger *logger.Logger -} - -// SetLogger sets a logger. -func (prm *Prm) SetLogger(v *logger.Logger) *Prm { - prm.logger = v - return prm -} - -// SetWriter sets notification writer. -func (prm *Prm) SetWriter(v NotificationWriter) *Prm { - prm.writer = v - return prm -} - -// SetNotificationSource sets notification source. -func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm { - prm.notificationSource = v - return prm -} - -// Notificator is a notification producer that handles -// objects with defined notification epoch. -// -// Working client must be created via constructor New. -// Using the Client that has been created with new(Client) -// expression (or just declaring a Client variable) is unsafe -// and can lead to panic. -type Notificator struct { - w NotificationWriter - ns NotificationSource - l *logger.Logger -} - -// New creates, initializes and returns the Notificator instance. -// -// Panics if any field of the passed Prm structure is not set/set -// to nil. -func New(prm *Prm) *Notificator { - panicOnNil := func(v any, name string) { - if v == nil { - panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name)) - } - } - - panicOnNil(prm.writer, "NotificationWriter") - panicOnNil(prm.notificationSource, "NotificationSource") - panicOnNil(prm.logger, "Logger") - - return &Notificator{ - w: prm.writer, - ns: prm.notificationSource, - l: prm.logger, - } -} - -// ProcessEpoch looks for all objects with defined epoch in the storage -// and passes their addresses to the NotificationWriter. -func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) { - logger := n.l.With(zap.Uint64("epoch", epoch)) - logger.Debug(logs.NotificatorNotificatorStartProcessingObjectNotifications) - - n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) { - n.l.Debug(logs.NotificatorNotificatorProcessingObjectNotification, - zap.String("topic", topic), - zap.Stringer("address", addr), - ) - - n.w.Notify(topic, addr) - }) -}