morph: Fail if there is no events #1015

Closed
dstepanov-yadro wants to merge 2 commits from dstepanov-yadro/frostfs-node:fix/morph_reconnect into master
13 changed files with 90 additions and 58 deletions

View file

@ -118,6 +118,7 @@ func setMainNetDefaults(cfg *viper.Viper) {
cfg.SetDefault("mainnet.endpoint.client", []string{})
cfg.SetDefault("mainnet.dial_timeout", 15*time.Second)
cfg.SetDefault("mainnet.switch_interval", 2*time.Minute)
cfg.SetDefault("mainnet.inactivity_timeout", 20*time.Minute)
}
func setMorphDefaults(cfg *viper.Viper) {
@ -125,4 +126,5 @@ func setMorphDefaults(cfg *viper.Viper) {
cfg.SetDefault("morph.dial_timeout", 15*time.Second)
cfg.SetDefault("morph.validators", []string{})
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
cfg.SetDefault("morph.inactivity_timeout", 20*time.Minute)
}

View file

@ -558,6 +558,8 @@ type cfgMorph struct {
cacheTTL time.Duration
proxyScriptHash neogoutil.Uint160
inactivityTimeout time.Duration
}
type cfgAccounting struct {

View file

@ -25,6 +25,9 @@ const (
// SwitchIntervalDefault is a default Neo RPCs switch interval.
SwitchIntervalDefault = 2 * time.Minute
// InactivityTimeoutDefault is a default Neo RPCs inactivity timeout.
InactivityTimeoutDefault = 20 * time.Minute
)
// RPCEndpoint returns list of the values of "rpc_endpoint" config parameter
@ -97,3 +100,16 @@ func SwitchInterval(c *config.Config) time.Duration {
return SwitchIntervalDefault
}
// InactivityTimeout returns the value of "inactivity_timeout" config parameter
// from "morph" section.
//
// Returns MaxInactivityIntervalDefault if value is not positive duration.
func InactivityTimeout(c *config.Config) time.Duration {
res := config.DurationSafe(c.Sub(subsection), "inactivity_timeout")
if res != 0 {
return res
}
return InactivityTimeoutDefault
}

View file

@ -19,13 +19,20 @@ func TestMorphSection(t *testing.T) {
require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty))
require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty))
require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty))
require.Equal(t, morphconfig.InactivityTimeoutDefault, morphconfig.InactivityTimeout(empty))
})
const path = "../../../../config/example/node"
rpcs := []client.Endpoint{
{"wss://rpc1.morph.frostfs.info:40341/ws", 1},
{"wss://rpc2.morph.frostfs.info:40341/ws", 2},
{
Address: "wss://rpc1.morph.frostfs.info:40341/ws",
Priority: 1,
},
{
Address: "wss://rpc2.morph.frostfs.info:40341/ws",
Priority: 2,
},
}
fileConfigTest := func(c *config.Config) {
@ -33,6 +40,7 @@ func TestMorphSection(t *testing.T) {
require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c))
require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c))
require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c))
require.Equal(t, 30*time.Minute, morphconfig.InactivityTimeout(c))
}
configtest.ForEachFileType(path, fileConfigTest)

View file

@ -85,6 +85,12 @@ func initMorphComponents(ctx context.Context, c *cfg) {
zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled),
)
initNetmapSource(c)
c.cfgMorph.inactivityTimeout = morphconfig.InactivityTimeout(c.appCfg)
}
func initNetmapSource(c *cfg) {
wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary())
fatalOnErr(err)
@ -105,7 +111,6 @@ func initMorphComponents(ctx context.Context, c *cfg) {
// use RPC node as source of netmap (with caching)
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
}
c.netMapSource = netmapSource
c.cfgNetmap.wrapper = wrap
}
@ -194,11 +199,7 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error()))
}
subs, err = subscriber.New(ctx, &subscriber.Params{
Log: c.log,
StartFromBlock: fromSideChainBlock,
Client: c.cfgMorph.client,
})
subs, err = subscriber.New(ctx, c.log, c.cfgMorph.client, fromSideChainBlock, c.cfgMorph.inactivityTimeout, c.internalErr)
fatalOnErr(err)
lis, err := event.NewListener(event.ListenerParams{

View file

@ -11,11 +11,13 @@ FROSTFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws
FROSTFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws"
FROSTFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170"
FROSTFS_IR_MORPH_SWITCH_INTERVAL=2m
FROSTFS_IR_MORPH_INACTIVITY_TIMEOUT=30m
FROSTFS_IR_MAINNET_DIAL_TIMEOUT=5s
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws"
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws"
FROSTFS_IR_MAINNET_SWITCH_INTERVAL=2m
FROSTFS_IR_MAINNET_INACTIVITY_TIMEOUT=30m
FROSTFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
FROSTFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090

View file

@ -19,10 +19,12 @@ morph:
validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup
- 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
inactivity_timeout: 30m # interval b/w events after which the node fails with error
mainnet:
dial_timeout: 5s # Timeout for RPC client connection to mainchain; ignore if mainchain is disabled
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
endpoint:
client: # List of websocket RPC endpoints in mainchain; ignore if mainchain is disabled
- address: wss://mainchain1.fs.neo.org:30333/ws

View file

@ -66,6 +66,7 @@ FROSTFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f
FROSTFS_MORPH_DIAL_TIMEOUT=30s
FROSTFS_MORPH_CACHE_TTL=15s
FROSTFS_MORPH_SWITCH_INTERVAL=3m
FROSTFS_MORPH_INACTIVITY_TIMEOUT=30m
FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws"
FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0
FROSTFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws"

View file

@ -101,6 +101,7 @@
"dial_timeout": "30s",
"cache_ttl": "15s",
"switch_interval": "3m",
"inactivity_timeout": "30m",
"rpc_endpoint": [
{
"address": "wss://rpc1.morph.frostfs.info:40341/ws",

View file

@ -89,6 +89,7 @@ morph:
# Default value: block time. It is recommended to have this value less or equal to block time.
# Cached entities: containers, container lists, eACL tables.
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
- address: wss://rpc1.morph.frostfs.info:40341/ws
priority: 0

View file

@ -76,7 +76,6 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
NodeStateSettings: netSettings,
})
if err != nil {
return err
}
@ -105,6 +104,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error()))
}
mainnetChain.from = fromMainChainBlock
mainnetChain.inactivityTimeout = cfg.GetDuration("mainnet.inactivity_timeout")
// create mainnet client
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
@ -113,7 +113,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
}
// create mainnet listener
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain)
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain, errChan)
return err
}
@ -461,12 +461,13 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
}
morphChain := &chainParams{
log: s.log,
cfg: cfg,
key: s.key,
name: morphPrefix,
from: fromSideChainBlock,
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
log: s.log,
cfg: cfg,
key: s.key,
name: morphPrefix,
from: fromSideChainBlock,
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
inactivityTimeout: cfg.GetDuration("morph.inactivity_timeout"),
}
// create morph client
@ -476,7 +477,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
}
// create morph listener
s.morphListener, err = createListener(ctx, s.morphClient, morphChain)
s.morphListener, err = createListener(ctx, s.morphClient, morphChain, errChan)
if err != nil {
return nil, err
}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
@ -106,13 +107,14 @@ type (
}
chainParams struct {
log *logger.Logger
cfg *viper.Viper
key *keys.PrivateKey
name string
sgn *transaction.Signer
from uint32 // block height
morphCacheMetric metrics.MorphCacheMetrics
log *logger.Logger
cfg *viper.Viper
key *keys.PrivateKey
name string
sgn *transaction.Signer
from uint32 // block height
morphCacheMetric metrics.MorphCacheMetrics
inactivityTimeout time.Duration
}
)
@ -418,17 +420,13 @@ func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
return false, nil
}
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
func createListener(ctx context.Context, cli *client.Client, p *chainParams, errCh chan<- error) (event.Listener, error) {
var (
sub subscriber.Subscriber
err error
)
sub, err = subscriber.New(ctx, &subscriber.Params{
Log: p.log,
StartFromBlock: p.from,
Client: cli,
})
sub, err = subscriber.New(ctx, p.log, cli, p.from, p.inactivityTimeout, errCh)
if err != nil {
return nil, err
}

View file

@ -2,9 +2,9 @@ package subscriber
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
@ -55,13 +55,9 @@ type (
subscribedEvents map[util.Uint160]bool
subscribedNotaryEvents map[util.Uint160]bool
subscribedToNewBlocks bool
}
// Params is a group of Subscriber constructor parameters.
Params struct {
Log *logger.Logger
StartFromBlock uint32
Client *client.Client
deadlockDuration time.Duration
deadlockErrCh chan<- error
}
)
@ -73,14 +69,6 @@ func (s *subscriber) NotificationChannels() NotificationChannels {
}
}
var (
errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")
errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor")
errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
)
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
s.Lock()
defer s.Unlock()
@ -146,24 +134,18 @@ func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
}
// New is a constructs Neo:Morph event listener and returns Subscriber interface.
func New(ctx context.Context, p *Params) (Subscriber, error) {
switch {
case p == nil:
return nil, errNilParams
case p.Log == nil:
return nil, errNilLogger
case p.Client == nil:
return nil, errNilClient
}
err := awaitHeight(p.Client, p.StartFromBlock)
func New(ctx context.Context, log *logger.Logger,
client *client.Client, startFromBlock uint32,
deadlockDuration time.Duration, deadlockErrCh chan<- error,
) (Subscriber, error) {
err := awaitHeight(client, startFromBlock)
if err != nil {
return nil, err
}
sub := &subscriber{
log: p.Log,
client: p.Client,
log: log,
client: client,
notifyChan: make(chan *state.ContainedNotificationEvent),
blockChan: make(chan *block.Block),
notaryChan: make(chan *result.NotaryRequestEvent),
@ -172,6 +154,9 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
subscribedEvents: make(map[util.Uint160]bool),
subscribedNotaryEvents: make(map[util.Uint160]bool),
deadlockDuration: deadlockDuration,
deadlockErrCh: deadlockErrCh,
}
// Worker listens all events from temporary NeoGo channel and puts them
// into corresponding permanent channels.
@ -184,7 +169,13 @@ func (s *subscriber) routeNotifications(ctx context.Context) {
var (
restoreCh = make(chan bool)
restoreInProgress bool
deadlockTimer = time.NewTimer(s.deadlockDuration)
)
defer func() {
if !deadlockTimer.Stop() {
<-deadlockTimer.C
}
}()
routeloop:
for {
@ -196,6 +187,7 @@ routeloop:
case <-ctx.Done():
break routeloop
case ev, ok := <-curr.NotifyChan:
deadlockTimer.Reset(s.deadlockDuration)
if ok {
s.client.Metrics().IncNotificationCount("notify")
s.notifyChan <- ev
@ -203,6 +195,7 @@ routeloop:
connLost = true
}
case ev, ok := <-curr.BlockChan:
deadlockTimer.Reset(s.deadlockDuration)
if ok {
s.client.Metrics().IncNotificationCount("block")
s.client.Metrics().SetLastBlock(ev.Index)
@ -211,6 +204,7 @@ routeloop:
connLost = true
}
case ev, ok := <-curr.NotaryChan:
deadlockTimer.Reset(s.deadlockDuration)
if ok {
s.client.Metrics().IncNotificationCount("notary")
s.notaryChan <- ev
@ -218,10 +212,13 @@ routeloop:
connLost = true
}
case ok := <-restoreCh:
deadlockTimer.Reset(s.deadlockDuration)
restoreInProgress = false
if !ok {
connLost = true
}
case <-deadlockTimer.C:
s.deadlockErrCh <- fmt.Errorf("no morph events got for %s", s.deadlockDuration)
}
if connLost {
if !restoreInProgress {