morph: Fail if there is no events #1015

Closed
dstepanov-yadro wants to merge 2 commits from dstepanov-yadro/frostfs-node:fix/morph_reconnect into master
13 changed files with 90 additions and 58 deletions

View file

@ -118,6 +118,7 @@ func setMainNetDefaults(cfg *viper.Viper) {
cfg.SetDefault("mainnet.endpoint.client", []string{}) cfg.SetDefault("mainnet.endpoint.client", []string{})
cfg.SetDefault("mainnet.dial_timeout", 15*time.Second) cfg.SetDefault("mainnet.dial_timeout", 15*time.Second)
cfg.SetDefault("mainnet.switch_interval", 2*time.Minute) cfg.SetDefault("mainnet.switch_interval", 2*time.Minute)
cfg.SetDefault("mainnet.inactivity_timeout", 20*time.Minute)
} }
func setMorphDefaults(cfg *viper.Viper) { func setMorphDefaults(cfg *viper.Viper) {
@ -125,4 +126,5 @@ func setMorphDefaults(cfg *viper.Viper) {
cfg.SetDefault("morph.dial_timeout", 15*time.Second) cfg.SetDefault("morph.dial_timeout", 15*time.Second)
cfg.SetDefault("morph.validators", []string{}) cfg.SetDefault("morph.validators", []string{})
cfg.SetDefault("morph.switch_interval", 2*time.Minute) cfg.SetDefault("morph.switch_interval", 2*time.Minute)
cfg.SetDefault("morph.inactivity_timeout", 20*time.Minute)
} }

View file

@ -558,6 +558,8 @@ type cfgMorph struct {
cacheTTL time.Duration cacheTTL time.Duration
proxyScriptHash neogoutil.Uint160 proxyScriptHash neogoutil.Uint160
inactivityTimeout time.Duration
} }
type cfgAccounting struct { type cfgAccounting struct {

View file

@ -25,6 +25,9 @@ const (
// SwitchIntervalDefault is a default Neo RPCs switch interval. // SwitchIntervalDefault is a default Neo RPCs switch interval.
SwitchIntervalDefault = 2 * time.Minute SwitchIntervalDefault = 2 * time.Minute
// InactivityTimeoutDefault is a default Neo RPCs inactivity timeout.
InactivityTimeoutDefault = 20 * time.Minute
) )
// RPCEndpoint returns list of the values of "rpc_endpoint" config parameter // RPCEndpoint returns list of the values of "rpc_endpoint" config parameter
@ -97,3 +100,16 @@ func SwitchInterval(c *config.Config) time.Duration {
return SwitchIntervalDefault return SwitchIntervalDefault
} }
// InactivityTimeout returns the value of "inactivity_timeout" config parameter
// from "morph" section.
//
// Returns MaxInactivityIntervalDefault if value is not positive duration.
func InactivityTimeout(c *config.Config) time.Duration {
res := config.DurationSafe(c.Sub(subsection), "inactivity_timeout")
if res != 0 {
return res
}
return InactivityTimeoutDefault
}

View file

@ -19,13 +19,20 @@ func TestMorphSection(t *testing.T) {
require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty)) require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty))
require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty)) require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty))
require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty)) require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty))
require.Equal(t, morphconfig.InactivityTimeoutDefault, morphconfig.InactivityTimeout(empty))
}) })
const path = "../../../../config/example/node" const path = "../../../../config/example/node"
rpcs := []client.Endpoint{ rpcs := []client.Endpoint{
{"wss://rpc1.morph.frostfs.info:40341/ws", 1}, {
{"wss://rpc2.morph.frostfs.info:40341/ws", 2}, Address: "wss://rpc1.morph.frostfs.info:40341/ws",
Priority: 1,
},
{
Address: "wss://rpc2.morph.frostfs.info:40341/ws",
Priority: 2,
},
} }
fileConfigTest := func(c *config.Config) { fileConfigTest := func(c *config.Config) {
@ -33,6 +40,7 @@ func TestMorphSection(t *testing.T) {
require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c)) require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c))
require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c)) require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c))
require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c)) require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c))
require.Equal(t, 30*time.Minute, morphconfig.InactivityTimeout(c))
} }
configtest.ForEachFileType(path, fileConfigTest) configtest.ForEachFileType(path, fileConfigTest)

View file

@ -85,6 +85,12 @@ func initMorphComponents(ctx context.Context, c *cfg) {
zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled), zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled),
) )
initNetmapSource(c)
c.cfgMorph.inactivityTimeout = morphconfig.InactivityTimeout(c.appCfg)
}
func initNetmapSource(c *cfg) {
wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary()) wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary())
fatalOnErr(err) fatalOnErr(err)
@ -105,7 +111,6 @@ func initMorphComponents(ctx context.Context, c *cfg) {
// use RPC node as source of netmap (with caching) // use RPC node as source of netmap (with caching)
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap) netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
} }
c.netMapSource = netmapSource c.netMapSource = netmapSource
c.cfgNetmap.wrapper = wrap c.cfgNetmap.wrapper = wrap
} }
@ -194,11 +199,7 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error())) c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error()))
} }
subs, err = subscriber.New(ctx, &subscriber.Params{ subs, err = subscriber.New(ctx, c.log, c.cfgMorph.client, fromSideChainBlock, c.cfgMorph.inactivityTimeout, c.internalErr)
Log: c.log,
StartFromBlock: fromSideChainBlock,
Client: c.cfgMorph.client,
})
fatalOnErr(err) fatalOnErr(err)
lis, err := event.NewListener(event.ListenerParams{ lis, err := event.NewListener(event.ListenerParams{

View file

@ -11,11 +11,13 @@ FROSTFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws
FROSTFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws" FROSTFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws"
FROSTFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170" FROSTFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170"
FROSTFS_IR_MORPH_SWITCH_INTERVAL=2m FROSTFS_IR_MORPH_SWITCH_INTERVAL=2m
FROSTFS_IR_MORPH_INACTIVITY_TIMEOUT=30m
FROSTFS_IR_MAINNET_DIAL_TIMEOUT=5s FROSTFS_IR_MAINNET_DIAL_TIMEOUT=5s
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws" FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws"
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws" FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws"
FROSTFS_IR_MAINNET_SWITCH_INTERVAL=2m FROSTFS_IR_MAINNET_SWITCH_INTERVAL=2m
FROSTFS_IR_MAINNET_INACTIVITY_TIMEOUT=30m
FROSTFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6" FROSTFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
FROSTFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090 FROSTFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090

View file

@ -19,10 +19,12 @@ morph:
validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup
- 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170 - 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
inactivity_timeout: 30m # interval b/w events after which the node fails with error
mainnet: mainnet:
dial_timeout: 5s # Timeout for RPC client connection to mainchain; ignore if mainchain is disabled dial_timeout: 5s # Timeout for RPC client connection to mainchain; ignore if mainchain is disabled
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
endpoint: endpoint:
client: # List of websocket RPC endpoints in mainchain; ignore if mainchain is disabled client: # List of websocket RPC endpoints in mainchain; ignore if mainchain is disabled
- address: wss://mainchain1.fs.neo.org:30333/ws - address: wss://mainchain1.fs.neo.org:30333/ws

View file

@ -66,6 +66,7 @@ FROSTFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f
FROSTFS_MORPH_DIAL_TIMEOUT=30s FROSTFS_MORPH_DIAL_TIMEOUT=30s
FROSTFS_MORPH_CACHE_TTL=15s FROSTFS_MORPH_CACHE_TTL=15s
FROSTFS_MORPH_SWITCH_INTERVAL=3m FROSTFS_MORPH_SWITCH_INTERVAL=3m
FROSTFS_MORPH_INACTIVITY_TIMEOUT=30m
FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws" FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws"
FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0 FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0
FROSTFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws" FROSTFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws"

View file

@ -101,6 +101,7 @@
"dial_timeout": "30s", "dial_timeout": "30s",
"cache_ttl": "15s", "cache_ttl": "15s",
"switch_interval": "3m", "switch_interval": "3m",
"inactivity_timeout": "30m",
"rpc_endpoint": [ "rpc_endpoint": [
{ {
"address": "wss://rpc1.morph.frostfs.info:40341/ws", "address": "wss://rpc1.morph.frostfs.info:40341/ws",

View file

@ -89,6 +89,7 @@ morph:
# Default value: block time. It is recommended to have this value less or equal to block time. # Default value: block time. It is recommended to have this value less or equal to block time.
# Cached entities: containers, container lists, eACL tables. # Cached entities: containers, container lists, eACL tables.
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
- address: wss://rpc1.morph.frostfs.info:40341/ws - address: wss://rpc1.morph.frostfs.info:40341/ws
priority: 0 priority: 0

View file

@ -76,7 +76,6 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
NodeStateSettings: netSettings, NodeStateSettings: netSettings,
}) })
if err != nil { if err != nil {
return err return err
} }
@ -105,6 +104,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error())) s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error()))
} }
mainnetChain.from = fromMainChainBlock mainnetChain.from = fromMainChainBlock
mainnetChain.inactivityTimeout = cfg.GetDuration("mainnet.inactivity_timeout")
// create mainnet client // create mainnet client
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan) s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
@ -113,7 +113,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
} }
// create mainnet listener // create mainnet listener
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain) s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain, errChan)
return err return err
} }
@ -461,12 +461,13 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
} }
morphChain := &chainParams{ morphChain := &chainParams{
log: s.log, log: s.log,
cfg: cfg, cfg: cfg,
key: s.key, key: s.key,
name: morphPrefix, name: morphPrefix,
from: fromSideChainBlock, from: fromSideChainBlock,
morphCacheMetric: s.irMetrics.MorphCacheMetrics(), morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
inactivityTimeout: cfg.GetDuration("morph.inactivity_timeout"),
} }
// create morph client // create morph client
@ -476,7 +477,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
} }
// create morph listener // create morph listener
s.morphListener, err = createListener(ctx, s.morphClient, morphChain) s.morphListener, err = createListener(ctx, s.morphClient, morphChain, errChan)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"sync/atomic" "sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
@ -106,13 +107,14 @@ type (
} }
chainParams struct { chainParams struct {
log *logger.Logger log *logger.Logger
cfg *viper.Viper cfg *viper.Viper
key *keys.PrivateKey key *keys.PrivateKey
name string name string
sgn *transaction.Signer sgn *transaction.Signer
from uint32 // block height from uint32 // block height
morphCacheMetric metrics.MorphCacheMetrics morphCacheMetric metrics.MorphCacheMetrics
inactivityTimeout time.Duration
} }
) )
@ -418,17 +420,13 @@ func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
return false, nil return false, nil
} }
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { func createListener(ctx context.Context, cli *client.Client, p *chainParams, errCh chan<- error) (event.Listener, error) {
var ( var (
sub subscriber.Subscriber sub subscriber.Subscriber
err error err error
) )
sub, err = subscriber.New(ctx, &subscriber.Params{ sub, err = subscriber.New(ctx, p.log, cli, p.from, p.inactivityTimeout, errCh)
Log: p.log,
StartFromBlock: p.from,
Client: cli,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -2,9 +2,9 @@ package subscriber
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync" "sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
@ -55,13 +55,9 @@ type (
subscribedEvents map[util.Uint160]bool subscribedEvents map[util.Uint160]bool
subscribedNotaryEvents map[util.Uint160]bool subscribedNotaryEvents map[util.Uint160]bool
subscribedToNewBlocks bool subscribedToNewBlocks bool
}
// Params is a group of Subscriber constructor parameters. deadlockDuration time.Duration
Params struct { deadlockErrCh chan<- error
Log *logger.Logger
StartFromBlock uint32
Client *client.Client
} }
) )
@ -73,14 +69,6 @@ func (s *subscriber) NotificationChannels() NotificationChannels {
} }
} }
var (
errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")
errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor")
errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
)
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error { func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -146,24 +134,18 @@ func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
} }
// New is a constructs Neo:Morph event listener and returns Subscriber interface. // New is a constructs Neo:Morph event listener and returns Subscriber interface.
func New(ctx context.Context, p *Params) (Subscriber, error) { func New(ctx context.Context, log *logger.Logger,
switch { client *client.Client, startFromBlock uint32,
case p == nil: deadlockDuration time.Duration, deadlockErrCh chan<- error,
return nil, errNilParams ) (Subscriber, error) {
case p.Log == nil: err := awaitHeight(client, startFromBlock)
return nil, errNilLogger
case p.Client == nil:
return nil, errNilClient
}
err := awaitHeight(p.Client, p.StartFromBlock)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sub := &subscriber{ sub := &subscriber{
log: p.Log, log: log,
client: p.Client, client: client,
notifyChan: make(chan *state.ContainedNotificationEvent), notifyChan: make(chan *state.ContainedNotificationEvent),
blockChan: make(chan *block.Block), blockChan: make(chan *block.Block),
notaryChan: make(chan *result.NotaryRequestEvent), notaryChan: make(chan *result.NotaryRequestEvent),
@ -172,6 +154,9 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
subscribedEvents: make(map[util.Uint160]bool), subscribedEvents: make(map[util.Uint160]bool),
subscribedNotaryEvents: make(map[util.Uint160]bool), subscribedNotaryEvents: make(map[util.Uint160]bool),
deadlockDuration: deadlockDuration,
deadlockErrCh: deadlockErrCh,
} }
// Worker listens all events from temporary NeoGo channel and puts them // Worker listens all events from temporary NeoGo channel and puts them
// into corresponding permanent channels. // into corresponding permanent channels.
@ -184,7 +169,13 @@ func (s *subscriber) routeNotifications(ctx context.Context) {
var ( var (
restoreCh = make(chan bool) restoreCh = make(chan bool)
restoreInProgress bool restoreInProgress bool
deadlockTimer = time.NewTimer(s.deadlockDuration)
) )
defer func() {
if !deadlockTimer.Stop() {
<-deadlockTimer.C
}
}()
routeloop: routeloop:
for { for {
@ -196,6 +187,7 @@ routeloop:
case <-ctx.Done(): case <-ctx.Done():
break routeloop break routeloop
case ev, ok := <-curr.NotifyChan: case ev, ok := <-curr.NotifyChan:
deadlockTimer.Reset(s.deadlockDuration)
if ok { if ok {
s.client.Metrics().IncNotificationCount("notify") s.client.Metrics().IncNotificationCount("notify")
s.notifyChan <- ev s.notifyChan <- ev
@ -203,6 +195,7 @@ routeloop:
connLost = true connLost = true
} }
case ev, ok := <-curr.BlockChan: case ev, ok := <-curr.BlockChan:
deadlockTimer.Reset(s.deadlockDuration)
if ok { if ok {
s.client.Metrics().IncNotificationCount("block") s.client.Metrics().IncNotificationCount("block")
s.client.Metrics().SetLastBlock(ev.Index) s.client.Metrics().SetLastBlock(ev.Index)
@ -211,6 +204,7 @@ routeloop:
connLost = true connLost = true
} }
case ev, ok := <-curr.NotaryChan: case ev, ok := <-curr.NotaryChan:
deadlockTimer.Reset(s.deadlockDuration)
if ok { if ok {
s.client.Metrics().IncNotificationCount("notary") s.client.Metrics().IncNotificationCount("notary")
s.notaryChan <- ev s.notaryChan <- ev
@ -218,10 +212,13 @@ routeloop:
connLost = true connLost = true
} }
case ok := <-restoreCh: case ok := <-restoreCh:
deadlockTimer.Reset(s.deadlockDuration)
restoreInProgress = false restoreInProgress = false
if !ok { if !ok {
connLost = true connLost = true
} }
case <-deadlockTimer.C:
s.deadlockErrCh <- fmt.Errorf("no morph events got for %s", s.deadlockDuration)
} }
if connLost { if connLost {
if !restoreInProgress { if !restoreInProgress {