From 4874b4ae92e64bc43b6c01547483c21f0151a529 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 6 Sep 2021 16:05:45 +0300 Subject: [PATCH] [#798] cmd/neofs-node: Save latest processed block number Signed-off-by: Alex Vanin --- cmd/neofs-node/config.go | 16 +++++++++++++--- cmd/neofs-node/morph.go | 22 +++++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 9832d7cdd..69519fdee 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -46,6 +46,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "github.com/nspcc-dev/neofs-node/pkg/util/state" "github.com/panjf2000/ants/v2" "go.etcd.io/bbolt" "go.uber.org/atomic" @@ -113,6 +114,8 @@ type cfg struct { mainChainClient *client.Client clientCache *cache.ClientCache + + persistate *state.PersistentStorage } type cfgGRPC struct { @@ -207,6 +210,8 @@ type cfgReputation struct { scriptHash neogoutil.Uint160 } +var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block") + func initCfg(path string) *cfg { var p config.Prm @@ -231,7 +236,10 @@ func initCfg(path string) *cfg { maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes - state := newNetworkState() + netState := newNetworkState() + + persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path()) + fatalOnErr(err) containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) @@ -261,7 +269,7 @@ func initCfg(path string) *cfg { }, cfgNetmap: cfgNetmap{ scriptHash: contractsconfig.Netmap(appCfg), - state: state, + state: netState, workerPool: netmapWorkerPool, needBootstrap: !relayOnly, reBoostrapTurnedOff: atomic.NewBool(relayOnly), @@ -272,7 +280,7 @@ func initCfg(path string) *cfg { }, localAddr: netAddr, respSvc: response.NewService( - response.WithNetworkState(state), + response.WithNetworkState(netState), ), cfgObject: cfgObject{ pool: initObjectPool(appCfg), @@ -285,6 +293,7 @@ func initCfg(path string) *cfg { clientCache: cache.NewSDKClientCache( apiclient.WithDialTimeout(apiclientconfig.DialTimeout(appCfg)), ), + persistate: persistate, } if metricsconfig.Address(c.appCfg) != "" { @@ -292,6 +301,7 @@ func initCfg(path string) *cfg { } c.onShutdown(c.clientCache.CloseAll) // clean up connections + c.onShutdown(func() { _ = c.persistate.Close() }) initLocalStorage(c) diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index 89096a238..9cb2dfe60 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -179,11 +179,19 @@ func listenMorphNotifications(c *cfg) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) + fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey) + if err != nil { + fromSideChainBlock = 0 + c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) + } + for i := range endpoints { subs, err = subscriber.New(c.ctx, &subscriber.Params{ - Log: c.log, - Endpoint: endpoints[i], - DialTimeout: timeout, + Log: c.log, + Endpoint: endpoints[i], + DialTimeout: timeout, + RPCInitTimeout: 10 * time.Second, + StartFromBlock: fromSideChainBlock, }) if err == nil { c.log.Info("websocket neo event listener established", @@ -215,6 +223,14 @@ func listenMorphNotifications(c *cfg) { registerBlockHandler(lis, func(block *block.Block) { c.log.Debug("new block", zap.Uint32("index", block.Index)) + + err = c.persistate.SetUInt32(persistateSideChainLastBlockKey, block.Index) + if err != nil { + c.log.Warn("can't update persistent state", + zap.String("chain", "side"), + zap.Uint32("block_index", block.Index)) + } + tickBlockTimers(c) }) }