[#798] pkg/innerring: Save latest processed block number

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-09-06 16:01:50 +03:00 committed by Alex Vanin
parent 2bcf22ad79
commit 005f54e61e
3 changed files with 67 additions and 3 deletions

View file

@ -41,6 +41,8 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("without_mainnet", false)
cfg.SetDefault("node.persistent_state.path", ".neofs-ir-state")
cfg.SetDefault("morph.endpoint.client", "")
cfg.SetDefault("morph.endpoint.notification", "")
cfg.SetDefault("morph.dial_timeout", "10s")

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -45,6 +46,7 @@ import (
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
utilConfig "github.com/nspcc-dev/neofs-node/pkg/util/config"
"github.com/nspcc-dev/neofs-node/pkg/util/precision"
"github.com/nspcc-dev/neofs-node/pkg/util/state"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper"
"go.uber.org/atomic"
@ -74,6 +76,7 @@ type (
healthStatus atomic.Value
balanceClient *balanceWrapper.Wrapper
netmapClient *nmWrapper.Wrapper
persistate *state.PersistentStorage
// notary configuration
feeConfig *config.FeeConfig
@ -139,6 +142,7 @@ type (
name string
gas util.Uint160
sgn *transaction.Signer
from uint32 // block height
}
)
@ -237,9 +241,27 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
zap.Uint32("index", b.Index),
)
err = s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index)
if err != nil {
s.log.Warn("can't update persistent state",
zap.String("chain", "side"),
zap.Uint32("block_index", b.Index))
}
s.tickTimers()
})
if !s.withoutMainNet {
s.mainnetListener.RegisterBlockHandler(func(b *block.Block) {
err = s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index)
if err != nil {
s.log.Warn("can't update persistent state",
zap.String("chain", "main"),
zap.Uint32("block_index", b.Index))
}
})
}
for _, runner := range s.runners {
runner(intError)
}
@ -318,11 +340,24 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
server.key = acc.PrivateKey()
server.persistate, err = initPersistentStateStorage(cfg)
if err != nil {
return nil, err
}
server.registerCloser(server.persistate.Close)
fromSideChainBlock, err := server.persistate.UInt32(persistateSideChainLastBlockKey)
if err != nil {
fromSideChainBlock = 0
log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
}
morphChain := &chainParams{
log: log,
cfg: cfg,
key: server.key,
name: morphPrefix,
from: fromSideChainBlock,
}
// create morph listener
@ -350,6 +385,13 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
mainnetChain.name = mainnetPrefix
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
fromMainChainBlock, err := server.persistate.UInt32(persistateMainChainLastBlockKey)
if err != nil {
fromMainChainBlock = 0
log.Warn("can't get last processed main chain block number", zap.String("error", err.Error()))
}
mainnetChain.from = fromMainChainBlock
// create mainnet listener
server.mainnetListener, err = createListener(ctx, mainnetChain)
if err != nil {
@ -853,6 +895,8 @@ func createListener(ctx context.Context, p *chainParams) (event.Listener, error)
Log: p.log,
Endpoint: p.cfg.GetString(p.name + ".endpoint.notification"),
DialTimeout: p.cfg.GetDuration(p.name + ".dial_timeout"),
RPCInitTimeout: 10 * time.Second,
StartFromBlock: p.from,
})
if err != nil {
return nil, err

View file

@ -1,17 +1,25 @@
package innerring
import (
"fmt"
"sort"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
control "github.com/nspcc-dev/neofs-node/pkg/services/control/ir"
"github.com/nspcc-dev/neofs-node/pkg/util/state"
"github.com/spf13/viper"
"go.uber.org/zap"
)
const voteMethod = "vote"
var (
persistateMainChainLastBlockKey = []byte("main_chain_last_processed_block")
persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
)
// EpochCounter is a getter for a global epoch counter.
func (s *Server) EpochCounter() uint64 {
return s.epochCounter.Load()
@ -140,3 +148,13 @@ func (s *Server) setHealthStatus(hs control.HealthStatus) {
func (s *Server) HealthStatus() control.HealthStatus {
return s.healthStatus.Load().(control.HealthStatus)
}
func initPersistentStateStorage(cfg *viper.Viper) (*state.PersistentStorage, error) {
persistPath := cfg.GetString("node.persistent_state.path")
persistStorage, err := state.NewPersistentStorage(persistPath)
if err != nil {
return nil, fmt.Errorf("persistent state init error: %w", err)
}
return persistStorage, nil
}