node: Remove notification functionality #1161
17 changed files with 0 additions and 672 deletions
|
@ -474,7 +474,6 @@ type cfg struct {
|
|||
cfgNetmap cfgNetmap
|
||||
cfgControlService cfgControlService
|
||||
cfgObject cfgObject
|
||||
cfgNotifications cfgNotifications
|
||||
}
|
||||
|
||||
// ReadCurrentNetMap reads network map which has been cached at the
|
||||
|
@ -633,12 +632,6 @@ type cfgObject struct {
|
|||
skipSessionTokenIssuerVerification bool
|
||||
}
|
||||
|
||||
type cfgNotifications struct {
|
||||
enabled bool
|
||||
nw notificationWriter
|
||||
defaultTopic string
|
||||
}
|
||||
|
||||
type cfgLocalStorage struct {
|
||||
localStorage *engine.StorageEngine
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"io/fs"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
|
@ -25,12 +24,6 @@ type PersistentStateConfig struct {
|
|||
cfg *config.Config
|
||||
}
|
||||
|
||||
// NotificationConfig is a wrapper over "notification" config section
|
||||
// which provides access to object notification configuration of node.
|
||||
type NotificationConfig struct {
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section
|
||||
// which provides access to persistent policy rules storage configuration of node.
|
||||
type PersistentPolicyRulesConfig struct {
|
||||
|
@ -41,16 +34,12 @@ const (
|
|||
subsection = "node"
|
||||
persistentSessionsSubsection = "persistent_sessions"
|
||||
persistentStateSubsection = "persistent_state"
|
||||
notificationSubsection = "notification"
|
||||
persistentPolicyRulesSubsection = "persistent_policy_rules"
|
||||
|
||||
attributePrefix = "attribute"
|
||||
|
||||
// PersistentStatePathDefault is a default path for persistent state file.
|
||||
PersistentStatePathDefault = ".frostfs-storage-state"
|
||||
|
||||
// NotificationTimeoutDefault is a default timeout for object notification operation.
|
||||
NotificationTimeoutDefault = 5 * time.Second
|
||||
)
|
||||
|
||||
// Key returns the value of "key" config parameter
|
||||
|
@ -185,75 +174,6 @@ func (p PersistentStateConfig) Path() string {
|
|||
return PersistentStatePathDefault
|
||||
}
|
||||
|
||||
// Notification returns structure that provides access to "notification"
|
||||
// subsection of "node" section.
|
||||
func Notification(c *config.Config) NotificationConfig {
|
||||
return NotificationConfig{
|
||||
c.Sub(subsection).Sub(notificationSubsection),
|
||||
}
|
||||
}
|
||||
|
||||
// Enabled returns the value of "enabled" config parameter from "notification"
|
||||
// subsection of "node" section.
|
||||
//
|
||||
// Returns false if the value is not presented.
|
||||
func (n NotificationConfig) Enabled() bool {
|
||||
return config.BoolSafe(n.cfg, "enabled")
|
||||
}
|
||||
|
||||
// DefaultTopic returns the value of "default_topic" config parameter from
|
||||
// "notification" subsection of "node" section.
|
||||
//
|
||||
// Returns empty string if the value is not presented.
|
||||
func (n NotificationConfig) DefaultTopic() string {
|
||||
return config.StringSafe(n.cfg, "default_topic")
|
||||
}
|
||||
|
||||
// Endpoint returns the value of "endpoint" config parameter from "notification"
|
||||
// subsection of "node" section.
|
||||
//
|
||||
// Returns empty string if the value is not presented.
|
||||
func (n NotificationConfig) Endpoint() string {
|
||||
return config.StringSafe(n.cfg, "endpoint")
|
||||
}
|
||||
|
||||
// Timeout returns the value of "timeout" config parameter from "notification"
|
||||
// subsection of "node" section.
|
||||
//
|
||||
// Returns NotificationTimeoutDefault if the value is not positive.
|
||||
func (n NotificationConfig) Timeout() time.Duration {
|
||||
v := config.DurationSafe(n.cfg, "timeout")
|
||||
if v > 0 {
|
||||
return v
|
||||
}
|
||||
|
||||
return NotificationTimeoutDefault
|
||||
}
|
||||
|
||||
// CertPath returns the value of "certificate_path" config parameter from "notification"
|
||||
// subsection of "node" section.
|
||||
//
|
||||
// Returns empty string if the value is not presented.
|
||||
func (n NotificationConfig) CertPath() string {
|
||||
return config.StringSafe(n.cfg, "certificate")
|
||||
}
|
||||
|
||||
// KeyPath returns the value of "key_path" config parameter from
|
||||
// "notification" subsection of "node" section.
|
||||
//
|
||||
// Returns empty string if the value is not presented.
|
||||
func (n NotificationConfig) KeyPath() string {
|
||||
return config.StringSafe(n.cfg, "key")
|
||||
}
|
||||
|
||||
// CAPath returns the value of "ca_path" config parameter from
|
||||
// "notification" subsection of "node" section.
|
||||
//
|
||||
// Returns empty string if the value is not presented.
|
||||
func (n NotificationConfig) CAPath() string {
|
||||
return config.StringSafe(n.cfg, "ca")
|
||||
}
|
||||
|
||||
const (
|
||||
// PermDefault is a default permission bits for local override storage file.
|
||||
PermDefault = 0o644
|
||||
|
|
|
@ -2,7 +2,6 @@ package nodeconfig
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
|
@ -33,25 +32,11 @@ func TestNodeSection(t *testing.T) {
|
|||
relay := Relay(empty)
|
||||
persisessionsPath := PersistentSessions(empty).Path()
|
||||
persistatePath := PersistentState(empty).Path()
|
||||
notificationDefaultEnabled := Notification(empty).Enabled()
|
||||
notificationDefaultEndpoint := Notification(empty).Endpoint()
|
||||
notificationDefaultTimeout := Notification(empty).Timeout()
|
||||
notificationDefaultTopic := Notification(empty).DefaultTopic()
|
||||
notificationDefaultCertPath := Notification(empty).CertPath()
|
||||
notificationDefaultKeyPath := Notification(empty).KeyPath()
|
||||
notificationDefaultCAPath := Notification(empty).CAPath()
|
||||
|
||||
require.Empty(t, attribute)
|
||||
require.Equal(t, false, relay)
|
||||
require.Equal(t, "", persisessionsPath)
|
||||
require.Equal(t, PersistentStatePathDefault, persistatePath)
|
||||
require.Equal(t, false, notificationDefaultEnabled)
|
||||
require.Equal(t, "", notificationDefaultEndpoint)
|
||||
require.Equal(t, NotificationTimeoutDefault, notificationDefaultTimeout)
|
||||
require.Equal(t, "", notificationDefaultTopic)
|
||||
require.Equal(t, "", notificationDefaultCertPath)
|
||||
require.Equal(t, "", notificationDefaultKeyPath)
|
||||
require.Equal(t, "", notificationDefaultCAPath)
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
@ -64,13 +49,6 @@ func TestNodeSection(t *testing.T) {
|
|||
wKey := Wallet(c)
|
||||
persisessionsPath := PersistentSessions(c).Path()
|
||||
persistatePath := PersistentState(c).Path()
|
||||
notificationEnabled := Notification(c).Enabled()
|
||||
notificationEndpoint := Notification(c).Endpoint()
|
||||
notificationTimeout := Notification(c).Timeout()
|
||||
notificationDefaultTopic := Notification(c).DefaultTopic()
|
||||
notificationCertPath := Notification(c).CertPath()
|
||||
notificationKeyPath := Notification(c).KeyPath()
|
||||
notificationCAPath := Notification(c).CAPath()
|
||||
|
||||
expectedAddr := []struct {
|
||||
str string
|
||||
|
@ -122,13 +100,6 @@ func TestNodeSection(t *testing.T) {
|
|||
|
||||
require.Equal(t, "/sessions", persisessionsPath)
|
||||
require.Equal(t, "/state", persistatePath)
|
||||
require.Equal(t, true, notificationEnabled)
|
||||
require.Equal(t, "tls://localhost:4222", notificationEndpoint)
|
||||
require.Equal(t, 6*time.Second, notificationTimeout)
|
||||
require.Equal(t, "topic", notificationDefaultTopic)
|
||||
require.Equal(t, "/cert/path", notificationCertPath)
|
||||
require.Equal(t, "/key/path", notificationKeyPath)
|
||||
require.Equal(t, "/ca/path", notificationCAPath)
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
|
|
@ -111,7 +111,6 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
|
||||
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
|
||||
initAndLog(c, "session", initSessionService)
|
||||
initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
|
||||
initAndLog(c, "object", initObjectService)
|
||||
initAndLog(c, "tree", initTreeService)
|
||||
initAndLog(c, "apemanager", initAPEManagerService)
|
||||
|
@ -143,7 +142,6 @@ func stopAndLog(c *cfg, name string, stopper func() error) {
|
|||
}
|
||||
|
||||
func bootUp(ctx context.Context, c *cfg) {
|
||||
runAndLog(ctx, c, "NATS", true, connectNats)
|
||||
runAndLog(ctx, c, "gRPC", false, func(_ context.Context, c *cfg) { serveGRPC(c) })
|
||||
runAndLog(ctx, c, "notary", true, makeAndWaitNotaryDeposit)
|
||||
|
||||
|
|
|
@ -1,172 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
|
||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator/nats"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type notificationSource struct {
|
||||
e *engine.StorageEngine
|
||||
l *logger.Logger
|
||||
defaultTopic string
|
||||
}
|
||||
|
||||
func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) {
|
||||
log := n.l.With(zap.Uint64("epoch", epoch))
|
||||
|
||||
listRes, err := n.e.ListContainers(ctx, engine.ListContainersPrm{})
|
||||
if err != nil {
|
||||
log.Error(logs.FrostFSNodeNotificatorCouldNotListContainers, zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
filters := objectSDK.NewSearchFilters()
|
||||
filters.AddNotificationEpochFilter(epoch)
|
||||
|
||||
var selectPrm engine.SelectPrm
|
||||
selectPrm.WithFilters(filters)
|
||||
|
||||
for _, c := range listRes.Containers() {
|
||||
selectPrm.WithContainerID(c)
|
||||
|
||||
selectRes, err := n.e.Select(ctx, selectPrm)
|
||||
if err != nil {
|
||||
log.Error(logs.FrostFSNodeNotificatorCouldNotSelectObjectsFromContainer,
|
||||
zap.Stringer("cid", c),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, a := range selectRes.AddressList() {
|
||||
err = n.processAddress(ctx, a, handler)
|
||||
if err != nil {
|
||||
log.Error(logs.FrostFSNodeNotificatorCouldNotProcessObject,
|
||||
zap.Stringer("address", a),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug(logs.FrostFSNodeNotificatorFinishedProcessingObjectNotifications)
|
||||
}
|
||||
|
||||
func (n *notificationSource) processAddress(
|
||||
ctx context.Context,
|
||||
a oid.Address,
|
||||
h func(topic string, addr oid.Address),
|
||||
) error {
|
||||
var prm engine.HeadPrm
|
||||
prm.WithAddress(a)
|
||||
|
||||
res, err := n.e.Head(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ni, err := res.Header().NotificationInfo()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not retrieve notification topic from object: %w", err)
|
||||
}
|
||||
|
||||
topic := ni.Topic()
|
||||
|
||||
if topic == "" {
|
||||
topic = n.defaultTopic
|
||||
}
|
||||
|
||||
h(topic, a)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type notificationWriter struct {
|
||||
l *logger.Logger
|
||||
w *nats.Writer
|
||||
}
|
||||
|
||||
func (n notificationWriter) Notify(topic string, address oid.Address) {
|
||||
if err := n.w.Notify(topic, address); err != nil {
|
||||
n.l.Warn(logs.FrostFSNodeCouldNotWriteObjectNotification,
|
||||
zap.Stringer("address", address),
|
||||
zap.String("topic", topic),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func initNotifications(ctx context.Context, c *cfg) {
|
||||
if nodeconfig.Notification(c.appCfg).Enabled() {
|
||||
topic := nodeconfig.Notification(c.appCfg).DefaultTopic()
|
||||
pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey())
|
||||
|
||||
if topic == "" {
|
||||
topic = pubKey
|
||||
}
|
||||
|
||||
natsSvc := nats.New(
|
||||
nats.WithConnectionName("FrostFS Storage Node: "+pubKey), // connection name is used in the server side logs
|
||||
nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()),
|
||||
nats.WithClientCert(
|
||||
nodeconfig.Notification(c.appCfg).CertPath(),
|
||||
nodeconfig.Notification(c.appCfg).KeyPath(),
|
||||
),
|
||||
nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()),
|
||||
nats.WithLogger(c.log),
|
||||
)
|
||||
|
||||
c.cfgNotifications = cfgNotifications{
|
||||
enabled: true,
|
||||
nw: notificationWriter{
|
||||
l: c.log,
|
||||
w: natsSvc,
|
||||
},
|
||||
defaultTopic: topic,
|
||||
}
|
||||
|
||||
n := notificator.New(new(notificator.Prm).
|
||||
SetLogger(c.log).
|
||||
SetNotificationSource(
|
||||
¬ificationSource{
|
||||
e: c.cfgObject.cfgLocalStorage.localStorage,
|
||||
l: c.log,
|
||||
defaultTopic: topic,
|
||||
}).
|
||||
SetWriter(c.cfgNotifications.nw),
|
||||
)
|
||||
|
||||
addNewEpochAsyncNotificationHandler(c, func(e event.Event) {
|
||||
ev := e.(netmap.NewEpoch)
|
||||
|
||||
n.ProcessEpoch(ctx, ev.EpochNumber())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func connectNats(ctx context.Context, c *cfg) {
|
||||
if !c.cfgNotifications.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
endpoint := nodeconfig.Notification(c.appCfg).Endpoint()
|
||||
err := c.cfgNotifications.nw.w.Connect(ctx, endpoint)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err))
|
||||
}
|
||||
c.log.Info(logs.NatsConnectedToEndpoint, zap.String("endpoint", endpoint))
|
||||
}
|
|
@ -14,7 +14,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
|
@ -333,15 +332,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
|
|||
engine: ls,
|
||||
}
|
||||
|
||||
if c.cfgNotifications.enabled {
|
||||
os = engineWithNotifications{
|
||||
base: os,
|
||||
nw: c.cfgNotifications.nw,
|
||||
ns: c.cfgNetmap.state,
|
||||
defaultTopic: c.cfgNotifications.defaultTopic,
|
||||
}
|
||||
}
|
||||
|
||||
return putsvc.NewService(
|
||||
keyStorage,
|
||||
c.putClientCache,
|
||||
|
@ -503,47 +493,6 @@ func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
|
|||
return eaclInfo, nil
|
||||
}
|
||||
|
||||
type engineWithNotifications struct {
|
||||
base putsvc.ObjectStorage
|
||||
nw notificationWriter
|
||||
ns netmap.State
|
||||
|
||||
defaultTopic string
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) {
|
||||
return e.base.IsLocked(ctx, address)
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
|
||||
return e.base.Delete(ctx, tombstone, toDelete)
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
|
||||
return e.base.Lock(ctx, locker, toLock)
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) Put(ctx context.Context, o *objectSDK.Object) error {
|
||||
if err := e.base.Put(ctx, o); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ni, err := o.NotificationInfo()
|
||||
if err == nil {
|
||||
if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() {
|
||||
topic := ni.Topic()
|
||||
|
||||
if topic == "" {
|
||||
topic = e.defaultTopic
|
||||
}
|
||||
|
||||
e.nw.Notify(topic, objectCore.AddressOf(o))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type engineWithoutNotifications struct {
|
||||
engine *engine.StorageEngine
|
||||
}
|
||||
|
|
|
@ -22,13 +22,6 @@ FROSTFS_NODE_ATTRIBUTE_1="UN-LOCODE:RU MSK"
|
|||
FROSTFS_NODE_RELAY=true
|
||||
FROSTFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions
|
||||
FROSTFS_NODE_PERSISTENT_STATE_PATH=/state
|
||||
FROSTFS_NODE_NOTIFICATION_ENABLED=true
|
||||
FROSTFS_NODE_NOTIFICATION_ENDPOINT=tls://localhost:4222
|
||||
FROSTFS_NODE_NOTIFICATION_TIMEOUT=6s
|
||||
FROSTFS_NODE_NOTIFICATION_DEFAULT_TOPIC=topic
|
||||
FROSTFS_NODE_NOTIFICATION_CERTIFICATE=/cert/path
|
||||
FROSTFS_NODE_NOTIFICATION_KEY=/key/path
|
||||
FROSTFS_NODE_NOTIFICATION_CA=/ca/path
|
||||
|
||||
# Tree service section
|
||||
FROSTFS_TREE_ENABLED=true
|
||||
|
|
|
@ -36,15 +36,6 @@
|
|||
},
|
||||
"persistent_state": {
|
||||
"path": "/state"
|
||||
},
|
||||
"notification": {
|
||||
"enabled": true,
|
||||
"endpoint": "tls://localhost:4222",
|
||||
"timeout": "6s",
|
||||
"default_topic": "topic",
|
||||
"certificate": "/cert/path",
|
||||
"key": "/key/path",
|
||||
"ca": "/ca/path"
|
||||
}
|
||||
},
|
||||
"grpc": {
|
||||
|
|
|
@ -35,14 +35,6 @@ node:
|
|||
path: /sessions # path to persistent session tokens file of Storage node (default: in-memory sessions)
|
||||
persistent_state:
|
||||
path: /state # path to persistent state file of Storage node
|
||||
notification:
|
||||
enabled: true # turn on object notification service
|
||||
endpoint: "tls://localhost:4222" # notification server endpoint
|
||||
timeout: "6s" # timeout for object notification client connection
|
||||
default_topic: "topic" # default topic for object notifications if not found in object's meta
|
||||
certificate: "/cert/path" # path to TLS certificate
|
||||
key: "/key/path" # path to TLS key
|
||||
ca: "/ca/path" # path to optional CA certificate
|
||||
|
||||
grpc:
|
||||
- endpoint: s01.frostfs.devenv:8080 # endpoint for gRPC server
|
||||
|
|
|
@ -322,14 +322,6 @@ node:
|
|||
path: /sessions
|
||||
persistent_state:
|
||||
path: /state
|
||||
notification:
|
||||
enabled: true
|
||||
endpoint: tls://localhost:4222
|
||||
timeout: 6s
|
||||
default_topic: topic
|
||||
certificate: /path/to/cert.pem
|
||||
key: /path/to/key.pem
|
||||
ca: /path/to/ca.pem
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|
@ -341,8 +333,6 @@ node:
|
|||
| `relay` | `bool` | | Enable relay mode. |
|
||||
| `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. |
|
||||
| `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. |
|
||||
| `notification` | [Notification config](#notification-subsection) | | NATS configuration. |
|
||||
|
||||
|
||||
## `wallet` subsection
|
||||
N3 wallet configuration.
|
||||
|
@ -369,19 +359,6 @@ It is used to correctly handle node restarts or crashes.
|
|||
|-----------|----------|------------------------|------------------------|
|
||||
| `path` | `string` | `.frostfs-storage-state` | Path to the database. |
|
||||
|
||||
## `notification` subsection
|
||||
This is an advanced section, use with caution.
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------|------------|-------------------|-------------------------------------------------------------------|
|
||||
| `enabled` | `bool` | `false` | Flag to enable the service. |
|
||||
| `endpoint` | `string` | | NATS endpoint to connect to. |
|
||||
| `timeout` | `duration` | `5s` | Timeout for the object notification operation. |
|
||||
| `default_topic` | `string` | node's public key | Default topic to use if an object has no corresponding attribute. |
|
||||
| `certificate` | `string` | | Path to the client certificate. |
|
||||
| `key` | `string` | | Path to the client key. |
|
||||
| `ca` | `string` | | Override root CA used to verify server certificates. |
|
||||
|
||||
# `apiclient` section
|
||||
Configuration for the FrostFS API client used for communication with other FrostFS nodes.
|
||||
|
||||
|
|
3
go.mod
3
go.mod
|
@ -23,7 +23,6 @@ require (
|
|||
github.com/mitchellh/go-homedir v1.1.0
|
||||
github.com/mr-tron/base58 v1.2.0
|
||||
github.com/multiformats/go-multiaddr v0.12.1
|
||||
github.com/nats-io/nats.go v1.32.0
|
||||
github.com/nspcc-dev/neo-go v0.106.0
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/panjf2000/ants/v2 v2.9.0
|
||||
|
@ -96,8 +95,6 @@ require (
|
|||
github.com/multiformats/go-multibase v0.2.0 // indirect
|
||||
github.com/multiformats/go-multihash v0.2.3 // indirect
|
||||
github.com/multiformats/go-varint v0.0.7 // indirect
|
||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -132,10 +132,6 @@ const (
|
|||
UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool"
|
||||
V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
|
||||
V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
|
||||
NatsNatsConnectionWasLost = "nats: connection was lost"
|
||||
NatsNatsReconnectedToTheServer = "nats: reconnected to the server"
|
||||
NatsNatsClosingConnectionAsTheContextIsDone = "nats: closing connection as the context is done"
|
||||
NatsConnectedToEndpoint = "nats: successfully connected to endpoint"
|
||||
ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"
|
||||
ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch = "could not restore notification subscription after RPC switch"
|
||||
ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch = "could not restore notary notification subscription after RPC switch"
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
package notificator
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// NotificationSource is a source of object notifications.
|
||||
type NotificationSource interface {
|
||||
// Iterate must iterate over all notifications for the
|
||||
// provided epoch and call handler for all of them.
|
||||
Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address))
|
||||
}
|
||||
|
||||
// NotificationWriter notifies all the subscribers
|
||||
// about new object notifications.
|
||||
type NotificationWriter interface {
|
||||
// Notify must notify about an event generated
|
||||
// from an object with a specific topic.
|
||||
Notify(topic string, address oid.Address)
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
package nats
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func WithClientCert(certPath, keyPath string) Option {
|
||||
return func(o *opts) {
|
||||
o.nOpts = append(o.nOpts, nats.ClientCert(certPath, keyPath))
|
||||
}
|
||||
}
|
||||
|
||||
func WithRootCA(paths ...string) Option {
|
||||
return func(o *opts) {
|
||||
o.nOpts = append(o.nOpts, nats.RootCAs(paths...))
|
||||
}
|
||||
}
|
||||
|
||||
func WithTimeout(timeout time.Duration) Option {
|
||||
return func(o *opts) {
|
||||
o.nOpts = append(o.nOpts, nats.Timeout(timeout))
|
||||
}
|
||||
}
|
||||
|
||||
func WithConnectionName(name string) Option {
|
||||
return func(o *opts) {
|
||||
o.nOpts = append(o.nOpts, nats.Name(name))
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(logger *logger.Logger) Option {
|
||||
return func(o *opts) {
|
||||
o.log = logger
|
||||
}
|
||||
}
|
|
@ -1,129 +0,0 @@
|
|||
package nats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/nats-io/nats.go"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Writer is a NATS object notification writer.
|
||||
// It handles NATS JetStream connections and allows
|
||||
// sending string representation of the address to
|
||||
// the NATS server.
|
||||
//
|
||||
// For correct operation must be created via New function.
|
||||
// new(Writer) or Writer{} construction leads to undefined
|
||||
// behaviour and is not safe.
|
||||
type Writer struct {
|
||||
js nats.JetStreamContext
|
||||
nc *nats.Conn
|
||||
|
||||
m sync.RWMutex
|
||||
createdStreams map[string]struct{}
|
||||
opts
|
||||
}
|
||||
|
||||
type opts struct {
|
||||
log *logger.Logger
|
||||
nOpts []nats.Option
|
||||
}
|
||||
|
||||
type Option func(*opts)
|
||||
|
||||
var errConnIsClosed = errors.New("connection to the server is closed")
|
||||
|
||||
// Notify sends object address's string representation to the provided topic.
|
||||
// Uses first 4 bytes of object ID as a message ID to support 'exactly once'
|
||||
// message delivery.
|
||||
//
|
||||
// Returns error only if:
|
||||
// 1. underlying connection was closed and has not been established again;
|
||||
// 2. NATS server could not respond that it has saved the message.
|
||||
func (n *Writer) Notify(topic string, address oid.Address) error {
|
||||
if !n.nc.IsConnected() {
|
||||
return errConnIsClosed
|
||||
}
|
||||
|
||||
// use first 4 byte of the encoded string as
|
||||
// message ID for the 'exactly once' delivery
|
||||
messageID := address.Object().EncodeToString()[:4]
|
||||
|
||||
// check if the stream was previously created
|
||||
n.m.RLock()
|
||||
_, created := n.createdStreams[topic]
|
||||
n.m.RUnlock()
|
||||
|
||||
if !created {
|
||||
_, err := n.js.AddStream(&nats.StreamConfig{
|
||||
Name: topic,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not add stream: %w", err)
|
||||
}
|
||||
|
||||
n.m.Lock()
|
||||
n.createdStreams[topic] = struct{}{}
|
||||
n.m.Unlock()
|
||||
}
|
||||
|
||||
_, err := n.js.Publish(topic, []byte(address.EncodeToString()), nats.MsgId(messageID))
|
||||
return err
|
||||
}
|
||||
|
||||
// New creates new Writer.
|
||||
func New(oo ...Option) *Writer {
|
||||
w := &Writer{
|
||||
createdStreams: make(map[string]struct{}),
|
||||
opts: opts{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
nOpts: make([]nats.Option, 0, len(oo)+3),
|
||||
},
|
||||
}
|
||||
|
||||
for _, o := range oo {
|
||||
o(&w.opts)
|
||||
}
|
||||
|
||||
w.opts.nOpts = append(w.opts.nOpts,
|
||||
nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop
|
||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
w.log.Error(logs.NatsNatsConnectionWasLost, zap.Error(err))
|
||||
}),
|
||||
nats.ReconnectHandler(func(_ *nats.Conn) {
|
||||
w.log.Warn(logs.NatsNatsReconnectedToTheServer)
|
||||
}),
|
||||
)
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
// Connect tries to connect to a specified NATS endpoint.
|
||||
//
|
||||
// Connection is closed when passed context is done.
|
||||
func (n *Writer) Connect(ctx context.Context, endpoint string) error {
|
||||
nc, err := nats.Connect(endpoint, n.opts.nOpts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not connect to server: %w", err)
|
||||
}
|
||||
|
||||
n.nc = nc
|
||||
|
||||
// usage w/o options is error-free
|
||||
n.js, _ = nc.JetStream()
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
n.opts.log.Info(logs.NatsNatsClosingConnectionAsTheContextIsDone)
|
||||
|
||||
nc.Close()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,88 +0,0 @@
|
|||
package notificator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Prm groups Notificator constructor's
|
||||
// parameters. All are required.
|
||||
type Prm struct {
|
||||
writer NotificationWriter
|
||||
notificationSource NotificationSource
|
||||
logger *logger.Logger
|
||||
}
|
||||
|
||||
// SetLogger sets a logger.
|
||||
func (prm *Prm) SetLogger(v *logger.Logger) *Prm {
|
||||
prm.logger = v
|
||||
return prm
|
||||
}
|
||||
|
||||
// SetWriter sets notification writer.
|
||||
func (prm *Prm) SetWriter(v NotificationWriter) *Prm {
|
||||
prm.writer = v
|
||||
return prm
|
||||
}
|
||||
|
||||
// SetNotificationSource sets notification source.
|
||||
func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm {
|
||||
prm.notificationSource = v
|
||||
return prm
|
||||
}
|
||||
|
||||
// Notificator is a notification producer that handles
|
||||
// objects with defined notification epoch.
|
||||
//
|
||||
// Working client must be created via constructor New.
|
||||
// Using the Client that has been created with new(Client)
|
||||
// expression (or just declaring a Client variable) is unsafe
|
||||
// and can lead to panic.
|
||||
type Notificator struct {
|
||||
w NotificationWriter
|
||||
ns NotificationSource
|
||||
l *logger.Logger
|
||||
}
|
||||
|
||||
// New creates, initializes and returns the Notificator instance.
|
||||
//
|
||||
// Panics if any field of the passed Prm structure is not set/set
|
||||
// to nil.
|
||||
func New(prm *Prm) *Notificator {
|
||||
panicOnNil := func(v any, name string) {
|
||||
if v == nil {
|
||||
panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name))
|
||||
}
|
||||
}
|
||||
|
||||
panicOnNil(prm.writer, "NotificationWriter")
|
||||
panicOnNil(prm.notificationSource, "NotificationSource")
|
||||
panicOnNil(prm.logger, "Logger")
|
||||
|
||||
return &Notificator{
|
||||
w: prm.writer,
|
||||
ns: prm.notificationSource,
|
||||
l: prm.logger,
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessEpoch looks for all objects with defined epoch in the storage
|
||||
// and passes their addresses to the NotificationWriter.
|
||||
func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) {
|
||||
logger := n.l.With(zap.Uint64("epoch", epoch))
|
||||
logger.Debug(logs.NotificatorNotificatorStartProcessingObjectNotifications)
|
||||
|
||||
n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) {
|
||||
n.l.Debug(logs.NotificatorNotificatorProcessingObjectNotification,
|
||||
zap.String("topic", topic),
|
||||
zap.Stringer("address", addr),
|
||||
)
|
||||
|
||||
n.w.Notify(topic, addr)
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue