[#798] cmd/neofs-node: Save latest processed block number

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-09-06 16:05:45 +03:00 committed by Alex Vanin
parent cdb3b71070
commit 4874b4ae92
2 changed files with 32 additions and 6 deletions

View file

@ -46,6 +46,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/services/util/response"
"github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/nspcc-dev/neofs-node/pkg/util/state"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -113,6 +114,8 @@ type cfg struct {
mainChainClient *client.Client mainChainClient *client.Client
clientCache *cache.ClientCache clientCache *cache.ClientCache
persistate *state.PersistentStorage
} }
type cfgGRPC struct { type cfgGRPC struct {
@ -207,6 +210,8 @@ type cfgReputation struct {
scriptHash neogoutil.Uint160 scriptHash neogoutil.Uint160
} }
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
func initCfg(path string) *cfg { func initCfg(path string) *cfg {
var p config.Prm var p config.Prm
@ -231,7 +236,10 @@ func initCfg(path string) *cfg {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
state := newNetworkState() netState := newNetworkState()
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
fatalOnErr(err)
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err) fatalOnErr(err)
@ -261,7 +269,7 @@ func initCfg(path string) *cfg {
}, },
cfgNetmap: cfgNetmap{ cfgNetmap: cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg), scriptHash: contractsconfig.Netmap(appCfg),
state: state, state: netState,
workerPool: netmapWorkerPool, workerPool: netmapWorkerPool,
needBootstrap: !relayOnly, needBootstrap: !relayOnly,
reBoostrapTurnedOff: atomic.NewBool(relayOnly), reBoostrapTurnedOff: atomic.NewBool(relayOnly),
@ -272,7 +280,7 @@ func initCfg(path string) *cfg {
}, },
localAddr: netAddr, localAddr: netAddr,
respSvc: response.NewService( respSvc: response.NewService(
response.WithNetworkState(state), response.WithNetworkState(netState),
), ),
cfgObject: cfgObject{ cfgObject: cfgObject{
pool: initObjectPool(appCfg), pool: initObjectPool(appCfg),
@ -285,6 +293,7 @@ func initCfg(path string) *cfg {
clientCache: cache.NewSDKClientCache( clientCache: cache.NewSDKClientCache(
apiclient.WithDialTimeout(apiclientconfig.DialTimeout(appCfg)), apiclient.WithDialTimeout(apiclientconfig.DialTimeout(appCfg)),
), ),
persistate: persistate,
} }
if metricsconfig.Address(c.appCfg) != "" { if metricsconfig.Address(c.appCfg) != "" {
@ -292,6 +301,7 @@ func initCfg(path string) *cfg {
} }
c.onShutdown(c.clientCache.CloseAll) // clean up connections c.onShutdown(c.clientCache.CloseAll) // clean up connections
c.onShutdown(func() { _ = c.persistate.Close() })
initLocalStorage(c) initLocalStorage(c)

View file

@ -179,11 +179,19 @@ func listenMorphNotifications(c *cfg) {
endpoints[i], endpoints[j] = endpoints[j], endpoints[i] endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
}) })
fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey)
if err != nil {
fromSideChainBlock = 0
c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
}
for i := range endpoints { for i := range endpoints {
subs, err = subscriber.New(c.ctx, &subscriber.Params{ subs, err = subscriber.New(c.ctx, &subscriber.Params{
Log: c.log, Log: c.log,
Endpoint: endpoints[i], Endpoint: endpoints[i],
DialTimeout: timeout, DialTimeout: timeout,
RPCInitTimeout: 10 * time.Second,
StartFromBlock: fromSideChainBlock,
}) })
if err == nil { if err == nil {
c.log.Info("websocket neo event listener established", c.log.Info("websocket neo event listener established",
@ -215,6 +223,14 @@ func listenMorphNotifications(c *cfg) {
registerBlockHandler(lis, func(block *block.Block) { registerBlockHandler(lis, func(block *block.Block) {
c.log.Debug("new block", zap.Uint32("index", block.Index)) c.log.Debug("new block", zap.Uint32("index", block.Index))
err = c.persistate.SetUInt32(persistateSideChainLastBlockKey, block.Index)
if err != nil {
c.log.Warn("can't update persistent state",
zap.String("chain", "side"),
zap.Uint32("block_index", block.Index))
}
tickBlockTimers(c) tickBlockTimers(c)
}) })
} }