morph: Fail if there is no events #1015
13 changed files with 90 additions and 58 deletions
|
@ -118,6 +118,7 @@ func setMainNetDefaults(cfg *viper.Viper) {
|
|||
cfg.SetDefault("mainnet.endpoint.client", []string{})
|
||||
cfg.SetDefault("mainnet.dial_timeout", 15*time.Second)
|
||||
cfg.SetDefault("mainnet.switch_interval", 2*time.Minute)
|
||||
cfg.SetDefault("mainnet.inactivity_timeout", 20*time.Minute)
|
||||
}
|
||||
|
||||
func setMorphDefaults(cfg *viper.Viper) {
|
||||
|
@ -125,4 +126,5 @@ func setMorphDefaults(cfg *viper.Viper) {
|
|||
cfg.SetDefault("morph.dial_timeout", 15*time.Second)
|
||||
cfg.SetDefault("morph.validators", []string{})
|
||||
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
|
||||
cfg.SetDefault("morph.inactivity_timeout", 20*time.Minute)
|
||||
}
|
||||
|
|
|
@ -558,6 +558,8 @@ type cfgMorph struct {
|
|||
cacheTTL time.Duration
|
||||
|
||||
proxyScriptHash neogoutil.Uint160
|
||||
|
||||
inactivityTimeout time.Duration
|
||||
}
|
||||
|
||||
type cfgAccounting struct {
|
||||
|
|
|
@ -25,6 +25,9 @@ const (
|
|||
|
||||
// SwitchIntervalDefault is a default Neo RPCs switch interval.
|
||||
SwitchIntervalDefault = 2 * time.Minute
|
||||
|
||||
// InactivityTimeoutDefault is a default Neo RPCs inactivity timeout.
|
||||
InactivityTimeoutDefault = 20 * time.Minute
|
||||
)
|
||||
|
||||
// RPCEndpoint returns list of the values of "rpc_endpoint" config parameter
|
||||
|
@ -97,3 +100,16 @@ func SwitchInterval(c *config.Config) time.Duration {
|
|||
|
||||
return SwitchIntervalDefault
|
||||
}
|
||||
|
||||
// InactivityTimeout returns the value of "inactivity_timeout" config parameter
|
||||
// from "morph" section.
|
||||
//
|
||||
// Returns MaxInactivityIntervalDefault if value is not positive duration.
|
||||
func InactivityTimeout(c *config.Config) time.Duration {
|
||||
res := config.DurationSafe(c.Sub(subsection), "inactivity_timeout")
|
||||
if res != 0 {
|
||||
return res
|
||||
}
|
||||
|
||||
return InactivityTimeoutDefault
|
||||
}
|
||||
|
|
|
@ -19,13 +19,20 @@ func TestMorphSection(t *testing.T) {
|
|||
require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty))
|
||||
require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty))
|
||||
require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty))
|
||||
require.Equal(t, morphconfig.InactivityTimeoutDefault, morphconfig.InactivityTimeout(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
||||
rpcs := []client.Endpoint{
|
||||
{"wss://rpc1.morph.frostfs.info:40341/ws", 1},
|
||||
{"wss://rpc2.morph.frostfs.info:40341/ws", 2},
|
||||
{
|
||||
Address: "wss://rpc1.morph.frostfs.info:40341/ws",
|
||||
Priority: 1,
|
||||
},
|
||||
{
|
||||
Address: "wss://rpc2.morph.frostfs.info:40341/ws",
|
||||
Priority: 2,
|
||||
},
|
||||
}
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
|
@ -33,6 +40,7 @@ func TestMorphSection(t *testing.T) {
|
|||
require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c))
|
||||
require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c))
|
||||
require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c))
|
||||
require.Equal(t, 30*time.Minute, morphconfig.InactivityTimeout(c))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
|
|
@ -85,6 +85,12 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
|||
zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled),
|
||||
)
|
||||
|
||||
initNetmapSource(c)
|
||||
|
||||
c.cfgMorph.inactivityTimeout = morphconfig.InactivityTimeout(c.appCfg)
|
||||
}
|
||||
|
||||
func initNetmapSource(c *cfg) {
|
||||
wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary())
|
||||
fatalOnErr(err)
|
||||
|
||||
|
@ -105,7 +111,6 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
|||
// use RPC node as source of netmap (with caching)
|
||||
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
|
||||
}
|
||||
|
||||
c.netMapSource = netmapSource
|
||||
c.cfgNetmap.wrapper = wrap
|
||||
}
|
||||
|
@ -194,11 +199,7 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
|
|||
c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
subs, err = subscriber.New(ctx, &subscriber.Params{
|
||||
Log: c.log,
|
||||
StartFromBlock: fromSideChainBlock,
|
||||
Client: c.cfgMorph.client,
|
||||
})
|
||||
subs, err = subscriber.New(ctx, c.log, c.cfgMorph.client, fromSideChainBlock, c.cfgMorph.inactivityTimeout, c.internalErr)
|
||||
fatalOnErr(err)
|
||||
|
||||
lis, err := event.NewListener(event.ListenerParams{
|
||||
|
|
|
@ -11,11 +11,13 @@ FROSTFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws
|
|||
FROSTFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws"
|
||||
FROSTFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170"
|
||||
FROSTFS_IR_MORPH_SWITCH_INTERVAL=2m
|
||||
FROSTFS_IR_MORPH_INACTIVITY_TIMEOUT=30m
|
||||
|
||||
FROSTFS_IR_MAINNET_DIAL_TIMEOUT=5s
|
||||
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws"
|
||||
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws"
|
||||
FROSTFS_IR_MAINNET_SWITCH_INTERVAL=2m
|
||||
FROSTFS_IR_MAINNET_INACTIVITY_TIMEOUT=30m
|
||||
|
||||
FROSTFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||
FROSTFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090
|
||||
|
|
|
@ -19,10 +19,12 @@ morph:
|
|||
validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup
|
||||
- 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170
|
||||
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
|
||||
inactivity_timeout: 30m # interval b/w events after which the node fails with error
|
||||
|
||||
mainnet:
|
||||
dial_timeout: 5s # Timeout for RPC client connection to mainchain; ignore if mainchain is disabled
|
||||
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
|
||||
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
|
||||
endpoint:
|
||||
client: # List of websocket RPC endpoints in mainchain; ignore if mainchain is disabled
|
||||
- address: wss://mainchain1.fs.neo.org:30333/ws
|
||||
|
|
|
@ -66,6 +66,7 @@ FROSTFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f
|
|||
FROSTFS_MORPH_DIAL_TIMEOUT=30s
|
||||
FROSTFS_MORPH_CACHE_TTL=15s
|
||||
FROSTFS_MORPH_SWITCH_INTERVAL=3m
|
||||
FROSTFS_MORPH_INACTIVITY_TIMEOUT=30m
|
||||
FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws"
|
||||
FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0
|
||||
FROSTFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws"
|
||||
|
|
|
@ -101,6 +101,7 @@
|
|||
"dial_timeout": "30s",
|
||||
"cache_ttl": "15s",
|
||||
"switch_interval": "3m",
|
||||
"inactivity_timeout": "30m",
|
||||
"rpc_endpoint": [
|
||||
{
|
||||
"address": "wss://rpc1.morph.frostfs.info:40341/ws",
|
||||
|
|
|
@ -89,6 +89,7 @@ morph:
|
|||
# Default value: block time. It is recommended to have this value less or equal to block time.
|
||||
# Cached entities: containers, container lists, eACL tables.
|
||||
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
|
||||
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
|
||||
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
|
||||
- address: wss://rpc1.morph.frostfs.info:40341/ws
|
||||
priority: 0
|
||||
|
|
|
@ -76,7 +76,6 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
|
|||
|
||||
NodeStateSettings: netSettings,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -105,6 +104,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
|
|||
s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error()))
|
||||
}
|
||||
mainnetChain.from = fromMainChainBlock
|
||||
mainnetChain.inactivityTimeout = cfg.GetDuration("mainnet.inactivity_timeout")
|
||||
|
||||
// create mainnet client
|
||||
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
||||
|
@ -113,7 +113,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
|
|||
}
|
||||
|
||||
// create mainnet listener
|
||||
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain)
|
||||
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain, errChan)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -461,12 +461,13 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
|||
}
|
||||
|
||||
morphChain := &chainParams{
|
||||
log: s.log,
|
||||
cfg: cfg,
|
||||
key: s.key,
|
||||
name: morphPrefix,
|
||||
from: fromSideChainBlock,
|
||||
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
||||
log: s.log,
|
||||
cfg: cfg,
|
||||
key: s.key,
|
||||
name: morphPrefix,
|
||||
from: fromSideChainBlock,
|
||||
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
||||
inactivityTimeout: cfg.GetDuration("morph.inactivity_timeout"),
|
||||
}
|
||||
|
||||
// create morph client
|
||||
|
@ -476,7 +477,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
|||
}
|
||||
|
||||
// create morph listener
|
||||
s.morphListener, err = createListener(ctx, s.morphClient, morphChain)
|
||||
s.morphListener, err = createListener(ctx, s.morphClient, morphChain, errChan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||
|
@ -106,13 +107,14 @@ type (
|
|||
}
|
||||
|
||||
chainParams struct {
|
||||
log *logger.Logger
|
||||
cfg *viper.Viper
|
||||
key *keys.PrivateKey
|
||||
name string
|
||||
sgn *transaction.Signer
|
||||
from uint32 // block height
|
||||
morphCacheMetric metrics.MorphCacheMetrics
|
||||
log *logger.Logger
|
||||
cfg *viper.Viper
|
||||
key *keys.PrivateKey
|
||||
name string
|
||||
sgn *transaction.Signer
|
||||
from uint32 // block height
|
||||
morphCacheMetric metrics.MorphCacheMetrics
|
||||
inactivityTimeout time.Duration
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -418,17 +420,13 @@ func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
|
||||
func createListener(ctx context.Context, cli *client.Client, p *chainParams, errCh chan<- error) (event.Listener, error) {
|
||||
var (
|
||||
sub subscriber.Subscriber
|
||||
err error
|
||||
)
|
||||
|
||||
sub, err = subscriber.New(ctx, &subscriber.Params{
|
||||
Log: p.log,
|
||||
StartFromBlock: p.from,
|
||||
Client: cli,
|
||||
})
|
||||
sub, err = subscriber.New(ctx, p.log, cli, p.from, p.inactivityTimeout, errCh)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -2,9 +2,9 @@ package subscriber
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
|
@ -55,13 +55,9 @@ type (
|
|||
subscribedEvents map[util.Uint160]bool
|
||||
subscribedNotaryEvents map[util.Uint160]bool
|
||||
subscribedToNewBlocks bool
|
||||
}
|
||||
|
||||
// Params is a group of Subscriber constructor parameters.
|
||||
Params struct {
|
||||
Log *logger.Logger
|
||||
StartFromBlock uint32
|
||||
Client *client.Client
|
||||
deadlockDuration time.Duration
|
||||
deadlockErrCh chan<- error
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -73,14 +69,6 @@ func (s *subscriber) NotificationChannels() NotificationChannels {
|
|||
}
|
||||
}
|
||||
|
||||
var (
|
||||
errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")
|
||||
|
||||
errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor")
|
||||
|
||||
errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
|
||||
)
|
||||
|
||||
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
@ -146,24 +134,18 @@ func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
|
|||
}
|
||||
|
||||
// New is a constructs Neo:Morph event listener and returns Subscriber interface.
|
||||
func New(ctx context.Context, p *Params) (Subscriber, error) {
|
||||
switch {
|
||||
case p == nil:
|
||||
return nil, errNilParams
|
||||
case p.Log == nil:
|
||||
return nil, errNilLogger
|
||||
case p.Client == nil:
|
||||
return nil, errNilClient
|
||||
}
|
||||
|
||||
err := awaitHeight(p.Client, p.StartFromBlock)
|
||||
func New(ctx context.Context, log *logger.Logger,
|
||||
client *client.Client, startFromBlock uint32,
|
||||
deadlockDuration time.Duration, deadlockErrCh chan<- error,
|
||||
) (Subscriber, error) {
|
||||
err := awaitHeight(client, startFromBlock)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub := &subscriber{
|
||||
log: p.Log,
|
||||
client: p.Client,
|
||||
log: log,
|
||||
client: client,
|
||||
notifyChan: make(chan *state.ContainedNotificationEvent),
|
||||
blockChan: make(chan *block.Block),
|
||||
notaryChan: make(chan *result.NotaryRequestEvent),
|
||||
|
@ -172,6 +154,9 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
|
|||
|
||||
subscribedEvents: make(map[util.Uint160]bool),
|
||||
subscribedNotaryEvents: make(map[util.Uint160]bool),
|
||||
|
||||
deadlockDuration: deadlockDuration,
|
||||
deadlockErrCh: deadlockErrCh,
|
||||
}
|
||||
// Worker listens all events from temporary NeoGo channel and puts them
|
||||
// into corresponding permanent channels.
|
||||
|
@ -184,7 +169,13 @@ func (s *subscriber) routeNotifications(ctx context.Context) {
|
|||
var (
|
||||
restoreCh = make(chan bool)
|
||||
restoreInProgress bool
|
||||
deadlockTimer = time.NewTimer(s.deadlockDuration)
|
||||
)
|
||||
defer func() {
|
||||
if !deadlockTimer.Stop() {
|
||||
<-deadlockTimer.C
|
||||
}
|
||||
}()
|
||||
|
||||
routeloop:
|
||||
for {
|
||||
|
@ -196,6 +187,7 @@ routeloop:
|
|||
case <-ctx.Done():
|
||||
break routeloop
|
||||
case ev, ok := <-curr.NotifyChan:
|
||||
deadlockTimer.Reset(s.deadlockDuration)
|
||||
if ok {
|
||||
s.client.Metrics().IncNotificationCount("notify")
|
||||
s.notifyChan <- ev
|
||||
|
@ -203,6 +195,7 @@ routeloop:
|
|||
connLost = true
|
||||
}
|
||||
case ev, ok := <-curr.BlockChan:
|
||||
deadlockTimer.Reset(s.deadlockDuration)
|
||||
if ok {
|
||||
s.client.Metrics().IncNotificationCount("block")
|
||||
s.client.Metrics().SetLastBlock(ev.Index)
|
||||
|
@ -211,6 +204,7 @@ routeloop:
|
|||
connLost = true
|
||||
}
|
||||
case ev, ok := <-curr.NotaryChan:
|
||||
deadlockTimer.Reset(s.deadlockDuration)
|
||||
if ok {
|
||||
s.client.Metrics().IncNotificationCount("notary")
|
||||
s.notaryChan <- ev
|
||||
|
@ -218,10 +212,13 @@ routeloop:
|
|||
connLost = true
|
||||
}
|
||||
case ok := <-restoreCh:
|
||||
deadlockTimer.Reset(s.deadlockDuration)
|
||||
restoreInProgress = false
|
||||
if !ok {
|
||||
connLost = true
|
||||
}
|
||||
case <-deadlockTimer.C:
|
||||
s.deadlockErrCh <- fmt.Errorf("no morph events got for %s", s.deadlockDuration)
|
||||
}
|
||||
if connLost {
|
||||
if !restoreInProgress {
|
||||
|
|
Loading…
Reference in a new issue