morph: Fail if there is no events #1015
13 changed files with 86 additions and 57 deletions
|
@ -118,6 +118,7 @@ func setMainNetDefaults(cfg *viper.Viper) {
|
||||||
cfg.SetDefault("mainnet.endpoint.client", []string{})
|
cfg.SetDefault("mainnet.endpoint.client", []string{})
|
||||||
cfg.SetDefault("mainnet.dial_timeout", 15*time.Second)
|
cfg.SetDefault("mainnet.dial_timeout", 15*time.Second)
|
||||||
cfg.SetDefault("mainnet.switch_interval", 2*time.Minute)
|
cfg.SetDefault("mainnet.switch_interval", 2*time.Minute)
|
||||||
|
cfg.SetDefault("mainnet.inactivity_timeout", 20*time.Minute)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setMorphDefaults(cfg *viper.Viper) {
|
func setMorphDefaults(cfg *viper.Viper) {
|
||||||
|
@ -125,4 +126,5 @@ func setMorphDefaults(cfg *viper.Viper) {
|
||||||
cfg.SetDefault("morph.dial_timeout", 15*time.Second)
|
cfg.SetDefault("morph.dial_timeout", 15*time.Second)
|
||||||
cfg.SetDefault("morph.validators", []string{})
|
cfg.SetDefault("morph.validators", []string{})
|
||||||
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
|
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
|
||||||
|
cfg.SetDefault("morph.inactivity_timeout", 20*time.Minute)
|
||||||
}
|
}
|
||||||
|
|
|
@ -558,6 +558,8 @@ type cfgMorph struct {
|
||||||
cacheTTL time.Duration
|
cacheTTL time.Duration
|
||||||
|
|
||||||
proxyScriptHash neogoutil.Uint160
|
proxyScriptHash neogoutil.Uint160
|
||||||
|
|
||||||
|
inactivityTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgAccounting struct {
|
type cfgAccounting struct {
|
||||||
|
|
|
@ -25,6 +25,9 @@ const (
|
||||||
|
|
||||||
// SwitchIntervalDefault is a default Neo RPCs switch interval.
|
// SwitchIntervalDefault is a default Neo RPCs switch interval.
|
||||||
SwitchIntervalDefault = 2 * time.Minute
|
SwitchIntervalDefault = 2 * time.Minute
|
||||||
|
|
||||||
|
// InactivityTimeoutDefault is a default Neo RPCs inactivity timeout.
|
||||||
|
InactivityTimeoutDefault = 20 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// RPCEndpoint returns list of the values of "rpc_endpoint" config parameter
|
// RPCEndpoint returns list of the values of "rpc_endpoint" config parameter
|
||||||
|
@ -97,3 +100,16 @@ func SwitchInterval(c *config.Config) time.Duration {
|
||||||
|
|
||||||
return SwitchIntervalDefault
|
return SwitchIntervalDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InactivityTimeout returns the value of "inactivity_timeout" config parameter
|
||||||
|
// from "morph" section.
|
||||||
|
//
|
||||||
|
// Returns MaxInactivityIntervalDefault if value is not positive duration.
|
||||||
|
func InactivityTimeout(c *config.Config) time.Duration {
|
||||||
|
res := config.DurationSafe(c.Sub(subsection), "inactivity_timeout")
|
||||||
|
if res != 0 {
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
return InactivityTimeoutDefault
|
||||||
|
}
|
||||||
|
|
|
@ -19,13 +19,20 @@ func TestMorphSection(t *testing.T) {
|
||||||
require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty))
|
require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty))
|
||||||
require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty))
|
require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty))
|
||||||
require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty))
|
require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty))
|
||||||
|
require.Equal(t, morphconfig.InactivityTimeoutDefault, morphconfig.InactivityTimeout(empty))
|
||||||
})
|
})
|
||||||
|
|
||||||
const path = "../../../../config/example/node"
|
const path = "../../../../config/example/node"
|
||||||
|
|
||||||
rpcs := []client.Endpoint{
|
rpcs := []client.Endpoint{
|
||||||
{"wss://rpc1.morph.frostfs.info:40341/ws", 1},
|
{
|
||||||
{"wss://rpc2.morph.frostfs.info:40341/ws", 2},
|
Address: "wss://rpc1.morph.frostfs.info:40341/ws",
|
||||||
|
Priority: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Address: "wss://rpc2.morph.frostfs.info:40341/ws",
|
||||||
|
Priority: 2,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fileConfigTest := func(c *config.Config) {
|
fileConfigTest := func(c *config.Config) {
|
||||||
|
@ -33,6 +40,7 @@ func TestMorphSection(t *testing.T) {
|
||||||
require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c))
|
require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c))
|
||||||
require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c))
|
require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c))
|
||||||
require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c))
|
require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c))
|
||||||
|
require.Equal(t, 30*time.Minute, morphconfig.InactivityTimeout(c))
|
||||||
}
|
}
|
||||||
|
|
||||||
configtest.ForEachFileType(path, fileConfigTest)
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
|
@ -106,6 +106,8 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
||||||
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
|
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.cfgMorph.inactivityTimeout = morphconfig.InactivityTimeout(c.appCfg)
|
||||||
|
|
||||||
c.netMapSource = netmapSource
|
c.netMapSource = netmapSource
|
||||||
c.cfgNetmap.wrapper = wrap
|
c.cfgNetmap.wrapper = wrap
|
||||||
}
|
}
|
||||||
|
@ -194,11 +196,7 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
|
||||||
c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error()))
|
c.log.Warn(logs.FrostFSNodeCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
subs, err = subscriber.New(ctx, &subscriber.Params{
|
subs, err = subscriber.New(ctx, c.log, c.cfgMorph.client, fromSideChainBlock, c.cfgMorph.inactivityTimeout, c.internalErr)
|
||||||
Log: c.log,
|
|
||||||
StartFromBlock: fromSideChainBlock,
|
|
||||||
Client: c.cfgMorph.client,
|
|
||||||
})
|
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
||||||
lis, err := event.NewListener(event.ListenerParams{
|
lis, err := event.NewListener(event.ListenerParams{
|
||||||
|
|
|
@ -11,11 +11,13 @@ FROSTFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws
|
||||||
FROSTFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws"
|
FROSTFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws"
|
||||||
FROSTFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170"
|
FROSTFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170"
|
||||||
FROSTFS_IR_MORPH_SWITCH_INTERVAL=2m
|
FROSTFS_IR_MORPH_SWITCH_INTERVAL=2m
|
||||||
|
FROSTFS_IR_MORPH_INACTIVITY_TIMEOUT=30m
|
||||||
|
|
||||||
FROSTFS_IR_MAINNET_DIAL_TIMEOUT=5s
|
FROSTFS_IR_MAINNET_DIAL_TIMEOUT=5s
|
||||||
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws"
|
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws"
|
||||||
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws"
|
FROSTFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws"
|
||||||
FROSTFS_IR_MAINNET_SWITCH_INTERVAL=2m
|
FROSTFS_IR_MAINNET_SWITCH_INTERVAL=2m
|
||||||
|
FROSTFS_IR_MAINNET_INACTIVITY_TIMEOUT=30m
|
||||||
|
|
||||||
FROSTFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
FROSTFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
|
||||||
FROSTFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090
|
FROSTFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090
|
||||||
|
|
|
@ -19,10 +19,12 @@ morph:
|
||||||
validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup
|
validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup
|
||||||
- 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170
|
- 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170
|
||||||
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
|
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
|
||||||
|
inactivity_timeout: 30m # interval b/w events after which the node fails with error
|
||||||
|
|
||||||
mainnet:
|
mainnet:
|
||||||
dial_timeout: 5s # Timeout for RPC client connection to mainchain; ignore if mainchain is disabled
|
dial_timeout: 5s # Timeout for RPC client connection to mainchain; ignore if mainchain is disabled
|
||||||
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
|
switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node
|
||||||
|
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
|
||||||
endpoint:
|
endpoint:
|
||||||
client: # List of websocket RPC endpoints in mainchain; ignore if mainchain is disabled
|
client: # List of websocket RPC endpoints in mainchain; ignore if mainchain is disabled
|
||||||
- address: wss://mainchain1.fs.neo.org:30333/ws
|
- address: wss://mainchain1.fs.neo.org:30333/ws
|
||||||
|
|
|
@ -66,6 +66,7 @@ FROSTFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f
|
||||||
FROSTFS_MORPH_DIAL_TIMEOUT=30s
|
FROSTFS_MORPH_DIAL_TIMEOUT=30s
|
||||||
FROSTFS_MORPH_CACHE_TTL=15s
|
FROSTFS_MORPH_CACHE_TTL=15s
|
||||||
FROSTFS_MORPH_SWITCH_INTERVAL=3m
|
FROSTFS_MORPH_SWITCH_INTERVAL=3m
|
||||||
|
FROSTFS_MORPH_INACTIVITY_TIMEOUT=30m
|
||||||
FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws"
|
FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws"
|
||||||
FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0
|
FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0
|
||||||
FROSTFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws"
|
FROSTFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws"
|
||||||
|
|
|
@ -101,6 +101,7 @@
|
||||||
"dial_timeout": "30s",
|
"dial_timeout": "30s",
|
||||||
"cache_ttl": "15s",
|
"cache_ttl": "15s",
|
||||||
"switch_interval": "3m",
|
"switch_interval": "3m",
|
||||||
|
"inactivity_timeout": "30m",
|
||||||
"rpc_endpoint": [
|
"rpc_endpoint": [
|
||||||
{
|
{
|
||||||
"address": "wss://rpc1.morph.frostfs.info:40341/ws",
|
"address": "wss://rpc1.morph.frostfs.info:40341/ws",
|
||||||
|
|
|
@ -89,6 +89,7 @@ morph:
|
||||||
# Default value: block time. It is recommended to have this value less or equal to block time.
|
# Default value: block time. It is recommended to have this value less or equal to block time.
|
||||||
# Cached entities: containers, container lists, eACL tables.
|
# Cached entities: containers, container lists, eACL tables.
|
||||||
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
|
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
|
||||||
|
inactivity_timeout: 30m # interval b/w morph events after which the node fails with error
|
||||||
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
|
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
|
||||||
- address: wss://rpc1.morph.frostfs.info:40341/ws
|
- address: wss://rpc1.morph.frostfs.info:40341/ws
|
||||||
priority: 0
|
priority: 0
|
||||||
|
|
|
@ -76,7 +76,6 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
|
||||||
|
|
||||||
NodeStateSettings: netSettings,
|
NodeStateSettings: netSettings,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -105,6 +104,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
|
||||||
s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error()))
|
s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
mainnetChain.from = fromMainChainBlock
|
mainnetChain.from = fromMainChainBlock
|
||||||
|
mainnetChain.inactivityTimeout = cfg.GetDuration("mainnet.inactivity_timeout")
|
||||||
|
|
||||||
// create mainnet client
|
// create mainnet client
|
||||||
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
||||||
|
@ -113,7 +113,7 @@ func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *
|
||||||
}
|
}
|
||||||
|
|
||||||
// create mainnet listener
|
// create mainnet listener
|
||||||
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain)
|
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain, errChan)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -467,6 +467,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
||||||
name: morphPrefix,
|
name: morphPrefix,
|
||||||
from: fromSideChainBlock,
|
from: fromSideChainBlock,
|
||||||
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
||||||
|
inactivityTimeout: cfg.GetDuration("morph.inactivity_timeout"),
|
||||||
}
|
}
|
||||||
|
|
||||||
// create morph client
|
// create morph client
|
||||||
|
@ -476,7 +477,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
||||||
}
|
}
|
||||||
|
|
||||||
// create morph listener
|
// create morph listener
|
||||||
s.morphListener, err = createListener(ctx, s.morphClient, morphChain)
|
s.morphListener, err = createListener(ctx, s.morphClient, morphChain, errChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||||
|
@ -113,6 +114,7 @@ type (
|
||||||
sgn *transaction.Signer
|
sgn *transaction.Signer
|
||||||
from uint32 // block height
|
from uint32 // block height
|
||||||
morphCacheMetric metrics.MorphCacheMetrics
|
morphCacheMetric metrics.MorphCacheMetrics
|
||||||
|
inactivityTimeout time.Duration
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -418,17 +420,13 @@ func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
|
func createListener(ctx context.Context, cli *client.Client, p *chainParams, errCh chan<- error) (event.Listener, error) {
|
||||||
var (
|
var (
|
||||||
sub subscriber.Subscriber
|
sub subscriber.Subscriber
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
sub, err = subscriber.New(ctx, &subscriber.Params{
|
sub, err = subscriber.New(ctx, p.log, cli, p.from, p.inactivityTimeout, errCh)
|
||||||
Log: p.log,
|
|
||||||
StartFromBlock: p.from,
|
|
||||||
Client: cli,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,9 +2,9 @@ package subscriber
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
|
@ -55,13 +55,9 @@ type (
|
||||||
subscribedEvents map[util.Uint160]bool
|
subscribedEvents map[util.Uint160]bool
|
||||||
subscribedNotaryEvents map[util.Uint160]bool
|
subscribedNotaryEvents map[util.Uint160]bool
|
||||||
subscribedToNewBlocks bool
|
subscribedToNewBlocks bool
|
||||||
}
|
|
||||||
|
|
||||||
// Params is a group of Subscriber constructor parameters.
|
deadlockDuration time.Duration
|
||||||
Params struct {
|
deadlockErrCh chan<- error
|
||||||
Log *logger.Logger
|
|
||||||
StartFromBlock uint32
|
|
||||||
Client *client.Client
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -73,14 +69,6 @@ func (s *subscriber) NotificationChannels() NotificationChannels {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")
|
|
||||||
|
|
||||||
errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor")
|
|
||||||
|
|
||||||
errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
|
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
@ -146,24 +134,18 @@ func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// New is a constructs Neo:Morph event listener and returns Subscriber interface.
|
// New is a constructs Neo:Morph event listener and returns Subscriber interface.
|
||||||
func New(ctx context.Context, p *Params) (Subscriber, error) {
|
func New(ctx context.Context, log *logger.Logger,
|
||||||
switch {
|
client *client.Client, startFromBlock uint32,
|
||||||
case p == nil:
|
deadlockDuration time.Duration, deadlockErrCh chan<- error,
|
||||||
return nil, errNilParams
|
) (Subscriber, error) {
|
||||||
case p.Log == nil:
|
err := awaitHeight(client, startFromBlock)
|
||||||
return nil, errNilLogger
|
|
||||||
case p.Client == nil:
|
|
||||||
return nil, errNilClient
|
|
||||||
}
|
|
||||||
|
|
||||||
err := awaitHeight(p.Client, p.StartFromBlock)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sub := &subscriber{
|
sub := &subscriber{
|
||||||
log: p.Log,
|
log: log,
|
||||||
client: p.Client,
|
client: client,
|
||||||
notifyChan: make(chan *state.ContainedNotificationEvent),
|
notifyChan: make(chan *state.ContainedNotificationEvent),
|
||||||
blockChan: make(chan *block.Block),
|
blockChan: make(chan *block.Block),
|
||||||
notaryChan: make(chan *result.NotaryRequestEvent),
|
notaryChan: make(chan *result.NotaryRequestEvent),
|
||||||
|
@ -172,6 +154,9 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
|
||||||
|
|
||||||
subscribedEvents: make(map[util.Uint160]bool),
|
subscribedEvents: make(map[util.Uint160]bool),
|
||||||
subscribedNotaryEvents: make(map[util.Uint160]bool),
|
subscribedNotaryEvents: make(map[util.Uint160]bool),
|
||||||
|
|
||||||
|
deadlockDuration: deadlockDuration,
|
||||||
|
deadlockErrCh: deadlockErrCh,
|
||||||
}
|
}
|
||||||
// Worker listens all events from temporary NeoGo channel and puts them
|
// Worker listens all events from temporary NeoGo channel and puts them
|
||||||
// into corresponding permanent channels.
|
// into corresponding permanent channels.
|
||||||
|
@ -184,7 +169,13 @@ func (s *subscriber) routeNotifications(ctx context.Context) {
|
||||||
var (
|
var (
|
||||||
restoreCh = make(chan bool)
|
restoreCh = make(chan bool)
|
||||||
restoreInProgress bool
|
restoreInProgress bool
|
||||||
|
deadlockTimer = time.NewTimer(s.deadlockDuration)
|
||||||
)
|
)
|
||||||
|
defer func() {
|
||||||
|
if !deadlockTimer.Stop() {
|
||||||
|
<-deadlockTimer.C
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
routeloop:
|
routeloop:
|
||||||
for {
|
for {
|
||||||
|
@ -196,6 +187,7 @@ routeloop:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break routeloop
|
break routeloop
|
||||||
case ev, ok := <-curr.NotifyChan:
|
case ev, ok := <-curr.NotifyChan:
|
||||||
|
deadlockTimer.Reset(s.deadlockDuration)
|
||||||
if ok {
|
if ok {
|
||||||
s.client.Metrics().IncNotificationCount("notify")
|
s.client.Metrics().IncNotificationCount("notify")
|
||||||
s.notifyChan <- ev
|
s.notifyChan <- ev
|
||||||
|
@ -203,6 +195,7 @@ routeloop:
|
||||||
connLost = true
|
connLost = true
|
||||||
}
|
}
|
||||||
case ev, ok := <-curr.BlockChan:
|
case ev, ok := <-curr.BlockChan:
|
||||||
|
deadlockTimer.Reset(s.deadlockDuration)
|
||||||
if ok {
|
if ok {
|
||||||
s.client.Metrics().IncNotificationCount("block")
|
s.client.Metrics().IncNotificationCount("block")
|
||||||
s.client.Metrics().SetLastBlock(ev.Index)
|
s.client.Metrics().SetLastBlock(ev.Index)
|
||||||
|
@ -211,6 +204,7 @@ routeloop:
|
||||||
connLost = true
|
connLost = true
|
||||||
}
|
}
|
||||||
case ev, ok := <-curr.NotaryChan:
|
case ev, ok := <-curr.NotaryChan:
|
||||||
|
deadlockTimer.Reset(s.deadlockDuration)
|
||||||
if ok {
|
if ok {
|
||||||
s.client.Metrics().IncNotificationCount("notary")
|
s.client.Metrics().IncNotificationCount("notary")
|
||||||
s.notaryChan <- ev
|
s.notaryChan <- ev
|
||||||
|
@ -218,10 +212,13 @@ routeloop:
|
||||||
connLost = true
|
connLost = true
|
||||||
}
|
}
|
||||||
case ok := <-restoreCh:
|
case ok := <-restoreCh:
|
||||||
|
deadlockTimer.Reset(s.deadlockDuration)
|
||||||
restoreInProgress = false
|
restoreInProgress = false
|
||||||
if !ok {
|
if !ok {
|
||||||
connLost = true
|
connLost = true
|
||||||
}
|
}
|
||||||
|
case <-deadlockTimer.C:
|
||||||
|
s.deadlockErrCh <- fmt.Errorf("no morph events got for %s", s.deadlockDuration)
|
||||||
}
|
}
|
||||||
if connLost {
|
if connLost {
|
||||||
if !restoreInProgress {
|
if !restoreInProgress {
|
||||||
|
|
Loading…
Reference in a new issue