neoneo-go/pkg/consensus/consensus.go
Evgenii Stratonikov 3d704a3a3a consensus: try to use previous proposal after ChangeView
When system and network pressure is high it can be beneficial
to use transactions which and were already proposed.
The assumption is that they will be in other node's memory pool
with more probability.
2020-01-14 16:05:27 +03:00

419 lines
10 KiB
Go

package consensus
import (
"errors"
"math/rand"
"sort"
"time"
"github.com/CityOfZion/neo-go/config"
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/hash"
"github.com/CityOfZion/neo-go/pkg/crypto/keys"
"github.com/CityOfZion/neo-go/pkg/smartcontract"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/dbft"
"github.com/nspcc-dev/dbft/block"
"github.com/nspcc-dev/dbft/crypto"
"github.com/nspcc-dev/dbft/payload"
"go.uber.org/zap"
)
// cacheMaxCapacity is the default cache capacity taken
// from C# implementation https://github.com/neo-project/neo/blob/master/neo/Ledger/Blockchain.cs#L64
const cacheMaxCapacity = 100
// defaultTimePerBlock is a period between blocks which is used in NEO.
const defaultTimePerBlock = 15 * time.Second
// Service represents consensus instance.
type Service interface {
// Start initializes dBFT and starts event loop for consensus service.
// It must be called only when sufficient amount of peers are connected.
Start()
// OnPayload is a callback to notify Service about new received payload.
OnPayload(p *Payload)
// OnTransaction is a callback to notify Service about new received transaction.
OnTransaction(tx *transaction.Transaction)
// GetPayload returns Payload with specified hash if it is present in the local cache.
GetPayload(h util.Uint256) *Payload
}
type service struct {
Config
log *zap.Logger
// cache is a fifo cache which stores recent payloads.
cache *relayCache
// txx is a fifo cache which stores miner transactions.
txx *relayCache
dbft *dbft.DBFT
// messages and transactions are channels needed to process
// everything in single thread.
messages chan Payload
transactions chan *transaction.Transaction
lastProposal []util.Uint256
}
// Config is a configuration for consensus services.
type Config struct {
// Logger is a logger instance.
Logger *zap.Logger
// Broadcast is a callback which is called to notify server
// about new consensus payload to sent.
Broadcast func(p *Payload)
// RelayBlock is a callback that is called to notify server
// about the new block that needs to be broadcasted.
RelayBlock func(b *core.Block)
// Chain is a core.Blockchainer instance.
Chain core.Blockchainer
// RequestTx is a callback to which will be called
// when a node lacks transactions present in a block.
RequestTx func(h ...util.Uint256)
// TimePerBlock minimal time that should pass before next block is accepted.
TimePerBlock time.Duration
// Wallet is a local-node wallet configuration.
Wallet *config.WalletConfig
}
// NewService returns new consensus.Service instance.
func NewService(cfg Config) (Service, error) {
if cfg.TimePerBlock <= 0 {
cfg.TimePerBlock = defaultTimePerBlock
}
if cfg.Logger == nil {
return nil, errors.New("empty logger")
}
srv := &service{
Config: cfg,
log: cfg.Logger,
cache: newFIFOCache(cacheMaxCapacity),
txx: newFIFOCache(cacheMaxCapacity),
messages: make(chan Payload, 100),
transactions: make(chan *transaction.Transaction, 100),
}
if cfg.Wallet == nil {
return srv, nil
}
priv, pub := getKeyPair(cfg.Wallet)
srv.dbft = dbft.New(
dbft.WithLogger(srv.log),
dbft.WithSecondsPerBlock(cfg.TimePerBlock),
dbft.WithKeyPair(priv, pub),
dbft.WithTxPerBlock(10000),
dbft.WithRequestTx(cfg.RequestTx),
dbft.WithGetTx(srv.getTx),
dbft.WithGetVerified(srv.getVerifiedTx),
dbft.WithBroadcast(srv.broadcast),
dbft.WithProcessBlock(srv.processBlock),
dbft.WithVerifyBlock(srv.verifyBlock),
dbft.WithGetBlock(srv.getBlock),
dbft.WithWatchOnly(func() bool { return false }),
dbft.WithNewBlock(func() block.Block { return new(neoBlock) }),
dbft.WithCurrentHeight(cfg.Chain.BlockHeight),
dbft.WithCurrentBlockHash(cfg.Chain.CurrentBlockHash),
dbft.WithGetValidators(srv.getValidators),
dbft.WithGetConsensusAddress(srv.getConsensusAddress),
dbft.WithNewConsensusPayload(func() payload.ConsensusPayload { return new(Payload) }),
dbft.WithNewPrepareRequest(func() payload.PrepareRequest { return new(prepareRequest) }),
dbft.WithNewPrepareResponse(func() payload.PrepareResponse { return new(prepareResponse) }),
dbft.WithNewChangeView(func() payload.ChangeView { return new(changeView) }),
dbft.WithNewCommit(func() payload.Commit { return new(commit) }),
)
if srv.dbft == nil {
return nil, errors.New("can't initialize dBFT")
}
return srv, nil
}
var (
_ block.Transaction = (*transaction.Transaction)(nil)
_ block.Block = (*neoBlock)(nil)
)
func (s *service) Start() {
s.dbft.Start()
go s.eventLoop()
}
func (s *service) eventLoop() {
for {
select {
case hv := <-s.dbft.Timer.C():
s.log.Debug("timer fired",
zap.Uint32("height", hv.Height),
zap.Uint("view", uint(hv.View)))
s.dbft.OnTimeout(hv)
case msg := <-s.messages:
s.log.Debug("received message", zap.Uint16("from", msg.validatorIndex))
s.dbft.OnReceive(&msg)
case tx := <-s.transactions:
s.dbft.OnTransaction(tx)
}
}
}
func (s *service) validatePayload(p *Payload) bool {
validators := s.getValidators()
if int(p.validatorIndex) >= len(validators) {
return false
}
pub := validators[p.validatorIndex]
vs := pub.(*publicKey).GetVerificationScript()
h := hash.Hash160(vs)
return p.Verify(h)
}
func getKeyPair(cfg *config.WalletConfig) (crypto.PrivateKey, crypto.PublicKey) {
// TODO: replace with wallet opening from the given path (#588)
key, err := keys.NEP2Decrypt(cfg.Path, cfg.Password)
if err != nil {
return nil, nil
}
return &privateKey{PrivateKey: key}, &publicKey{PublicKey: key.PublicKey()}
}
// OnPayload handles Payload receive.
func (s *service) OnPayload(cp *Payload) {
if !s.validatePayload(cp) || s.cache.Has(cp.Hash()) {
return
}
s.Config.Broadcast(cp)
s.cache.Add(cp)
if s.dbft == nil {
return
}
// we use switch here because other payloads could be possibly added in future
switch cp.Type() {
case payload.PrepareRequestType:
req := cp.GetPrepareRequest().(*prepareRequest)
s.txx.Add(&req.minerTx)
s.lastProposal = req.transactionHashes
}
s.messages <- *cp
}
func (s *service) OnTransaction(tx *transaction.Transaction) {
if s.dbft != nil {
s.transactions <- tx
}
}
// GetPayload returns payload stored in cache.
func (s *service) GetPayload(h util.Uint256) *Payload {
p := s.cache.Get(h)
if p == nil {
return (*Payload)(nil)
}
cp := *p.(*Payload)
return &cp
}
func (s *service) broadcast(p payload.ConsensusPayload) {
switch p.Type() {
case payload.PrepareRequestType:
pr := p.GetPrepareRequest().(*prepareRequest)
pr.minerTx = *s.txx.Get(pr.transactionHashes[0]).(*transaction.Transaction)
}
if err := p.(*Payload).Sign(s.dbft.Priv.(*privateKey)); err != nil {
s.log.Warn("can't sign consensus payload", zap.Error(err))
}
s.cache.Add(p)
s.Config.Broadcast(p.(*Payload))
}
func (s *service) getTx(h util.Uint256) block.Transaction {
if tx := s.txx.Get(h); tx != nil {
return tx.(*transaction.Transaction)
}
tx, _, _ := s.Config.Chain.GetTransaction(h)
// this is needed because in case of absent tx dBFT expects to
// get nil interface, not a nil pointer to any concrete type
if tx != nil {
return tx
}
return nil
}
func (s *service) verifyBlock(b block.Block) bool {
coreb := &b.(*neoBlock).Block
for _, tx := range coreb.Transactions {
if err := s.Chain.VerifyTx(tx, coreb); err != nil {
return false
}
}
return true
}
func (s *service) processBlock(b block.Block) {
bb := &b.(*neoBlock).Block
bb.Script = *(s.getBlockWitness(bb))
if err := s.Chain.AddBlock(bb); err != nil {
s.log.Warn("error on add block", zap.Error(err))
} else {
s.Config.RelayBlock(bb)
}
}
func (s *service) getBlockWitness(b *core.Block) *transaction.Witness {
dctx := s.dbft.Context
pubs := convertKeys(dctx.Validators)
sigs := make(map[*keys.PublicKey][]byte)
for i := range pubs {
if p := dctx.CommitPayloads[i]; p != nil && p.ViewNumber() == dctx.ViewNumber {
sigs[pubs[i]] = p.GetCommit().Signature()
}
}
m := s.dbft.Context.M()
verif, err := smartcontract.CreateMultiSigRedeemScript(m, pubs)
if err != nil {
s.log.Warn("can't create multisig redeem script", zap.Error(err))
return nil
}
sort.Sort(keys.PublicKeys(pubs))
var invoc []byte
for i, j := 0, 0; i < len(pubs) && j < m; i++ {
if sig, ok := sigs[pubs[i]]; ok {
invoc = append(invoc, byte(opcode.PUSHBYTES64))
invoc = append(invoc, sig...)
j++
}
}
return &transaction.Witness{
InvocationScript: invoc,
VerificationScript: verif,
}
}
func (s *service) getBlock(h util.Uint256) block.Block {
b, err := s.Chain.GetBlock(h)
if err != nil {
return nil
}
return &neoBlock{Block: *b}
}
func (s *service) getVerifiedTx(count int) []block.Transaction {
pool := s.Config.Chain.GetMemPool()
var txx []*transaction.Transaction
if s.dbft.ViewNumber > 0 {
txx = make([]*transaction.Transaction, 0, len(s.lastProposal))
for i := range s.lastProposal {
if tx, ok := pool.TryGetValue(s.lastProposal[i]); ok {
txx = append(txx, tx)
}
}
if len(txx) < len(s.lastProposal)/2 {
txx = pool.GetVerifiedTransactions()
}
} else {
txx = pool.GetVerifiedTransactions()
}
res := make([]block.Transaction, len(txx)+1)
for i := range txx {
res[i+1] = txx[i]
}
for {
nonce := rand.Uint32()
res[0] = &transaction.Transaction{
Type: transaction.MinerType,
Version: 0,
Data: &transaction.MinerTX{Nonce: nonce},
Attributes: nil,
Inputs: nil,
Outputs: nil,
Scripts: nil,
Trimmed: false,
}
if tx, _, _ := s.Chain.GetTransaction(res[0].Hash()); tx == nil {
break
}
}
s.txx.Add(res[0])
return res
}
func (s *service) getValidators(txx ...block.Transaction) []crypto.PublicKey {
var pKeys []*keys.PublicKey
if len(txx) == 0 {
pKeys, _ = s.Chain.GetValidators()
} else {
ntxx := make([]*transaction.Transaction, len(txx))
for i := range ntxx {
ntxx[i] = txx[i].(*transaction.Transaction)
}
pKeys, _ = s.Chain.GetValidators(ntxx...)
}
pubs := make([]crypto.PublicKey, len(pKeys))
for i := range pKeys {
pubs[i] = &publicKey{PublicKey: pKeys[i]}
}
return pubs
}
func (s *service) getConsensusAddress(validators ...crypto.PublicKey) (h util.Uint160) {
pubs := convertKeys(validators)
script, err := smartcontract.CreateMultiSigRedeemScript(s.dbft.M(), pubs)
if err != nil {
return
}
return crypto.Hash160(script)
}
func convertKeys(validators []crypto.PublicKey) (pubs []*keys.PublicKey) {
pubs = make([]*keys.PublicKey, len(validators))
for i, k := range validators {
pubs[i] = k.(*publicKey).PublicKey
}
return
}