forked from TrueCloudLab/frostfs-node
[#1161] node: Remove notification functionality
It is unused and will be reworked in future. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
67b3002743
commit
a849236b68
17 changed files with 0 additions and 678 deletions
|
@ -474,7 +474,6 @@ type cfg struct {
|
||||||
cfgNetmap cfgNetmap
|
cfgNetmap cfgNetmap
|
||||||
cfgControlService cfgControlService
|
cfgControlService cfgControlService
|
||||||
cfgObject cfgObject
|
cfgObject cfgObject
|
||||||
cfgNotifications cfgNotifications
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadCurrentNetMap reads network map which has been cached at the
|
// ReadCurrentNetMap reads network map which has been cached at the
|
||||||
|
@ -633,12 +632,6 @@ type cfgObject struct {
|
||||||
skipSessionTokenIssuerVerification bool
|
skipSessionTokenIssuerVerification bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgNotifications struct {
|
|
||||||
enabled bool
|
|
||||||
nw notificationWriter
|
|
||||||
defaultTopic string
|
|
||||||
}
|
|
||||||
|
|
||||||
type cfgLocalStorage struct {
|
type cfgLocalStorage struct {
|
||||||
localStorage *engine.StorageEngine
|
localStorage *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
@ -25,12 +24,6 @@ type PersistentStateConfig struct {
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotificationConfig is a wrapper over "notification" config section
|
|
||||||
// which provides access to object notification configuration of node.
|
|
||||||
type NotificationConfig struct {
|
|
||||||
cfg *config.Config
|
|
||||||
}
|
|
||||||
|
|
||||||
// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section
|
// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section
|
||||||
// which provides access to persistent policy rules storage configuration of node.
|
// which provides access to persistent policy rules storage configuration of node.
|
||||||
type PersistentPolicyRulesConfig struct {
|
type PersistentPolicyRulesConfig struct {
|
||||||
|
@ -41,16 +34,12 @@ const (
|
||||||
subsection = "node"
|
subsection = "node"
|
||||||
persistentSessionsSubsection = "persistent_sessions"
|
persistentSessionsSubsection = "persistent_sessions"
|
||||||
persistentStateSubsection = "persistent_state"
|
persistentStateSubsection = "persistent_state"
|
||||||
notificationSubsection = "notification"
|
|
||||||
persistentPolicyRulesSubsection = "persistent_policy_rules"
|
persistentPolicyRulesSubsection = "persistent_policy_rules"
|
||||||
|
|
||||||
attributePrefix = "attribute"
|
attributePrefix = "attribute"
|
||||||
|
|
||||||
// PersistentStatePathDefault is a default path for persistent state file.
|
// PersistentStatePathDefault is a default path for persistent state file.
|
||||||
PersistentStatePathDefault = ".frostfs-storage-state"
|
PersistentStatePathDefault = ".frostfs-storage-state"
|
||||||
|
|
||||||
// NotificationTimeoutDefault is a default timeout for object notification operation.
|
|
||||||
NotificationTimeoutDefault = 5 * time.Second
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Key returns the value of "key" config parameter
|
// Key returns the value of "key" config parameter
|
||||||
|
@ -185,75 +174,6 @@ func (p PersistentStateConfig) Path() string {
|
||||||
return PersistentStatePathDefault
|
return PersistentStatePathDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notification returns structure that provides access to "notification"
|
|
||||||
// subsection of "node" section.
|
|
||||||
func Notification(c *config.Config) NotificationConfig {
|
|
||||||
return NotificationConfig{
|
|
||||||
c.Sub(subsection).Sub(notificationSubsection),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enabled returns the value of "enabled" config parameter from "notification"
|
|
||||||
// subsection of "node" section.
|
|
||||||
//
|
|
||||||
// Returns false if the value is not presented.
|
|
||||||
func (n NotificationConfig) Enabled() bool {
|
|
||||||
return config.BoolSafe(n.cfg, "enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// DefaultTopic returns the value of "default_topic" config parameter from
|
|
||||||
// "notification" subsection of "node" section.
|
|
||||||
//
|
|
||||||
// Returns empty string if the value is not presented.
|
|
||||||
func (n NotificationConfig) DefaultTopic() string {
|
|
||||||
return config.StringSafe(n.cfg, "default_topic")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Endpoint returns the value of "endpoint" config parameter from "notification"
|
|
||||||
// subsection of "node" section.
|
|
||||||
//
|
|
||||||
// Returns empty string if the value is not presented.
|
|
||||||
func (n NotificationConfig) Endpoint() string {
|
|
||||||
return config.StringSafe(n.cfg, "endpoint")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Timeout returns the value of "timeout" config parameter from "notification"
|
|
||||||
// subsection of "node" section.
|
|
||||||
//
|
|
||||||
// Returns NotificationTimeoutDefault if the value is not positive.
|
|
||||||
func (n NotificationConfig) Timeout() time.Duration {
|
|
||||||
v := config.DurationSafe(n.cfg, "timeout")
|
|
||||||
if v > 0 {
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
|
|
||||||
return NotificationTimeoutDefault
|
|
||||||
}
|
|
||||||
|
|
||||||
// CertPath returns the value of "certificate_path" config parameter from "notification"
|
|
||||||
// subsection of "node" section.
|
|
||||||
//
|
|
||||||
// Returns empty string if the value is not presented.
|
|
||||||
func (n NotificationConfig) CertPath() string {
|
|
||||||
return config.StringSafe(n.cfg, "certificate")
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyPath returns the value of "key_path" config parameter from
|
|
||||||
// "notification" subsection of "node" section.
|
|
||||||
//
|
|
||||||
// Returns empty string if the value is not presented.
|
|
||||||
func (n NotificationConfig) KeyPath() string {
|
|
||||||
return config.StringSafe(n.cfg, "key")
|
|
||||||
}
|
|
||||||
|
|
||||||
// CAPath returns the value of "ca_path" config parameter from
|
|
||||||
// "notification" subsection of "node" section.
|
|
||||||
//
|
|
||||||
// Returns empty string if the value is not presented.
|
|
||||||
func (n NotificationConfig) CAPath() string {
|
|
||||||
return config.StringSafe(n.cfg, "ca")
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// PermDefault is a default permission bits for local override storage file.
|
// PermDefault is a default permission bits for local override storage file.
|
||||||
PermDefault = 0o644
|
PermDefault = 0o644
|
||||||
|
|
|
@ -2,7 +2,6 @@ package nodeconfig
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||||
|
@ -33,25 +32,11 @@ func TestNodeSection(t *testing.T) {
|
||||||
relay := Relay(empty)
|
relay := Relay(empty)
|
||||||
persisessionsPath := PersistentSessions(empty).Path()
|
persisessionsPath := PersistentSessions(empty).Path()
|
||||||
persistatePath := PersistentState(empty).Path()
|
persistatePath := PersistentState(empty).Path()
|
||||||
notificationDefaultEnabled := Notification(empty).Enabled()
|
|
||||||
notificationDefaultEndpoint := Notification(empty).Endpoint()
|
|
||||||
notificationDefaultTimeout := Notification(empty).Timeout()
|
|
||||||
notificationDefaultTopic := Notification(empty).DefaultTopic()
|
|
||||||
notificationDefaultCertPath := Notification(empty).CertPath()
|
|
||||||
notificationDefaultKeyPath := Notification(empty).KeyPath()
|
|
||||||
notificationDefaultCAPath := Notification(empty).CAPath()
|
|
||||||
|
|
||||||
require.Empty(t, attribute)
|
require.Empty(t, attribute)
|
||||||
require.Equal(t, false, relay)
|
require.Equal(t, false, relay)
|
||||||
require.Equal(t, "", persisessionsPath)
|
require.Equal(t, "", persisessionsPath)
|
||||||
require.Equal(t, PersistentStatePathDefault, persistatePath)
|
require.Equal(t, PersistentStatePathDefault, persistatePath)
|
||||||
require.Equal(t, false, notificationDefaultEnabled)
|
|
||||||
require.Equal(t, "", notificationDefaultEndpoint)
|
|
||||||
require.Equal(t, NotificationTimeoutDefault, notificationDefaultTimeout)
|
|
||||||
require.Equal(t, "", notificationDefaultTopic)
|
|
||||||
require.Equal(t, "", notificationDefaultCertPath)
|
|
||||||
require.Equal(t, "", notificationDefaultKeyPath)
|
|
||||||
require.Equal(t, "", notificationDefaultCAPath)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
@ -64,13 +49,6 @@ func TestNodeSection(t *testing.T) {
|
||||||
wKey := Wallet(c)
|
wKey := Wallet(c)
|
||||||
persisessionsPath := PersistentSessions(c).Path()
|
persisessionsPath := PersistentSessions(c).Path()
|
||||||
persistatePath := PersistentState(c).Path()
|
persistatePath := PersistentState(c).Path()
|
||||||
notificationEnabled := Notification(c).Enabled()
|
|
||||||
notificationEndpoint := Notification(c).Endpoint()
|
|
||||||
notificationTimeout := Notification(c).Timeout()
|
|
||||||
notificationDefaultTopic := Notification(c).DefaultTopic()
|
|
||||||
notificationCertPath := Notification(c).CertPath()
|
|
||||||
notificationKeyPath := Notification(c).KeyPath()
|
|
||||||
notificationCAPath := Notification(c).CAPath()
|
|
||||||
|
|
||||||
expectedAddr := []struct {
|
expectedAddr := []struct {
|
||||||
str string
|
str string
|
||||||
|
@ -122,13 +100,6 @@ func TestNodeSection(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, "/sessions", persisessionsPath)
|
require.Equal(t, "/sessions", persisessionsPath)
|
||||||
require.Equal(t, "/state", persistatePath)
|
require.Equal(t, "/state", persistatePath)
|
||||||
require.Equal(t, true, notificationEnabled)
|
|
||||||
require.Equal(t, "tls://localhost:4222", notificationEndpoint)
|
|
||||||
require.Equal(t, 6*time.Second, notificationTimeout)
|
|
||||||
require.Equal(t, "topic", notificationDefaultTopic)
|
|
||||||
require.Equal(t, "/cert/path", notificationCertPath)
|
|
||||||
require.Equal(t, "/key/path", notificationKeyPath)
|
|
||||||
require.Equal(t, "/ca/path", notificationCAPath)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
configtest.ForEachFileType(path, fileConfigTest)
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
|
@ -111,7 +111,6 @@ func initApp(ctx context.Context, c *cfg) {
|
||||||
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
|
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
|
||||||
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
|
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
|
||||||
initAndLog(c, "session", initSessionService)
|
initAndLog(c, "session", initSessionService)
|
||||||
initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
|
|
||||||
initAndLog(c, "object", initObjectService)
|
initAndLog(c, "object", initObjectService)
|
||||||
initAndLog(c, "tree", initTreeService)
|
initAndLog(c, "tree", initTreeService)
|
||||||
initAndLog(c, "apemanager", initAPEManagerService)
|
initAndLog(c, "apemanager", initAPEManagerService)
|
||||||
|
@ -143,7 +142,6 @@ func stopAndLog(c *cfg, name string, stopper func() error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootUp(ctx context.Context, c *cfg) {
|
func bootUp(ctx context.Context, c *cfg) {
|
||||||
runAndLog(ctx, c, "NATS", true, connectNats)
|
|
||||||
runAndLog(ctx, c, "gRPC", false, func(_ context.Context, c *cfg) { serveGRPC(c) })
|
runAndLog(ctx, c, "gRPC", false, func(_ context.Context, c *cfg) { serveGRPC(c) })
|
||||||
runAndLog(ctx, c, "notary", true, makeAndWaitNotaryDeposit)
|
runAndLog(ctx, c, "notary", true, makeAndWaitNotaryDeposit)
|
||||||
|
|
||||||
|
|
|
@ -1,172 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator/nats"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type notificationSource struct {
|
|
||||||
e *engine.StorageEngine
|
|
||||||
l *logger.Logger
|
|
||||||
defaultTopic string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) {
|
|
||||||
log := n.l.With(zap.Uint64("epoch", epoch))
|
|
||||||
|
|
||||||
listRes, err := n.e.ListContainers(ctx, engine.ListContainersPrm{})
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.FrostFSNodeNotificatorCouldNotListContainers, zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
filters := objectSDK.NewSearchFilters()
|
|
||||||
filters.AddNotificationEpochFilter(epoch)
|
|
||||||
|
|
||||||
var selectPrm engine.SelectPrm
|
|
||||||
selectPrm.WithFilters(filters)
|
|
||||||
|
|
||||||
for _, c := range listRes.Containers() {
|
|
||||||
selectPrm.WithContainerID(c)
|
|
||||||
|
|
||||||
selectRes, err := n.e.Select(ctx, selectPrm)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.FrostFSNodeNotificatorCouldNotSelectObjectsFromContainer,
|
|
||||||
zap.Stringer("cid", c),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, a := range selectRes.AddressList() {
|
|
||||||
err = n.processAddress(ctx, a, handler)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.FrostFSNodeNotificatorCouldNotProcessObject,
|
|
||||||
zap.Stringer("address", a),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug(logs.FrostFSNodeNotificatorFinishedProcessingObjectNotifications)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *notificationSource) processAddress(
|
|
||||||
ctx context.Context,
|
|
||||||
a oid.Address,
|
|
||||||
h func(topic string, addr oid.Address),
|
|
||||||
) error {
|
|
||||||
var prm engine.HeadPrm
|
|
||||||
prm.WithAddress(a)
|
|
||||||
|
|
||||||
res, err := n.e.Head(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ni, err := res.Header().NotificationInfo()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not retrieve notification topic from object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
topic := ni.Topic()
|
|
||||||
|
|
||||||
if topic == "" {
|
|
||||||
topic = n.defaultTopic
|
|
||||||
}
|
|
||||||
|
|
||||||
h(topic, a)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type notificationWriter struct {
|
|
||||||
l *logger.Logger
|
|
||||||
w *nats.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n notificationWriter) Notify(topic string, address oid.Address) {
|
|
||||||
if err := n.w.Notify(topic, address); err != nil {
|
|
||||||
n.l.Warn(logs.FrostFSNodeCouldNotWriteObjectNotification,
|
|
||||||
zap.Stringer("address", address),
|
|
||||||
zap.String("topic", topic),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func initNotifications(ctx context.Context, c *cfg) {
|
|
||||||
if nodeconfig.Notification(c.appCfg).Enabled() {
|
|
||||||
topic := nodeconfig.Notification(c.appCfg).DefaultTopic()
|
|
||||||
pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey())
|
|
||||||
|
|
||||||
if topic == "" {
|
|
||||||
topic = pubKey
|
|
||||||
}
|
|
||||||
|
|
||||||
natsSvc := nats.New(
|
|
||||||
nats.WithConnectionName("FrostFS Storage Node: "+pubKey), // connection name is used in the server side logs
|
|
||||||
nats.WithTimeout(nodeconfig.Notification(c.appCfg).Timeout()),
|
|
||||||
nats.WithClientCert(
|
|
||||||
nodeconfig.Notification(c.appCfg).CertPath(),
|
|
||||||
nodeconfig.Notification(c.appCfg).KeyPath(),
|
|
||||||
),
|
|
||||||
nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()),
|
|
||||||
nats.WithLogger(c.log),
|
|
||||||
)
|
|
||||||
|
|
||||||
c.cfgNotifications = cfgNotifications{
|
|
||||||
enabled: true,
|
|
||||||
nw: notificationWriter{
|
|
||||||
l: c.log,
|
|
||||||
w: natsSvc,
|
|
||||||
},
|
|
||||||
defaultTopic: topic,
|
|
||||||
}
|
|
||||||
|
|
||||||
n := notificator.New(new(notificator.Prm).
|
|
||||||
SetLogger(c.log).
|
|
||||||
SetNotificationSource(
|
|
||||||
¬ificationSource{
|
|
||||||
e: c.cfgObject.cfgLocalStorage.localStorage,
|
|
||||||
l: c.log,
|
|
||||||
defaultTopic: topic,
|
|
||||||
}).
|
|
||||||
SetWriter(c.cfgNotifications.nw),
|
|
||||||
)
|
|
||||||
|
|
||||||
addNewEpochAsyncNotificationHandler(c, func(e event.Event) {
|
|
||||||
ev := e.(netmap.NewEpoch)
|
|
||||||
|
|
||||||
n.ProcessEpoch(ctx, ev.EpochNumber())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func connectNats(ctx context.Context, c *cfg) {
|
|
||||||
if !c.cfgNotifications.enabled {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
endpoint := nodeconfig.Notification(c.appCfg).Endpoint()
|
|
||||||
err := c.cfgNotifications.nw.w.Connect(ctx, endpoint)
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err))
|
|
||||||
}
|
|
||||||
c.log.Info(logs.NatsConnectedToEndpoint, zap.String("endpoint", endpoint))
|
|
||||||
}
|
|
|
@ -14,7 +14,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
|
@ -333,15 +332,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
|
||||||
engine: ls,
|
engine: ls,
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.cfgNotifications.enabled {
|
|
||||||
os = engineWithNotifications{
|
|
||||||
base: os,
|
|
||||||
nw: c.cfgNotifications.nw,
|
|
||||||
ns: c.cfgNetmap.state,
|
|
||||||
defaultTopic: c.cfgNotifications.defaultTopic,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return putsvc.NewService(
|
return putsvc.NewService(
|
||||||
keyStorage,
|
keyStorage,
|
||||||
c.putClientCache,
|
c.putClientCache,
|
||||||
|
@ -503,47 +493,6 @@ func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
|
||||||
return eaclInfo, nil
|
return eaclInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type engineWithNotifications struct {
|
|
||||||
base putsvc.ObjectStorage
|
|
||||||
nw notificationWriter
|
|
||||||
ns netmap.State
|
|
||||||
|
|
||||||
defaultTopic string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e engineWithNotifications) IsLocked(ctx context.Context, address oid.Address) (bool, error) {
|
|
||||||
return e.base.IsLocked(ctx, address)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e engineWithNotifications) Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID) error {
|
|
||||||
return e.base.Delete(ctx, tombstone, toDelete)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e engineWithNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
|
|
||||||
return e.base.Lock(ctx, locker, toLock)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e engineWithNotifications) Put(ctx context.Context, o *objectSDK.Object) error {
|
|
||||||
if err := e.base.Put(ctx, o); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ni, err := o.NotificationInfo()
|
|
||||||
if err == nil {
|
|
||||||
if epoch := ni.Epoch(); epoch == 0 || epoch == e.ns.CurrentEpoch() {
|
|
||||||
topic := ni.Topic()
|
|
||||||
|
|
||||||
if topic == "" {
|
|
||||||
topic = e.defaultTopic
|
|
||||||
}
|
|
||||||
|
|
||||||
e.nw.Notify(topic, objectCore.AddressOf(o))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type engineWithoutNotifications struct {
|
type engineWithoutNotifications struct {
|
||||||
engine *engine.StorageEngine
|
engine *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,13 +22,6 @@ FROSTFS_NODE_ATTRIBUTE_1="UN-LOCODE:RU MSK"
|
||||||
FROSTFS_NODE_RELAY=true
|
FROSTFS_NODE_RELAY=true
|
||||||
FROSTFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions
|
FROSTFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions
|
||||||
FROSTFS_NODE_PERSISTENT_STATE_PATH=/state
|
FROSTFS_NODE_PERSISTENT_STATE_PATH=/state
|
||||||
FROSTFS_NODE_NOTIFICATION_ENABLED=true
|
|
||||||
FROSTFS_NODE_NOTIFICATION_ENDPOINT=tls://localhost:4222
|
|
||||||
FROSTFS_NODE_NOTIFICATION_TIMEOUT=6s
|
|
||||||
FROSTFS_NODE_NOTIFICATION_DEFAULT_TOPIC=topic
|
|
||||||
FROSTFS_NODE_NOTIFICATION_CERTIFICATE=/cert/path
|
|
||||||
FROSTFS_NODE_NOTIFICATION_KEY=/key/path
|
|
||||||
FROSTFS_NODE_NOTIFICATION_CA=/ca/path
|
|
||||||
|
|
||||||
# Tree service section
|
# Tree service section
|
||||||
FROSTFS_TREE_ENABLED=true
|
FROSTFS_TREE_ENABLED=true
|
||||||
|
|
|
@ -36,15 +36,6 @@
|
||||||
},
|
},
|
||||||
"persistent_state": {
|
"persistent_state": {
|
||||||
"path": "/state"
|
"path": "/state"
|
||||||
},
|
|
||||||
"notification": {
|
|
||||||
"enabled": true,
|
|
||||||
"endpoint": "tls://localhost:4222",
|
|
||||||
"timeout": "6s",
|
|
||||||
"default_topic": "topic",
|
|
||||||
"certificate": "/cert/path",
|
|
||||||
"key": "/key/path",
|
|
||||||
"ca": "/ca/path"
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"grpc": {
|
"grpc": {
|
||||||
|
|
|
@ -35,14 +35,6 @@ node:
|
||||||
path: /sessions # path to persistent session tokens file of Storage node (default: in-memory sessions)
|
path: /sessions # path to persistent session tokens file of Storage node (default: in-memory sessions)
|
||||||
persistent_state:
|
persistent_state:
|
||||||
path: /state # path to persistent state file of Storage node
|
path: /state # path to persistent state file of Storage node
|
||||||
notification:
|
|
||||||
enabled: true # turn on object notification service
|
|
||||||
endpoint: "tls://localhost:4222" # notification server endpoint
|
|
||||||
timeout: "6s" # timeout for object notification client connection
|
|
||||||
default_topic: "topic" # default topic for object notifications if not found in object's meta
|
|
||||||
certificate: "/cert/path" # path to TLS certificate
|
|
||||||
key: "/key/path" # path to TLS key
|
|
||||||
ca: "/ca/path" # path to optional CA certificate
|
|
||||||
|
|
||||||
grpc:
|
grpc:
|
||||||
- endpoint: s01.frostfs.devenv:8080 # endpoint for gRPC server
|
- endpoint: s01.frostfs.devenv:8080 # endpoint for gRPC server
|
||||||
|
|
|
@ -322,14 +322,6 @@ node:
|
||||||
path: /sessions
|
path: /sessions
|
||||||
persistent_state:
|
persistent_state:
|
||||||
path: /state
|
path: /state
|
||||||
notification:
|
|
||||||
enabled: true
|
|
||||||
endpoint: tls://localhost:4222
|
|
||||||
timeout: 6s
|
|
||||||
default_topic: topic
|
|
||||||
certificate: /path/to/cert.pem
|
|
||||||
key: /path/to/key.pem
|
|
||||||
ca: /path/to/ca.pem
|
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|
@ -341,8 +333,6 @@ node:
|
||||||
| `relay` | `bool` | | Enable relay mode. |
|
| `relay` | `bool` | | Enable relay mode. |
|
||||||
| `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. |
|
| `persistent_sessions` | [Persistent sessions config](#persistent_sessions-subsection) | | Persistent session token store configuration. |
|
||||||
| `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. |
|
| `persistent_state` | [Persistent state config](#persistent_state-subsection) | | Persistent state configuration. |
|
||||||
| `notification` | [Notification config](#notification-subsection) | | NATS configuration. |
|
|
||||||
|
|
||||||
|
|
||||||
## `wallet` subsection
|
## `wallet` subsection
|
||||||
N3 wallet configuration.
|
N3 wallet configuration.
|
||||||
|
@ -369,19 +359,6 @@ It is used to correctly handle node restarts or crashes.
|
||||||
|-----------|----------|------------------------|------------------------|
|
|-----------|----------|------------------------|------------------------|
|
||||||
| `path` | `string` | `.frostfs-storage-state` | Path to the database. |
|
| `path` | `string` | `.frostfs-storage-state` | Path to the database. |
|
||||||
|
|
||||||
## `notification` subsection
|
|
||||||
This is an advanced section, use with caution.
|
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
|
||||||
|-----------------|------------|-------------------|-------------------------------------------------------------------|
|
|
||||||
| `enabled` | `bool` | `false` | Flag to enable the service. |
|
|
||||||
| `endpoint` | `string` | | NATS endpoint to connect to. |
|
|
||||||
| `timeout` | `duration` | `5s` | Timeout for the object notification operation. |
|
|
||||||
| `default_topic` | `string` | node's public key | Default topic to use if an object has no corresponding attribute. |
|
|
||||||
| `certificate` | `string` | | Path to the client certificate. |
|
|
||||||
| `key` | `string` | | Path to the client key. |
|
|
||||||
| `ca` | `string` | | Override root CA used to verify server certificates. |
|
|
||||||
|
|
||||||
# `apiclient` section
|
# `apiclient` section
|
||||||
Configuration for the FrostFS API client used for communication with other FrostFS nodes.
|
Configuration for the FrostFS API client used for communication with other FrostFS nodes.
|
||||||
|
|
||||||
|
|
3
go.mod
3
go.mod
|
@ -23,7 +23,6 @@ require (
|
||||||
github.com/mitchellh/go-homedir v1.1.0
|
github.com/mitchellh/go-homedir v1.1.0
|
||||||
github.com/mr-tron/base58 v1.2.0
|
github.com/mr-tron/base58 v1.2.0
|
||||||
github.com/multiformats/go-multiaddr v0.12.1
|
github.com/multiformats/go-multiaddr v0.12.1
|
||||||
github.com/nats-io/nats.go v1.32.0
|
|
||||||
github.com/nspcc-dev/neo-go v0.106.0
|
github.com/nspcc-dev/neo-go v0.106.0
|
||||||
github.com/olekukonko/tablewriter v0.0.5
|
github.com/olekukonko/tablewriter v0.0.5
|
||||||
github.com/panjf2000/ants/v2 v2.9.0
|
github.com/panjf2000/ants/v2 v2.9.0
|
||||||
|
@ -96,8 +95,6 @@ require (
|
||||||
github.com/multiformats/go-multibase v0.2.0 // indirect
|
github.com/multiformats/go-multibase v0.2.0 // indirect
|
||||||
github.com/multiformats/go-multihash v0.2.3 // indirect
|
github.com/multiformats/go-multihash v0.2.3 // indirect
|
||||||
github.com/multiformats/go-varint v0.0.7 // indirect
|
github.com/multiformats/go-varint v0.0.7 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
|
||||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
||||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
||||||
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
||||||
|
|
6
go.sum
6
go.sum
|
@ -184,12 +184,6 @@ github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7B
|
||||||
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
|
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
|
||||||
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
|
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
|
||||||
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
|
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
|
||||||
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
|
|
||||||
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
|
||||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
|
||||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
|
||||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
|
||||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
|
||||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 h1:mD9hU3v+zJcnHAVmHnZKt3I++tvn30gBj2rP2PocZMk=
|
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 h1:mD9hU3v+zJcnHAVmHnZKt3I++tvn30gBj2rP2PocZMk=
|
||||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2/go.mod h1:U5VfmPNM88P4RORFb6KSUVBdJBDhlqggJZYGXGPxOcc=
|
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2/go.mod h1:U5VfmPNM88P4RORFb6KSUVBdJBDhlqggJZYGXGPxOcc=
|
||||||
github.com/nspcc-dev/neo-go v0.106.0 h1:YiOdW/GcLmbVSvxMRfD5aytO6n3TDHrEA97VHMawy6g=
|
github.com/nspcc-dev/neo-go v0.106.0 h1:YiOdW/GcLmbVSvxMRfD5aytO6n3TDHrEA97VHMawy6g=
|
||||||
|
|
|
@ -132,10 +132,6 @@ const (
|
||||||
UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool"
|
UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool"
|
||||||
V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
|
V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
|
||||||
V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
|
V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
|
||||||
NatsNatsConnectionWasLost = "nats: connection was lost"
|
|
||||||
NatsNatsReconnectedToTheServer = "nats: reconnected to the server"
|
|
||||||
NatsNatsClosingConnectionAsTheContextIsDone = "nats: closing connection as the context is done"
|
|
||||||
NatsConnectedToEndpoint = "nats: successfully connected to endpoint"
|
|
||||||
ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"
|
ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"
|
||||||
ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch = "could not restore notification subscription after RPC switch"
|
ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch = "could not restore notification subscription after RPC switch"
|
||||||
ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch = "could not restore notary notification subscription after RPC switch"
|
ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch = "could not restore notary notification subscription after RPC switch"
|
||||||
|
|
|
@ -1,22 +0,0 @@
|
||||||
package notificator
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NotificationSource is a source of object notifications.
|
|
||||||
type NotificationSource interface {
|
|
||||||
// Iterate must iterate over all notifications for the
|
|
||||||
// provided epoch and call handler for all of them.
|
|
||||||
Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address))
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotificationWriter notifies all the subscribers
|
|
||||||
// about new object notifications.
|
|
||||||
type NotificationWriter interface {
|
|
||||||
// Notify must notify about an event generated
|
|
||||||
// from an object with a specific topic.
|
|
||||||
Notify(topic string, address oid.Address)
|
|
||||||
}
|
|
|
@ -1,38 +0,0 @@
|
||||||
package nats
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func WithClientCert(certPath, keyPath string) Option {
|
|
||||||
return func(o *opts) {
|
|
||||||
o.nOpts = append(o.nOpts, nats.ClientCert(certPath, keyPath))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithRootCA(paths ...string) Option {
|
|
||||||
return func(o *opts) {
|
|
||||||
o.nOpts = append(o.nOpts, nats.RootCAs(paths...))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithTimeout(timeout time.Duration) Option {
|
|
||||||
return func(o *opts) {
|
|
||||||
o.nOpts = append(o.nOpts, nats.Timeout(timeout))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithConnectionName(name string) Option {
|
|
||||||
return func(o *opts) {
|
|
||||||
o.nOpts = append(o.nOpts, nats.Name(name))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLogger(logger *logger.Logger) Option {
|
|
||||||
return func(o *opts) {
|
|
||||||
o.log = logger
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,129 +0,0 @@
|
||||||
package nats
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Writer is a NATS object notification writer.
|
|
||||||
// It handles NATS JetStream connections and allows
|
|
||||||
// sending string representation of the address to
|
|
||||||
// the NATS server.
|
|
||||||
//
|
|
||||||
// For correct operation must be created via New function.
|
|
||||||
// new(Writer) or Writer{} construction leads to undefined
|
|
||||||
// behaviour and is not safe.
|
|
||||||
type Writer struct {
|
|
||||||
js nats.JetStreamContext
|
|
||||||
nc *nats.Conn
|
|
||||||
|
|
||||||
m sync.RWMutex
|
|
||||||
createdStreams map[string]struct{}
|
|
||||||
opts
|
|
||||||
}
|
|
||||||
|
|
||||||
type opts struct {
|
|
||||||
log *logger.Logger
|
|
||||||
nOpts []nats.Option
|
|
||||||
}
|
|
||||||
|
|
||||||
type Option func(*opts)
|
|
||||||
|
|
||||||
var errConnIsClosed = errors.New("connection to the server is closed")
|
|
||||||
|
|
||||||
// Notify sends object address's string representation to the provided topic.
|
|
||||||
// Uses first 4 bytes of object ID as a message ID to support 'exactly once'
|
|
||||||
// message delivery.
|
|
||||||
//
|
|
||||||
// Returns error only if:
|
|
||||||
// 1. underlying connection was closed and has not been established again;
|
|
||||||
// 2. NATS server could not respond that it has saved the message.
|
|
||||||
func (n *Writer) Notify(topic string, address oid.Address) error {
|
|
||||||
if !n.nc.IsConnected() {
|
|
||||||
return errConnIsClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
// use first 4 byte of the encoded string as
|
|
||||||
// message ID for the 'exactly once' delivery
|
|
||||||
messageID := address.Object().EncodeToString()[:4]
|
|
||||||
|
|
||||||
// check if the stream was previously created
|
|
||||||
n.m.RLock()
|
|
||||||
_, created := n.createdStreams[topic]
|
|
||||||
n.m.RUnlock()
|
|
||||||
|
|
||||||
if !created {
|
|
||||||
_, err := n.js.AddStream(&nats.StreamConfig{
|
|
||||||
Name: topic,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not add stream: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.m.Lock()
|
|
||||||
n.createdStreams[topic] = struct{}{}
|
|
||||||
n.m.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := n.js.Publish(topic, []byte(address.EncodeToString()), nats.MsgId(messageID))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates new Writer.
|
|
||||||
func New(oo ...Option) *Writer {
|
|
||||||
w := &Writer{
|
|
||||||
createdStreams: make(map[string]struct{}),
|
|
||||||
opts: opts{
|
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
|
||||||
nOpts: make([]nats.Option, 0, len(oo)+3),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, o := range oo {
|
|
||||||
o(&w.opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
w.opts.nOpts = append(w.opts.nOpts,
|
|
||||||
nats.NoCallbacksAfterClientClose(), // do not call callbacks when it was planned writer stop
|
|
||||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
|
||||||
w.log.Error(logs.NatsNatsConnectionWasLost, zap.Error(err))
|
|
||||||
}),
|
|
||||||
nats.ReconnectHandler(func(_ *nats.Conn) {
|
|
||||||
w.log.Warn(logs.NatsNatsReconnectedToTheServer)
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
return w
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect tries to connect to a specified NATS endpoint.
|
|
||||||
//
|
|
||||||
// Connection is closed when passed context is done.
|
|
||||||
func (n *Writer) Connect(ctx context.Context, endpoint string) error {
|
|
||||||
nc, err := nats.Connect(endpoint, n.opts.nOpts...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not connect to server: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.nc = nc
|
|
||||||
|
|
||||||
// usage w/o options is error-free
|
|
||||||
n.js, _ = nc.JetStream()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-ctx.Done()
|
|
||||||
n.opts.log.Info(logs.NatsNatsClosingConnectionAsTheContextIsDone)
|
|
||||||
|
|
||||||
nc.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,88 +0,0 @@
|
||||||
package notificator
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Prm groups Notificator constructor's
|
|
||||||
// parameters. All are required.
|
|
||||||
type Prm struct {
|
|
||||||
writer NotificationWriter
|
|
||||||
notificationSource NotificationSource
|
|
||||||
logger *logger.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLogger sets a logger.
|
|
||||||
func (prm *Prm) SetLogger(v *logger.Logger) *Prm {
|
|
||||||
prm.logger = v
|
|
||||||
return prm
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetWriter sets notification writer.
|
|
||||||
func (prm *Prm) SetWriter(v NotificationWriter) *Prm {
|
|
||||||
prm.writer = v
|
|
||||||
return prm
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetNotificationSource sets notification source.
|
|
||||||
func (prm *Prm) SetNotificationSource(v NotificationSource) *Prm {
|
|
||||||
prm.notificationSource = v
|
|
||||||
return prm
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notificator is a notification producer that handles
|
|
||||||
// objects with defined notification epoch.
|
|
||||||
//
|
|
||||||
// Working client must be created via constructor New.
|
|
||||||
// Using the Client that has been created with new(Client)
|
|
||||||
// expression (or just declaring a Client variable) is unsafe
|
|
||||||
// and can lead to panic.
|
|
||||||
type Notificator struct {
|
|
||||||
w NotificationWriter
|
|
||||||
ns NotificationSource
|
|
||||||
l *logger.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates, initializes and returns the Notificator instance.
|
|
||||||
//
|
|
||||||
// Panics if any field of the passed Prm structure is not set/set
|
|
||||||
// to nil.
|
|
||||||
func New(prm *Prm) *Notificator {
|
|
||||||
panicOnNil := func(v any, name string) {
|
|
||||||
if v == nil {
|
|
||||||
panic(fmt.Sprintf("Notificator constructor: %s is nil\n", name))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
panicOnNil(prm.writer, "NotificationWriter")
|
|
||||||
panicOnNil(prm.notificationSource, "NotificationSource")
|
|
||||||
panicOnNil(prm.logger, "Logger")
|
|
||||||
|
|
||||||
return &Notificator{
|
|
||||||
w: prm.writer,
|
|
||||||
ns: prm.notificationSource,
|
|
||||||
l: prm.logger,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProcessEpoch looks for all objects with defined epoch in the storage
|
|
||||||
// and passes their addresses to the NotificationWriter.
|
|
||||||
func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) {
|
|
||||||
logger := n.l.With(zap.Uint64("epoch", epoch))
|
|
||||||
logger.Debug(logs.NotificatorNotificatorStartProcessingObjectNotifications)
|
|
||||||
|
|
||||||
n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) {
|
|
||||||
n.l.Debug(logs.NotificatorNotificatorProcessingObjectNotification,
|
|
||||||
zap.String("topic", topic),
|
|
||||||
zap.Stringer("address", addr),
|
|
||||||
)
|
|
||||||
|
|
||||||
n.w.Notify(topic, addr)
|
|
||||||
})
|
|
||||||
}
|
|
Loading…
Reference in a new issue