diff --git a/cmd/frostfs-ir/defaults.go b/cmd/frostfs-ir/defaults.go index 127a68b29..f99500119 100644 --- a/cmd/frostfs-ir/defaults.go +++ b/cmd/frostfs-ir/defaults.go @@ -118,6 +118,7 @@ func setMainNetDefaults(cfg *viper.Viper) { cfg.SetDefault("mainnet.endpoint.client", []string{}) cfg.SetDefault("mainnet.dial_timeout", 15*time.Second) cfg.SetDefault("mainnet.switch_interval", 2*time.Minute) + cfg.SetDefault("mainnet.inactivity_timeout", 20*time.Minute) } func setMorphDefaults(cfg *viper.Viper) { @@ -125,4 +126,5 @@ func setMorphDefaults(cfg *viper.Viper) { cfg.SetDefault("morph.dial_timeout", 15*time.Second) cfg.SetDefault("morph.validators", []string{}) cfg.SetDefault("morph.switch_interval", 2*time.Minute) + cfg.SetDefault("morph.inactivity_timeout", 20*time.Minute) } diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 50219a8c7..3f0819300 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -558,6 +558,8 @@ type cfgMorph struct { cacheTTL time.Duration proxyScriptHash neogoutil.Uint160 + + inactivityTimeout time.Duration } type cfgAccounting struct { diff --git a/cmd/frostfs-node/config/morph/config.go b/cmd/frostfs-node/config/morph/config.go index 4ab608ef3..5d51d87af 100644 --- a/cmd/frostfs-node/config/morph/config.go +++ b/cmd/frostfs-node/config/morph/config.go @@ -25,6 +25,9 @@ const ( // SwitchIntervalDefault is a default Neo RPCs switch interval. SwitchIntervalDefault = 2 * time.Minute + + // InactivityTimeoutDefault is a default Neo RPCs inactivity timeout. + InactivityTimeoutDefault = 20 * time.Minute ) // RPCEndpoint returns list of the values of "rpc_endpoint" config parameter @@ -97,3 +100,16 @@ func SwitchInterval(c *config.Config) time.Duration { return SwitchIntervalDefault } + +// InactivityTimeout returns the value of "inactivity_timeout" config parameter +// from "morph" section. +// +// Returns MaxInactivityIntervalDefault if value is not positive duration. +func InactivityTimeout(c *config.Config) time.Duration { + res := config.DurationSafe(c.Sub(subsection), "inactivity_timeout") + if res != 0 { + return res + } + + return InactivityTimeoutDefault +} diff --git a/cmd/frostfs-node/config/morph/config_test.go b/cmd/frostfs-node/config/morph/config_test.go index 192140446..3e6b9d18a 100644 --- a/cmd/frostfs-node/config/morph/config_test.go +++ b/cmd/frostfs-node/config/morph/config_test.go @@ -19,13 +19,20 @@ func TestMorphSection(t *testing.T) { require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty)) require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty)) require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty)) + require.Equal(t, morphconfig.InactivityTimeoutDefault, morphconfig.InactivityTimeout(empty)) }) const path = "../../../../config/example/node" rpcs := []client.Endpoint{ - {"wss://rpc1.morph.frostfs.info:40341/ws", 1}, - {"wss://rpc2.morph.frostfs.info:40341/ws", 2}, + { + Address: "wss://rpc1.morph.frostfs.info:40341/ws", + Priority: 1, + }, + { + Address: "wss://rpc2.morph.frostfs.info:40341/ws", + Priority: 2, + }, } fileConfigTest := func(c *config.Config) { @@ -33,6 +40,7 @@ func TestMorphSection(t *testing.T) { require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c)) require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c)) require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c)) + require.Equal(t, 30*time.Minute, morphconfig.InactivityTimeout(c)) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index 698fb3b83..11563bf47 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -85,6 +85,12 @@ func initMorphComponents(ctx context.Context, c *cfg) { zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled), ) + initNetmapSource(c) + + c.cfgMorph.inactivityTimeout = morphconfig.InactivityTimeout(c.appCfg) +} + +func initNetmapSource(c *cfg) { wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary()) fatalOnErr(err) @@ -105,7 +111,6 @@ func initMorphComponents(ctx context.Context, c *cfg) { // use RPC node as source of netmap (with caching) netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap) } - c.netMapSource = netmapSource c.cfgNetmap.wrapper = wrap } @@ -194,11 +199,7 @@ func listenMorphNotifications(ctx context.Context, c *cfg) { c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error())) } - subs, err = subscriber.New(ctx, &subscriber.Params{ - Log: c.log, - StartFromBlock: fromSideChainBlock, - Client: c.cfgMorph.client, - }) + subs, err = subscriber.New(ctx, c.log, c.cfgMorph.client, fromSideChainBlock, c.cfgMorph.inactivityTimeout, c.internalErr) fatalOnErr(err) lis, err := event.NewListener(event.ListenerParams{ diff --git a/config/example/ir.env b/config/example/ir.env index 3f9530ab6..5116e71eb 100644 --- a/config/example/ir.env +++ b/config/example/ir.env @@ -11,11 +11,13 @@ FROSTFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws FROSTFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws" FROSTFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170" FROSTFS_IR_MORPH_SWITCH_INTERVAL=2m +FROSTFS_IR_MORPH_INACTIVITY_TIMEOUT=30m FROSTFS_IR_MAINNET_DIAL_TIMEOUT=5s FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws" FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws" FROSTFS_IR_MAINNET_SWITCH_INTERVAL=2m +FROSTFS_IR_MAINNET_INACTIVITY_TIMEOUT=30m FROSTFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6" FROSTFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090 diff --git a/config/example/ir.yaml b/config/example/ir.yaml index 401328e72..773ad30da 100644 --- a/config/example/ir.yaml +++ b/config/example/ir.yaml @@ -19,10 +19,12 @@ morph: validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup - 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170 switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node + inactivity_timeout: 30m # interval b/w events after which the node fails with error mainnet: dial_timeout: 5s # Timeout for RPC client connection to mainchain; ignore if mainchain is disabled switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node + inactivity_timeout: 30m # interval b/w morph events after which the node fails with error endpoint: client: # List of websocket RPC endpoints in mainchain; ignore if mainchain is disabled - address: wss://mainchain1.fs.neo.org:30333/ws diff --git a/config/example/node.env b/config/example/node.env index a1db0c876..821548bb3 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -66,6 +66,7 @@ FROSTFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f FROSTFS_MORPH_DIAL_TIMEOUT=30s FROSTFS_MORPH_CACHE_TTL=15s FROSTFS_MORPH_SWITCH_INTERVAL=3m +FROSTFS_MORPH_INACTIVITY_TIMEOUT=30m FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws" FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0 FROSTFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws" diff --git a/config/example/node.json b/config/example/node.json index a7d7a3651..0ebc6b43d 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -101,6 +101,7 @@ "dial_timeout": "30s", "cache_ttl": "15s", "switch_interval": "3m", + "inactivity_timeout": "30m", "rpc_endpoint": [ { "address": "wss://rpc1.morph.frostfs.info:40341/ws", diff --git a/config/example/node.yaml b/config/example/node.yaml index 678ee1a87..498a02095 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -89,6 +89,7 @@ morph: # Default value: block time. It is recommended to have this value less or equal to block time. # Cached entities: containers, container lists, eACL tables. switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node + inactivity_timeout: 30m # interval b/w morph events after which the node fails with error rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success - address: wss://rpc1.morph.frostfs.info:40341/ws priority: 0 diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go index f4d9b4169..f3c0df318 100644 --- a/pkg/innerring/initialization.go +++ b/pkg/innerring/initialization.go @@ -76,7 +76,6 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper, NodeStateSettings: netSettings, }) - if err != nil { return err } @@ -105,6 +104,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain * s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error())) } mainnetChain.from = fromMainChainBlock + mainnetChain.inactivityTimeout = cfg.GetDuration("mainnet.inactivity_timeout") // create mainnet client s.mainnetClient, err = createClient(ctx, mainnetChain, errChan) @@ -113,7 +113,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain * } // create mainnet listener - s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain) + s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain, errChan) return err } @@ -461,12 +461,13 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- } morphChain := &chainParams{ - log: s.log, - cfg: cfg, - key: s.key, - name: morphPrefix, - from: fromSideChainBlock, - morphCacheMetric: s.irMetrics.MorphCacheMetrics(), + log: s.log, + cfg: cfg, + key: s.key, + name: morphPrefix, + from: fromSideChainBlock, + morphCacheMetric: s.irMetrics.MorphCacheMetrics(), + inactivityTimeout: cfg.GetDuration("morph.inactivity_timeout"), } // create morph client @@ -476,7 +477,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- } // create morph listener - s.morphListener, err = createListener(ctx, s.morphClient, morphChain) + s.morphListener, err = createListener(ctx, s.morphClient, morphChain, errChan) if err != nil { return nil, err } diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 5d7dc5a52..96db8edda 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "sync/atomic" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" @@ -106,13 +107,14 @@ type ( } chainParams struct { - log *logger.Logger - cfg *viper.Viper - key *keys.PrivateKey - name string - sgn *transaction.Signer - from uint32 // block height - morphCacheMetric metrics.MorphCacheMetrics + log *logger.Logger + cfg *viper.Viper + key *keys.PrivateKey + name string + sgn *transaction.Signer + from uint32 // block height + morphCacheMetric metrics.MorphCacheMetrics + inactivityTimeout time.Duration } ) @@ -418,17 +420,13 @@ func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) { return false, nil } -func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { +func createListener(ctx context.Context, cli *client.Client, p *chainParams, errCh chan<- error) (event.Listener, error) { var ( sub subscriber.Subscriber err error ) - sub, err = subscriber.New(ctx, &subscriber.Params{ - Log: p.log, - StartFromBlock: p.from, - Client: cli, - }) + sub, err = subscriber.New(ctx, p.log, cli, p.from, p.inactivityTimeout, errCh) if err != nil { return nil, err } diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index ee5466a7d..f0964d64c 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -2,9 +2,9 @@ package subscriber import ( "context" - "errors" "fmt" "sync" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" @@ -55,13 +55,9 @@ type ( subscribedEvents map[util.Uint160]bool subscribedNotaryEvents map[util.Uint160]bool subscribedToNewBlocks bool - } - // Params is a group of Subscriber constructor parameters. - Params struct { - Log *logger.Logger - StartFromBlock uint32 - Client *client.Client + deadlockDuration time.Duration + deadlockErrCh chan<- error } ) @@ -73,14 +69,6 @@ func (s *subscriber) NotificationChannels() NotificationChannels { } } -var ( - errNilParams = errors.New("chain/subscriber: config was not provided to the constructor") - - errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor") - - errNilClient = errors.New("chain/subscriber: client was not provided to the constructor") -) - func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error { s.Lock() defer s.Unlock() @@ -146,24 +134,18 @@ func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error } // New is a constructs Neo:Morph event listener and returns Subscriber interface. -func New(ctx context.Context, p *Params) (Subscriber, error) { - switch { - case p == nil: - return nil, errNilParams - case p.Log == nil: - return nil, errNilLogger - case p.Client == nil: - return nil, errNilClient - } - - err := awaitHeight(p.Client, p.StartFromBlock) +func New(ctx context.Context, log *logger.Logger, + client *client.Client, startFromBlock uint32, + deadlockDuration time.Duration, deadlockErrCh chan<- error, +) (Subscriber, error) { + err := awaitHeight(client, startFromBlock) if err != nil { return nil, err } sub := &subscriber{ - log: p.Log, - client: p.Client, + log: log, + client: client, notifyChan: make(chan *state.ContainedNotificationEvent), blockChan: make(chan *block.Block), notaryChan: make(chan *result.NotaryRequestEvent), @@ -172,6 +154,9 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { subscribedEvents: make(map[util.Uint160]bool), subscribedNotaryEvents: make(map[util.Uint160]bool), + + deadlockDuration: deadlockDuration, + deadlockErrCh: deadlockErrCh, } // Worker listens all events from temporary NeoGo channel and puts them // into corresponding permanent channels. @@ -184,7 +169,13 @@ func (s *subscriber) routeNotifications(ctx context.Context) { var ( restoreCh = make(chan bool) restoreInProgress bool + deadlockTimer = time.NewTimer(s.deadlockDuration) ) + defer func() { + if !deadlockTimer.Stop() { + <-deadlockTimer.C + } + }() routeloop: for { @@ -196,6 +187,7 @@ routeloop: case <-ctx.Done(): break routeloop case ev, ok := <-curr.NotifyChan: + deadlockTimer.Reset(s.deadlockDuration) if ok { s.client.Metrics().IncNotificationCount("notify") s.notifyChan <- ev @@ -203,6 +195,7 @@ routeloop: connLost = true } case ev, ok := <-curr.BlockChan: + deadlockTimer.Reset(s.deadlockDuration) if ok { s.client.Metrics().IncNotificationCount("block") s.client.Metrics().SetLastBlock(ev.Index) @@ -211,6 +204,7 @@ routeloop: connLost = true } case ev, ok := <-curr.NotaryChan: + deadlockTimer.Reset(s.deadlockDuration) if ok { s.client.Metrics().IncNotificationCount("notary") s.notaryChan <- ev @@ -218,10 +212,13 @@ routeloop: connLost = true } case ok := <-restoreCh: + deadlockTimer.Reset(s.deadlockDuration) restoreInProgress = false if !ok { connLost = true } + case <-deadlockTimer.C: + s.deadlockErrCh <- fmt.Errorf("no morph events got for %s", s.deadlockDuration) } if connLost { if !restoreInProgress {