From 005f54e61ec368cdd77e4db2e07efbb70146d458 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 6 Sep 2021 16:01:50 +0300 Subject: [PATCH] [#798] pkg/innerring: Save latest processed block number Signed-off-by: Alex Vanin --- cmd/neofs-ir/defaults.go | 2 ++ pkg/innerring/innerring.go | 50 +++++++++++++++++++++++++++++++++++--- pkg/innerring/state.go | 18 ++++++++++++++ 3 files changed, 67 insertions(+), 3 deletions(-) diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index dca3fe22a..41931ff33 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -41,6 +41,8 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("without_mainnet", false) + cfg.SetDefault("node.persistent_state.path", ".neofs-ir-state") + cfg.SetDefault("morph.endpoint.client", "") cfg.SetDefault("morph.endpoint.notification", "") cfg.SetDefault("morph.dial_timeout", "10s") diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 2d0fccb6c..68bebb69e 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net" + "time" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -45,6 +46,7 @@ import ( util2 "github.com/nspcc-dev/neofs-node/pkg/util" utilConfig "github.com/nspcc-dev/neofs-node/pkg/util/config" "github.com/nspcc-dev/neofs-node/pkg/util/precision" + "github.com/nspcc-dev/neofs-node/pkg/util/state" "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "go.uber.org/atomic" @@ -74,6 +76,7 @@ type ( healthStatus atomic.Value balanceClient *balanceWrapper.Wrapper netmapClient *nmWrapper.Wrapper + persistate *state.PersistentStorage // notary configuration feeConfig *config.FeeConfig @@ -139,6 +142,7 @@ type ( name string gas util.Uint160 sgn *transaction.Signer + from uint32 // block height } ) @@ -237,9 +241,27 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { zap.Uint32("index", b.Index), ) + err = s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index) + if err != nil { + s.log.Warn("can't update persistent state", + zap.String("chain", "side"), + zap.Uint32("block_index", b.Index)) + } + s.tickTimers() }) + if !s.withoutMainNet { + s.mainnetListener.RegisterBlockHandler(func(b *block.Block) { + err = s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index) + if err != nil { + s.log.Warn("can't update persistent state", + zap.String("chain", "main"), + zap.Uint32("block_index", b.Index)) + } + }) + } + for _, runner := range s.runners { runner(intError) } @@ -318,11 +340,24 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error server.key = acc.PrivateKey() + server.persistate, err = initPersistentStateStorage(cfg) + if err != nil { + return nil, err + } + server.registerCloser(server.persistate.Close) + + fromSideChainBlock, err := server.persistate.UInt32(persistateSideChainLastBlockKey) + if err != nil { + fromSideChainBlock = 0 + log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) + } + morphChain := &chainParams{ log: log, cfg: cfg, key: server.key, name: morphPrefix, + from: fromSideChainBlock, } // create morph listener @@ -350,6 +385,13 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error mainnetChain.name = mainnetPrefix mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry} + fromMainChainBlock, err := server.persistate.UInt32(persistateMainChainLastBlockKey) + if err != nil { + fromMainChainBlock = 0 + log.Warn("can't get last processed main chain block number", zap.String("error", err.Error())) + } + mainnetChain.from = fromMainChainBlock + // create mainnet listener server.mainnetListener, err = createListener(ctx, mainnetChain) if err != nil { @@ -850,9 +892,11 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error func createListener(ctx context.Context, p *chainParams) (event.Listener, error) { sub, err := subscriber.New(ctx, &subscriber.Params{ - Log: p.log, - Endpoint: p.cfg.GetString(p.name + ".endpoint.notification"), - DialTimeout: p.cfg.GetDuration(p.name + ".dial_timeout"), + Log: p.log, + Endpoint: p.cfg.GetString(p.name + ".endpoint.notification"), + DialTimeout: p.cfg.GetDuration(p.name + ".dial_timeout"), + RPCInitTimeout: 10 * time.Second, + StartFromBlock: p.from, }) if err != nil { return nil, err diff --git a/pkg/innerring/state.go b/pkg/innerring/state.go index e74ccd327..36ce29232 100644 --- a/pkg/innerring/state.go +++ b/pkg/innerring/state.go @@ -1,17 +1,25 @@ package innerring import ( + "fmt" "sort" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/services/audit" control "github.com/nspcc-dev/neofs-node/pkg/services/control/ir" + "github.com/nspcc-dev/neofs-node/pkg/util/state" + "github.com/spf13/viper" "go.uber.org/zap" ) const voteMethod = "vote" +var ( + persistateMainChainLastBlockKey = []byte("main_chain_last_processed_block") + persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block") +) + // EpochCounter is a getter for a global epoch counter. func (s *Server) EpochCounter() uint64 { return s.epochCounter.Load() @@ -140,3 +148,13 @@ func (s *Server) setHealthStatus(hs control.HealthStatus) { func (s *Server) HealthStatus() control.HealthStatus { return s.healthStatus.Load().(control.HealthStatus) } + +func initPersistentStateStorage(cfg *viper.Viper) (*state.PersistentStorage, error) { + persistPath := cfg.GetString("node.persistent_state.path") + persistStorage, err := state.NewPersistentStorage(persistPath) + if err != nil { + return nil, fmt.Errorf("persistent state init error: %w", err) + } + + return persistStorage, nil +}