5bb7c6b715
Added sync logs for every service separately to provide the ability to have a custom logger for each service. This commit makes the code follow the zap usages rules: `Sync calls the underlying Core's Sync method, flushing any buffered log entries. Applications should take care to call Sync before exiting.` Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
766 lines
22 KiB
Go
766 lines
22 KiB
Go
package consensus
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/dbft"
|
|
"github.com/nspcc-dev/dbft/block"
|
|
"github.com/nspcc-dev/dbft/crypto"
|
|
"github.com/nspcc-dev/dbft/payload"
|
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
|
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
|
coreb "github.com/nspcc-dev/neo-go/pkg/core/block"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/interop"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
npayload "github.com/nspcc-dev/neo-go/pkg/network/payload"
|
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
|
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Tuning constants of the consensus service.
const (
	// cacheMaxCapacity is the default capacity of the transaction relay
	// cache. The value is taken from the C# implementation:
	// https://github.com/neo-project/neo/blob/master/neo/Ledger/Blockchain.cs#L64
	cacheMaxCapacity = 100

	// defaultTimePerBlock is the period between blocks used in Neo. It is
	// applied when Config.TimePerBlock is not positive.
	defaultTimePerBlock = 15 * time.Second

	// nsInMs is the number of nanoseconds in one millisecond.
	nsInMs = 1000000
)
|
|
|
|
// Ledger is the interface to Blockchain sufficient for Service.
//
// Besides the methods listed explicitly, the service also calls BlockHeight,
// CurrentBlockHash and GetBlock on the chain; those are presumably provided
// by one of the embedded interfaces (verify against interop.Ledger).
type Ledger interface {
	// ApplyPolicyToTxSet filters the candidate transaction set; the service
	// applies it to the transactions it is about to propose.
	ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction
	GetConfig() config.Blockchain
	// GetMemPool returns the chain's memory pool, used both as the source of
	// transactions for proposals and for proposed block verification.
	GetMemPool() *mempool.Pool
	// GetNextBlockValidators returns the validators of the current epoch
	// (used to fill the list of block validators for dBFT).
	GetNextBlockValidators() ([]*keys.PublicKey, error)
	// GetStateRoot returns the state root for the given height; it is
	// queried for the previous block when StateRootInHeader is enabled.
	GetStateRoot(height uint32) (*state.MPTRoot, error)
	// GetTransaction looks a transaction up by its hash.
	GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
	// ComputeNextBlockValidators returns the validators used to compute the
	// NextConsensus field of a newly created block.
	ComputeNextBlockValidators() []*keys.PublicKey
	// PoolTx verifies the transaction and adds it to the given pools.
	// NOTE(review): exact verification semantics are defined by the
	// implementation — confirm there.
	PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
	// SubscribeForBlocks registers ch to receive new block events.
	SubscribeForBlocks(ch chan *coreb.Block)
	// UnsubscribeFromBlocks removes a subscription made with SubscribeForBlocks.
	UnsubscribeFromBlocks(ch chan *coreb.Block)
	GetBaseExecFee() int64
	CalculateAttributesFee(tx *transaction.Transaction) int64
	interop.Ledger
	mempool.Feer
}
|
|
|
|
// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
	// PutBlock enqueues the block for addition to the chain. The service
	// passes blocks produced by consensus to it and treats a returned error
	// as non-fatal if the block is already known to the chain.
	PutBlock(block *coreb.Block) error
}
|
|
|
|
// Service represents a consensus instance.
type Service interface {
	// Name returns the service name.
	Name() string
	// Start initializes dBFT and starts the event loop of the consensus service.
	// It must be called only when a sufficient number of peers are connected.
	// The service only starts once, subsequent calls to Start are no-ops.
	Start()
	// Shutdown stops the dBFT event loop. It can only be called once, subsequent
	// calls to Shutdown on the same instance are no-ops. An instance that was
	// stopped can not be started again by calling Start (use a new instance if
	// needed).
	Shutdown()

	// OnPayload is a callback to notify the Service about a newly received payload.
	OnPayload(p *npayload.Extensible) error
	// OnTransaction is a callback to notify the Service about a newly received transaction.
	OnTransaction(tx *transaction.Transaction)
}
|
|
|
|
// service is the Service implementation built around a dBFT instance.
type service struct {
	// Config is embedded, so its fields (Chain, BlockQueue,
	// ProtocolConfiguration, ...) are accessible directly on the service.
	Config

	// log is the logger taken from Config.Logger.
	log *zap.Logger
	// txx is a fifo cache which stores miner transactions. It is consulted
	// by getTx before falling back to the chain.
	txx *relayCache
	// dbft is the dBFT engine created in NewService.
	dbft *dbft.DBFT
	// messages and transactions are channels needed to process
	// everything in single thread.
	messages     chan Payload
	transactions chan *transaction.Transaction
	// blockEvents is used to pass a new block event to the consensus
	// process. It has a tiny buffer in order to avoid Blockchain blocking
	// on block addition under the high load.
	blockEvents chan *coreb.Block
	// lastProposal holds transaction hashes of the last verified
	// PrepareRequest. It is saved by verifyRequest, read by getVerifiedTx
	// and reset by postBlock.
	lastProposal []util.Uint256
	// wallet is the wallet loaded from Config.Wallet.Path; it is nil when no
	// path is configured.
	wallet *wallet.Wallet
	// started is a flag set with Start method that runs an event handling
	// goroutine.
	started atomic.Bool
	// quit is closed by Shutdown to make the event loop exit.
	quit chan struct{}
	// finished is closed by the event loop right before it returns;
	// Shutdown waits on it.
	finished chan struct{}
	// lastTimestamp contains timestamp for the last processed block.
	// We can't rely on timestamp from dbft context because it is changed
	// before the block is accepted. So, in case of change view, it will contain
	// an updated value.
	lastTimestamp uint64
}
|
|
|
|
// Config is a configuration for consensus services.
type Config struct {
	// Logger is a logger instance. It must not be nil, NewService returns
	// an error otherwise.
	Logger *zap.Logger
	// Broadcast is a callback which is called to notify the server
	// about a new consensus payload to be sent.
	Broadcast func(p *npayload.Extensible)
	// Chain is a Ledger instance.
	Chain Ledger
	// BlockQueue is a BlockQueuer instance.
	BlockQueue BlockQueuer
	// ProtocolConfiguration contains protocol settings.
	ProtocolConfiguration config.ProtocolConfiguration
	// RequestTx is a callback which will be called
	// when a node lacks transactions present in the block.
	RequestTx func(h ...util.Uint256)
	// StopTxFlow is a callback that is called after the consensus
	// process stops accepting incoming transactions.
	StopTxFlow func()
	// TimePerBlock is minimal time that should pass before the next block is
	// accepted. A non-positive value is replaced with the default one by
	// NewService.
	TimePerBlock time.Duration
	// Wallet is a local-node wallet configuration. If the path is empty, then
	// no wallet will be initialized and the service will be in watch-only mode.
	Wallet config.Wallet
}
|
|
|
|
// NewService returns a new consensus.Service instance.
|
|
func NewService(cfg Config) (Service, error) {
|
|
if cfg.TimePerBlock <= 0 {
|
|
cfg.TimePerBlock = defaultTimePerBlock
|
|
}
|
|
|
|
if cfg.Logger == nil {
|
|
return nil, errors.New("empty logger")
|
|
}
|
|
|
|
srv := &service{
|
|
Config: cfg,
|
|
|
|
log: cfg.Logger,
|
|
txx: newFIFOCache(cacheMaxCapacity),
|
|
messages: make(chan Payload, 100),
|
|
|
|
transactions: make(chan *transaction.Transaction, 100),
|
|
blockEvents: make(chan *coreb.Block, 1),
|
|
quit: make(chan struct{}),
|
|
finished: make(chan struct{}),
|
|
}
|
|
|
|
var err error
|
|
|
|
if len(cfg.Wallet.Path) > 0 {
|
|
if srv.wallet, err = wallet.NewWalletFromFile(cfg.Wallet.Path); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check that the wallet password is correct for at least one account.
|
|
var ok bool
|
|
for _, acc := range srv.wallet.Accounts {
|
|
err := acc.Decrypt(srv.Config.Wallet.Password, srv.wallet.Scrypt)
|
|
if err == nil {
|
|
ok = true
|
|
break
|
|
}
|
|
}
|
|
if !ok {
|
|
return nil, errors.New("no account with provided password was found")
|
|
}
|
|
}
|
|
|
|
srv.dbft = dbft.New(
|
|
dbft.WithLogger(srv.log),
|
|
dbft.WithSecondsPerBlock(cfg.TimePerBlock),
|
|
dbft.WithGetKeyPair(srv.getKeyPair),
|
|
dbft.WithRequestTx(cfg.RequestTx),
|
|
dbft.WithStopTxFlow(cfg.StopTxFlow),
|
|
dbft.WithGetTx(srv.getTx),
|
|
dbft.WithGetVerified(srv.getVerifiedTx),
|
|
dbft.WithBroadcast(srv.broadcast),
|
|
dbft.WithProcessBlock(srv.processBlock),
|
|
dbft.WithVerifyBlock(srv.verifyBlock),
|
|
dbft.WithGetBlock(srv.getBlock),
|
|
dbft.WithWatchOnly(func() bool { return false }),
|
|
dbft.WithNewBlockFromContext(srv.newBlockFromContext),
|
|
dbft.WithCurrentHeight(cfg.Chain.BlockHeight),
|
|
dbft.WithCurrentBlockHash(cfg.Chain.CurrentBlockHash),
|
|
dbft.WithGetValidators(srv.getValidators),
|
|
dbft.WithGetConsensusAddress(srv.getConsensusAddress),
|
|
|
|
dbft.WithNewConsensusPayload(srv.newPayload),
|
|
dbft.WithNewPrepareRequest(srv.newPrepareRequest),
|
|
dbft.WithNewPrepareResponse(func() payload.PrepareResponse { return new(prepareResponse) }),
|
|
dbft.WithNewChangeView(func() payload.ChangeView { return new(changeView) }),
|
|
dbft.WithNewCommit(func() payload.Commit { return new(commit) }),
|
|
dbft.WithNewRecoveryRequest(func() payload.RecoveryRequest { return new(recoveryRequest) }),
|
|
dbft.WithNewRecoveryMessage(func() payload.RecoveryMessage {
|
|
return &recoveryMessage{stateRootEnabled: srv.ProtocolConfiguration.StateRootInHeader}
|
|
}),
|
|
dbft.WithVerifyPrepareRequest(srv.verifyRequest),
|
|
dbft.WithVerifyPrepareResponse(func(_ payload.ConsensusPayload) error { return nil }),
|
|
)
|
|
|
|
if srv.dbft == nil {
|
|
return nil, errors.New("can't initialize dBFT")
|
|
}
|
|
|
|
return srv, nil
|
|
}
|
|
|
|
// Compile-time checks that the local types satisfy the dBFT block interfaces.
var (
	_ block.Transaction = (*transaction.Transaction)(nil)
	_ block.Block       = (*neoBlock)(nil)
)
|
|
|
|
// NewPayload creates a new consensus payload for the provided network.
|
|
func NewPayload(m netmode.Magic, stateRootEnabled bool) *Payload {
|
|
return &Payload{
|
|
Extensible: npayload.Extensible{
|
|
Category: npayload.ConsensusCategory,
|
|
},
|
|
message: message{
|
|
stateRootEnabled: stateRootEnabled,
|
|
},
|
|
network: m,
|
|
}
|
|
}
|
|
|
|
func (s *service) newPayload(c *dbft.Context, t payload.MessageType, msg any) payload.ConsensusPayload {
|
|
cp := NewPayload(s.ProtocolConfiguration.Magic, s.ProtocolConfiguration.StateRootInHeader)
|
|
cp.SetHeight(c.BlockIndex)
|
|
cp.SetValidatorIndex(uint16(c.MyIndex))
|
|
cp.SetViewNumber(c.ViewNumber)
|
|
cp.SetType(t)
|
|
if pr, ok := msg.(*prepareRequest); ok {
|
|
pr.SetPrevHash(s.dbft.PrevHash)
|
|
pr.SetVersion(s.dbft.Version)
|
|
}
|
|
cp.SetPayload(msg)
|
|
|
|
cp.Extensible.ValidBlockStart = 0
|
|
cp.Extensible.ValidBlockEnd = c.BlockIndex
|
|
cp.Extensible.Sender = c.Validators[c.MyIndex].(*publicKey).GetScriptHash()
|
|
|
|
return cp
|
|
}
|
|
|
|
func (s *service) newPrepareRequest() payload.PrepareRequest {
|
|
r := new(prepareRequest)
|
|
if s.ProtocolConfiguration.StateRootInHeader {
|
|
r.stateRootEnabled = true
|
|
if sr, err := s.Chain.GetStateRoot(s.dbft.BlockIndex - 1); err == nil {
|
|
r.stateRoot = sr.Root
|
|
} else {
|
|
panic(err)
|
|
}
|
|
}
|
|
return r
|
|
}
|
|
|
|
// Name returns service name.
|
|
func (s *service) Name() string {
|
|
return "consensus"
|
|
}
|
|
|
|
func (s *service) Start() {
|
|
if s.started.CompareAndSwap(false, true) {
|
|
s.log.Info("starting consensus service")
|
|
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
|
|
s.lastTimestamp = b.Timestamp
|
|
s.dbft.Start(s.lastTimestamp * nsInMs)
|
|
go s.eventLoop()
|
|
}
|
|
}
|
|
|
|
// Shutdown implements the Service interface.
|
|
func (s *service) Shutdown() {
|
|
if s.started.CompareAndSwap(true, false) {
|
|
s.log.Info("stopping consensus service")
|
|
close(s.quit)
|
|
<-s.finished
|
|
if s.wallet != nil {
|
|
s.wallet.Close()
|
|
}
|
|
}
|
|
_ = s.log.Sync()
|
|
}
|
|
|
|
func (s *service) eventLoop() {
|
|
s.Chain.SubscribeForBlocks(s.blockEvents)
|
|
|
|
// Manually sync up with potentially missed fresh blocks that may be added by blockchain
|
|
// before the subscription.
|
|
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
|
|
if b.Timestamp >= s.lastTimestamp {
|
|
s.handleChainBlock(b)
|
|
}
|
|
events:
|
|
for {
|
|
select {
|
|
case <-s.quit:
|
|
s.dbft.Timer.Stop()
|
|
s.Chain.UnsubscribeFromBlocks(s.blockEvents)
|
|
break events
|
|
case <-s.dbft.Timer.C():
|
|
hv := s.dbft.Timer.HV()
|
|
s.log.Debug("timer fired",
|
|
zap.Uint32("height", hv.Height),
|
|
zap.Uint("view", uint(hv.View)))
|
|
s.dbft.OnTimeout(hv)
|
|
case msg := <-s.messages:
|
|
fields := []zap.Field{
|
|
zap.Uint8("from", msg.message.ValidatorIndex),
|
|
zap.Stringer("type", msg.Type()),
|
|
}
|
|
|
|
if msg.Type() == payload.RecoveryMessageType {
|
|
rec := msg.GetRecoveryMessage().(*recoveryMessage)
|
|
if rec.preparationHash == nil {
|
|
req := rec.GetPrepareRequest(&msg, s.dbft.Validators, uint16(s.dbft.PrimaryIndex))
|
|
if req != nil {
|
|
h := req.Hash()
|
|
rec.preparationHash = &h
|
|
}
|
|
}
|
|
|
|
fields = append(fields,
|
|
zap.Int("#preparation", len(rec.preparationPayloads)),
|
|
zap.Int("#commit", len(rec.commitPayloads)),
|
|
zap.Int("#changeview", len(rec.changeViewPayloads)),
|
|
zap.Bool("#request", rec.prepareRequest != nil),
|
|
zap.Bool("#hash", rec.preparationHash != nil))
|
|
}
|
|
|
|
s.log.Debug("received message", fields...)
|
|
s.dbft.OnReceive(&msg)
|
|
case tx := <-s.transactions:
|
|
s.dbft.OnTransaction(tx)
|
|
case b := <-s.blockEvents:
|
|
s.handleChainBlock(b)
|
|
}
|
|
// Always process block event if there is any, we can add one above or external
|
|
// services can add several blocks during message processing.
|
|
var latestBlock *coreb.Block
|
|
syncLoop:
|
|
for {
|
|
select {
|
|
case latestBlock = <-s.blockEvents:
|
|
default:
|
|
break syncLoop
|
|
}
|
|
}
|
|
if latestBlock != nil {
|
|
s.handleChainBlock(latestBlock)
|
|
}
|
|
}
|
|
drainLoop:
|
|
for {
|
|
select {
|
|
case <-s.messages:
|
|
case <-s.transactions:
|
|
case <-s.blockEvents:
|
|
default:
|
|
break drainLoop
|
|
}
|
|
}
|
|
close(s.messages)
|
|
close(s.transactions)
|
|
close(s.blockEvents)
|
|
close(s.finished)
|
|
}
|
|
|
|
func (s *service) handleChainBlock(b *coreb.Block) {
|
|
// We can get our own block here, so check for index.
|
|
if b.Index >= s.dbft.BlockIndex {
|
|
s.log.Debug("new block in the chain",
|
|
zap.Uint32("dbft index", s.dbft.BlockIndex),
|
|
zap.Uint32("chain index", s.Chain.BlockHeight()))
|
|
s.postBlock(b)
|
|
s.dbft.InitializeConsensus(0, b.Timestamp*nsInMs)
|
|
}
|
|
}
|
|
|
|
func (s *service) validatePayload(p *Payload) bool {
|
|
validators := s.getValidators()
|
|
if int(p.message.ValidatorIndex) >= len(validators) {
|
|
return false
|
|
}
|
|
|
|
pub := validators[p.message.ValidatorIndex]
|
|
h := pub.(*publicKey).GetScriptHash()
|
|
return p.Sender == h
|
|
}
|
|
|
|
func (s *service) getKeyPair(pubs []crypto.PublicKey) (int, crypto.PrivateKey, crypto.PublicKey) {
|
|
if s.wallet != nil {
|
|
for i := range pubs {
|
|
sh := pubs[i].(*publicKey).GetScriptHash()
|
|
acc := s.wallet.GetAccount(sh)
|
|
if acc == nil {
|
|
continue
|
|
}
|
|
|
|
if !acc.CanSign() {
|
|
err := acc.Decrypt(s.Config.Wallet.Password, s.wallet.Scrypt)
|
|
if err != nil {
|
|
s.log.Fatal("can't unlock account", zap.String("address", address.Uint160ToString(sh)))
|
|
break
|
|
}
|
|
}
|
|
|
|
return i, &privateKey{PrivateKey: acc.PrivateKey()}, &publicKey{PublicKey: acc.PublicKey()}
|
|
}
|
|
}
|
|
return -1, nil, nil
|
|
}
|
|
|
|
func (s *service) payloadFromExtensible(ep *npayload.Extensible) *Payload {
|
|
return &Payload{
|
|
Extensible: *ep,
|
|
message: message{
|
|
stateRootEnabled: s.ProtocolConfiguration.StateRootInHeader,
|
|
},
|
|
}
|
|
}
|
|
|
|
// OnPayload handles Payload receive.
|
|
func (s *service) OnPayload(cp *npayload.Extensible) error {
|
|
log := s.log.With(zap.Stringer("hash", cp.Hash()))
|
|
p := s.payloadFromExtensible(cp)
|
|
// decode payload data into message
|
|
if err := p.decodeData(); err != nil {
|
|
log.Info("can't decode payload data", zap.Error(err))
|
|
return nil
|
|
}
|
|
|
|
if !s.validatePayload(p) {
|
|
log.Info("can't validate payload")
|
|
return nil
|
|
}
|
|
|
|
if s.dbft == nil || !s.started.Load() {
|
|
log.Debug("dbft is inactive or not started yet")
|
|
return nil
|
|
}
|
|
|
|
s.messages <- *p
|
|
return nil
|
|
}
|
|
|
|
func (s *service) OnTransaction(tx *transaction.Transaction) {
|
|
if s.dbft != nil && s.started.Load() {
|
|
s.transactions <- tx
|
|
}
|
|
}
|
|
|
|
func (s *service) broadcast(p payload.ConsensusPayload) {
|
|
if err := p.(*Payload).Sign(s.dbft.Priv.(*privateKey)); err != nil {
|
|
s.log.Warn("can't sign consensus payload", zap.Error(err))
|
|
}
|
|
|
|
ep := &p.(*Payload).Extensible
|
|
s.Config.Broadcast(ep)
|
|
}
|
|
|
|
func (s *service) getTx(h util.Uint256) block.Transaction {
|
|
if tx := s.txx.Get(h); tx != nil {
|
|
return tx.(*transaction.Transaction)
|
|
}
|
|
|
|
tx, _, _ := s.Config.Chain.GetTransaction(h)
|
|
|
|
// this is needed because in case of absent tx dBFT expects to
|
|
// get nil interface, not a nil pointer to any concrete type
|
|
if tx != nil {
|
|
return tx
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) verifyBlock(b block.Block) bool {
|
|
coreb := &b.(*neoBlock).Block
|
|
|
|
if s.Chain.BlockHeight() >= coreb.Index {
|
|
s.log.Warn("proposed block has already outdated")
|
|
return false
|
|
}
|
|
if s.lastTimestamp >= coreb.Timestamp {
|
|
s.log.Warn("proposed block has small timestamp",
|
|
zap.Uint64("ts", coreb.Timestamp),
|
|
zap.Uint64("last", s.lastTimestamp))
|
|
return false
|
|
}
|
|
|
|
size := coreb.GetExpectedBlockSize()
|
|
if size > int(s.ProtocolConfiguration.MaxBlockSize) {
|
|
s.log.Warn("proposed block size exceeds config MaxBlockSize",
|
|
zap.Uint32("max size allowed", s.ProtocolConfiguration.MaxBlockSize),
|
|
zap.Int("block size", size))
|
|
return false
|
|
}
|
|
|
|
var fee int64
|
|
var pool = mempool.New(len(coreb.Transactions), 0, false, nil)
|
|
var mainPool = s.Chain.GetMemPool()
|
|
for _, tx := range coreb.Transactions {
|
|
var err error
|
|
|
|
fee += tx.SystemFee
|
|
if mainPool.ContainsKey(tx.Hash()) {
|
|
err = pool.Add(tx, s.Chain)
|
|
if err == nil {
|
|
continue
|
|
}
|
|
} else {
|
|
err = s.Chain.PoolTx(tx, pool)
|
|
}
|
|
if err != nil {
|
|
s.log.Warn("invalid transaction in proposed block",
|
|
zap.Stringer("hash", tx.Hash()),
|
|
zap.Error(err))
|
|
return false
|
|
}
|
|
if s.Chain.BlockHeight() >= coreb.Index {
|
|
s.log.Warn("proposed block has already outdated")
|
|
return false
|
|
}
|
|
}
|
|
|
|
maxBlockSysFee := s.ProtocolConfiguration.MaxBlockSystemFee
|
|
if fee > maxBlockSysFee {
|
|
s.log.Warn("proposed block system fee exceeds config MaxBlockSystemFee",
|
|
zap.Int("max system fee allowed", int(maxBlockSysFee)),
|
|
zap.Int("block system fee", int(fee)))
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// Errors returned by verifyRequest for an invalid PrepareRequest.
var (
	// errInvalidPrevHash means the previous block hash differs from dBFT's one.
	errInvalidPrevHash = errors.New("invalid PrevHash")
	// errInvalidVersion means the block version differs from dBFT's one.
	errInvalidVersion = errors.New("invalid Version")
	// errInvalidStateRoot means the state root differs from the local one.
	errInvalidStateRoot = errors.New("state root mismatch")
	// errInvalidTransactionsCount means there are more transactions than
	// MaxTransactionsPerBlock allows.
	errInvalidTransactionsCount = errors.New("invalid transactions count")
)
|
|
|
|
func (s *service) verifyRequest(p payload.ConsensusPayload) error {
|
|
req := p.GetPrepareRequest().(*prepareRequest)
|
|
if req.prevHash != s.dbft.PrevHash {
|
|
return errInvalidPrevHash
|
|
}
|
|
if req.version != s.dbft.Version {
|
|
return errInvalidVersion
|
|
}
|
|
if s.ProtocolConfiguration.StateRootInHeader {
|
|
sr, err := s.Chain.GetStateRoot(s.dbft.BlockIndex - 1)
|
|
if err != nil {
|
|
return err
|
|
} else if sr.Root != req.stateRoot {
|
|
return fmt.Errorf("%w: %s != %s", errInvalidStateRoot, sr.Root, req.stateRoot)
|
|
}
|
|
}
|
|
if len(req.TransactionHashes()) > int(s.ProtocolConfiguration.MaxTransactionsPerBlock) {
|
|
return fmt.Errorf("%w: max = %d, got %d", errInvalidTransactionsCount, s.ProtocolConfiguration.MaxTransactionsPerBlock, len(req.TransactionHashes()))
|
|
}
|
|
// Save lastProposal for getVerified().
|
|
s.lastProposal = req.transactionHashes
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) processBlock(b block.Block) {
|
|
bb := &b.(*neoBlock).Block
|
|
bb.Script = *(s.getBlockWitness(bb))
|
|
|
|
if err := s.BlockQueue.PutBlock(bb); err != nil {
|
|
// The block might already be added via the regular network
|
|
// interaction.
|
|
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
|
|
s.log.Warn("error on enqueue block", zap.Error(err))
|
|
}
|
|
}
|
|
s.postBlock(bb)
|
|
}
|
|
|
|
func (s *service) postBlock(b *coreb.Block) {
|
|
if s.lastTimestamp < b.Timestamp {
|
|
s.lastTimestamp = b.Timestamp
|
|
}
|
|
s.lastProposal = nil
|
|
}
|
|
|
|
func (s *service) getBlockWitness(b *coreb.Block) *transaction.Witness {
|
|
dctx := s.dbft.Context
|
|
pubs := convertKeys(dctx.Validators)
|
|
sigs := make(map[*keys.PublicKey][]byte)
|
|
|
|
for i := range pubs {
|
|
if p := dctx.CommitPayloads[i]; p != nil && p.ViewNumber() == dctx.ViewNumber {
|
|
sigs[pubs[i]] = p.GetCommit().Signature()
|
|
}
|
|
}
|
|
|
|
m := s.dbft.Context.M()
|
|
verif, err := smartcontract.CreateMultiSigRedeemScript(m, pubs)
|
|
if err != nil {
|
|
s.log.Warn("can't create multisig redeem script", zap.Error(err))
|
|
return nil
|
|
}
|
|
|
|
sort.Sort(keys.PublicKeys(pubs))
|
|
|
|
buf := io.NewBufBinWriter()
|
|
for i, j := 0, 0; i < len(pubs) && j < m; i++ {
|
|
if sig, ok := sigs[pubs[i]]; ok {
|
|
emit.Bytes(buf.BinWriter, sig)
|
|
j++
|
|
}
|
|
}
|
|
|
|
return &transaction.Witness{
|
|
InvocationScript: buf.Bytes(),
|
|
VerificationScript: verif,
|
|
}
|
|
}
|
|
|
|
func (s *service) getBlock(h util.Uint256) block.Block {
|
|
b, err := s.Chain.GetBlock(h)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return &neoBlock{network: s.ProtocolConfiguration.Magic, Block: *b}
|
|
}
|
|
|
|
func (s *service) getVerifiedTx() []block.Transaction {
|
|
pool := s.Config.Chain.GetMemPool()
|
|
|
|
var txx []*transaction.Transaction
|
|
|
|
if s.dbft.ViewNumber > 0 && len(s.lastProposal) > 0 {
|
|
txx = make([]*transaction.Transaction, 0, len(s.lastProposal))
|
|
for i := range s.lastProposal {
|
|
if tx, ok := pool.TryGetValue(s.lastProposal[i]); ok {
|
|
txx = append(txx, tx)
|
|
}
|
|
}
|
|
|
|
if len(txx) < len(s.lastProposal)/2 {
|
|
txx = pool.GetVerifiedTransactions()
|
|
}
|
|
} else {
|
|
txx = pool.GetVerifiedTransactions()
|
|
}
|
|
|
|
if len(txx) > 0 {
|
|
txx = s.Config.Chain.ApplyPolicyToTxSet(txx)
|
|
}
|
|
|
|
res := make([]block.Transaction, len(txx))
|
|
for i := range txx {
|
|
res[i] = txx[i]
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (s *service) getValidators(txes ...block.Transaction) []crypto.PublicKey {
|
|
var (
|
|
pKeys []*keys.PublicKey
|
|
err error
|
|
)
|
|
if txes == nil {
|
|
// getValidators with empty args is used by dbft to fill the list of
|
|
// block's validators, thus should return validators from the current
|
|
// epoch without recalculation.
|
|
pKeys, err = s.Chain.GetNextBlockValidators()
|
|
}
|
|
// getValidators with non-empty args is used by dbft to fill block's
|
|
// NextConsensus field, but NeoGo doesn't provide WithGetConsensusAddress
|
|
// callback and fills NextConsensus by itself via WithNewBlockFromContext
|
|
// callback. Thus, leave pKeys empty if txes != nil.
|
|
|
|
if err != nil {
|
|
s.log.Error("error while trying to get validators", zap.Error(err))
|
|
}
|
|
|
|
pubs := make([]crypto.PublicKey, len(pKeys))
|
|
for i := range pKeys {
|
|
pubs[i] = &publicKey{PublicKey: pKeys[i]}
|
|
}
|
|
|
|
return pubs
|
|
}
|
|
|
|
func (s *service) getConsensusAddress(validators ...crypto.PublicKey) util.Uint160 {
|
|
return util.Uint160{}
|
|
}
|
|
|
|
func convertKeys(validators []crypto.PublicKey) (pubs []*keys.PublicKey) {
|
|
pubs = make([]*keys.PublicKey, len(validators))
|
|
for i, k := range validators {
|
|
pubs[i] = k.(*publicKey).PublicKey
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *service) newBlockFromContext(ctx *dbft.Context) block.Block {
|
|
block := &neoBlock{network: s.ProtocolConfiguration.Magic}
|
|
|
|
block.Block.Timestamp = ctx.Timestamp / nsInMs
|
|
block.Block.Nonce = ctx.Nonce
|
|
block.Block.Index = ctx.BlockIndex
|
|
if s.ProtocolConfiguration.StateRootInHeader {
|
|
sr, err := s.Chain.GetStateRoot(ctx.BlockIndex - 1)
|
|
if err != nil {
|
|
s.log.Fatal(fmt.Sprintf("failed to get state root: %s", err.Error()))
|
|
}
|
|
block.StateRootEnabled = true
|
|
block.PrevStateRoot = sr.Root
|
|
}
|
|
|
|
// ComputeNextBlockValidators returns proper set of validators wrt dBFT epochs
|
|
// boundary. I.e. for the last block in the dBFT epoch this method returns the
|
|
// list of validators recalculated from the latest relevant information about
|
|
// NEO votes; in this case list of validators may differ from the one returned
|
|
// by GetNextBlockValidators. For the not-last block of dBFT epoch this method
|
|
// returns the same list as GetNextBlockValidators. Note, that by this moment
|
|
// we must be sure that previous block was successfully persisted to chain
|
|
// (i.e. PostPersist was completed for native Neo contract and PostPersist
|
|
// execution cache was persisted to s.Chain's DAO), otherwise the wrong
|
|
// (outdated, relevant for the previous dBFT epoch) value will be returned.
|
|
var validators = s.Chain.ComputeNextBlockValidators()
|
|
script, err := smartcontract.CreateDefaultMultiSigRedeemScript(validators)
|
|
if err != nil {
|
|
s.log.Fatal(fmt.Sprintf("failed to create multisignature script: %s", err.Error()))
|
|
}
|
|
block.Block.NextConsensus = crypto.Hash160(script)
|
|
block.Block.PrevHash = ctx.PrevHash
|
|
block.Block.Version = ctx.Version
|
|
|
|
primaryIndex := byte(ctx.PrimaryIndex)
|
|
block.Block.PrimaryIndex = primaryIndex
|
|
|
|
// it's OK to have ctx.TransactionsHashes == nil here
|
|
hashes := make([]util.Uint256, len(ctx.TransactionHashes))
|
|
copy(hashes, ctx.TransactionHashes)
|
|
block.Block.MerkleRoot = hash.CalcMerkleRoot(hashes)
|
|
|
|
return block
|
|
}
|