consensus: enqueue newly created blocks

Do not add them directly to chain, it will be done by the block queue
manager. Close https://github.com/nspcc-dev/neo-go/issues/2923. However,
this commit is not valid without
https://github.com/roman-khimov/dbft/pull/4.
It's the neo-go's duty to initialize consensus after subsequent block
addition; the dBFT itself must wait for the neo-go to complete the block
addition and notify the dBFT, so that it can initialize at 0-th view to
collect the next block.
This commit is contained in:
Anna Shaleva 2023-03-07 12:06:53 +03:00
parent 04d0b45ceb
commit 0cbef58b3c
5 changed files with 30 additions and 3 deletions

View file

@ -384,6 +384,7 @@ func mkConsensus(config config.Consensus, tpb time.Duration, chain *core.Blockch
Logger: log, Logger: log,
Broadcast: serv.BroadcastExtensible, Broadcast: serv.BroadcastExtensible,
Chain: chain, Chain: chain,
BlockQueue: serv.GetBlockQueue(),
ProtocolConfiguration: chain.GetConfig().ProtocolConfiguration, ProtocolConfiguration: chain.GetConfig().ProtocolConfiguration,
RequestTx: serv.RequestTx, RequestTx: serv.RequestTx,
StopTxFlow: serv.StopTxFlow, StopTxFlow: serv.StopTxFlow,

View file

@ -155,6 +155,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
Logger: zap.NewNop(), Logger: zap.NewNop(),
Broadcast: netSrv.BroadcastExtensible, Broadcast: netSrv.BroadcastExtensible,
Chain: chain, Chain: chain,
BlockQueue: netSrv.GetBlockQueue(),
ProtocolConfiguration: cfg.ProtocolConfiguration, ProtocolConfiguration: cfg.ProtocolConfiguration,
RequestTx: netSrv.RequestTx, RequestTx: netSrv.RequestTx,
StopTxFlow: netSrv.StopTxFlow, StopTxFlow: netSrv.StopTxFlow,

View file

@ -42,7 +42,6 @@ const nsInMs = 1000000
// Ledger is the interface to Blockchain sufficient for Service. // Ledger is the interface to Blockchain sufficient for Service.
type Ledger interface { type Ledger interface {
AddBlock(block *coreb.Block) error
ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction
GetConfig() config.Blockchain GetConfig() config.Blockchain
GetMemPool() *mempool.Pool GetMemPool() *mempool.Pool
@ -58,6 +57,11 @@ type Ledger interface {
mempool.Feer mempool.Feer
} }
// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
PutBlock(block *coreb.Block) error
}
// Service represents a consensus instance. // Service represents a consensus instance.
type Service interface { type Service interface {
// Name returns service name. // Name returns service name.
@ -115,6 +119,8 @@ type Config struct {
Broadcast func(p *npayload.Extensible) Broadcast func(p *npayload.Extensible)
// Chain is a Ledger instance. // Chain is a Ledger instance.
Chain Ledger Chain Ledger
// BlockQueue is a BlockQueuer instance.
BlockQueue BlockQueuer
// ProtocolConfiguration contains protocol settings. // ProtocolConfiguration contains protocol settings.
ProtocolConfiguration config.ProtocolConfiguration ProtocolConfiguration config.ProtocolConfiguration
// RequestTx is a callback to which will be called // RequestTx is a callback to which will be called
@ -570,11 +576,11 @@ func (s *service) processBlock(b block.Block) {
bb := &b.(*neoBlock).Block bb := &b.(*neoBlock).Block
bb.Script = *(s.getBlockWitness(bb)) bb.Script = *(s.getBlockWitness(bb))
if err := s.Chain.AddBlock(bb); err != nil { if err := s.BlockQueue.PutBlock(bb); err != nil {
// The block might already be added via the regular network // The block might already be added via the regular network
// interaction. // interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
s.log.Warn("error on add block", zap.Error(err)) s.log.Warn("error on enqueue block", zap.Error(err))
} }
} }
s.postBlock(bb) s.postBlock(bb)

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core"
coreb "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/fee" "github.com/nspcc-dev/neo-go/pkg/core/fee"
"github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/storage"
@ -50,6 +51,7 @@ func TestNewWatchingService(t *testing.T) {
Logger: zaptest.NewLogger(t), Logger: zaptest.NewLogger(t),
Broadcast: func(*npayload.Extensible) {}, Broadcast: func(*npayload.Extensible) {},
Chain: bc, Chain: bc,
BlockQueue: testBlockQueuer{bc: bc},
ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration, ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration,
RequestTx: func(...util.Uint256) {}, RequestTx: func(...util.Uint256) {},
StopTxFlow: func() {}, StopTxFlow: func() {},
@ -495,6 +497,7 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service {
Logger: zaptest.NewLogger(t), Logger: zaptest.NewLogger(t),
Broadcast: func(*npayload.Extensible) {}, Broadcast: func(*npayload.Extensible) {},
Chain: bc, Chain: bc,
BlockQueue: testBlockQueuer{bc: bc},
ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration, ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration,
RequestTx: func(...util.Uint256) {}, RequestTx: func(...util.Uint256) {},
StopTxFlow: func() {}, StopTxFlow: func() {},
@ -509,6 +512,17 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service {
return srv.(*service) return srv.(*service)
} }
type testBlockQueuer struct {
bc *core.Blockchain
}
var _ = BlockQueuer(testBlockQueuer{})
// PutBlock implements BlockQueuer interface.
func (bq testBlockQueuer) PutBlock(b *coreb.Block) error {
return bq.bc.AddBlock(b)
}
func getTestValidator(i int) (*privateKey, *publicKey) { func getTestValidator(i int) (*privateKey, *publicKey) {
key := testchain.PrivateKey(i) key := testchain.PrivateKey(i)
return &privateKey{PrivateKey: key}, &publicKey{PublicKey: key.PublicKey()} return &privateKey{PrivateKey: key}, &publicKey{PublicKey: key.PublicKey()}

View file

@ -324,6 +324,11 @@ func (s *Server) addService(svc Service) {
s.services[svc.Name()] = svc s.services[svc.Name()] = svc
} }
// GetBlockQueue returns the block queue instance managed by Server.
func (s *Server) GetBlockQueue() *bqueue.Queue {
return s.bQueue
}
// AddExtensibleService register a service that handles an extensible payload of some kind. // AddExtensibleService register a service that handles an extensible payload of some kind.
func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) {
s.serviceLock.Lock() s.serviceLock.Lock()