diff --git a/cli/server/server.go b/cli/server/server.go index 399476905..2856c4f57 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -384,6 +384,7 @@ func mkConsensus(config config.Consensus, tpb time.Duration, chain *core.Blockch Logger: log, Broadcast: serv.BroadcastExtensible, Chain: chain, + BlockQueue: serv.GetBlockQueue(), ProtocolConfiguration: chain.GetConfig().ProtocolConfiguration, RequestTx: serv.RequestTx, StopTxFlow: serv.StopTxFlow, diff --git a/internal/testcli/executor.go b/internal/testcli/executor.go index ea6749059..d65823334 100644 --- a/internal/testcli/executor.go +++ b/internal/testcli/executor.go @@ -155,6 +155,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch Logger: zap.NewNop(), Broadcast: netSrv.BroadcastExtensible, Chain: chain, + BlockQueue: netSrv.GetBlockQueue(), ProtocolConfiguration: cfg.ProtocolConfiguration, RequestTx: netSrv.RequestTx, StopTxFlow: netSrv.StopTxFlow, diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index ee447e9cb..fb11bdea3 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -42,7 +42,6 @@ const nsInMs = 1000000 // Ledger is the interface to Blockchain sufficient for Service. type Ledger interface { - AddBlock(block *coreb.Block) error ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction GetConfig() config.Blockchain GetMemPool() *mempool.Pool @@ -58,6 +57,11 @@ type Ledger interface { mempool.Feer } +// BlockQueuer is an interface to the block queue manager sufficient for Service. +type BlockQueuer interface { + PutBlock(block *coreb.Block) error +} + // Service represents a consensus instance. type Service interface { // Name returns service name. @@ -115,6 +119,8 @@ type Config struct { Broadcast func(p *npayload.Extensible) // Chain is a Ledger instance. Chain Ledger + // BlockQueue is a BlockQueuer instance. + BlockQueue BlockQueuer // ProtocolConfiguration contains protocol settings. ProtocolConfiguration config.ProtocolConfiguration // RequestTx is a callback to which will be called @@ -570,11 +576,11 @@ func (s *service) processBlock(b block.Block) { bb := &b.(*neoBlock).Block bb.Script = *(s.getBlockWitness(bb)) - if err := s.Chain.AddBlock(bb); err != nil { + if err := s.BlockQueue.PutBlock(bb); err != nil { // The block might already be added via the regular network // interaction. if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { - s.log.Warn("error on add block", zap.Error(err)) + s.log.Warn("error on enqueue block", zap.Error(err)) } } s.postBlock(bb) diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 8626527e9..78676cfe8 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core" + coreb "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/fee" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/storage" @@ -50,6 +51,7 @@ func TestNewWatchingService(t *testing.T) { Logger: zaptest.NewLogger(t), Broadcast: func(*npayload.Extensible) {}, Chain: bc, + BlockQueue: testBlockQueuer{bc: bc}, ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration, RequestTx: func(...util.Uint256) {}, StopTxFlow: func() {}, @@ -495,6 +497,7 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service { Logger: zaptest.NewLogger(t), Broadcast: func(*npayload.Extensible) {}, Chain: bc, + BlockQueue: testBlockQueuer{bc: bc}, ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration, RequestTx: func(...util.Uint256) {}, StopTxFlow: func() {}, @@ -509,6 +512,17 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service { return srv.(*service) } +type testBlockQueuer struct { + bc *core.Blockchain +} + +var _ = BlockQueuer(testBlockQueuer{}) + +// PutBlock implements BlockQueuer interface. +func (bq testBlockQueuer) PutBlock(b *coreb.Block) error { + return bq.bc.AddBlock(b) +} + func getTestValidator(i int) (*privateKey, *publicKey) { key := testchain.PrivateKey(i) return &privateKey{PrivateKey: key}, &publicKey{PublicKey: key.PublicKey()} diff --git a/pkg/network/server.go b/pkg/network/server.go index 5598ec661..ca37d774f 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -324,6 +324,11 @@ func (s *Server) addService(svc Service) { s.services[svc.Name()] = svc } +// GetBlockQueue returns the block queue instance managed by Server. +func (s *Server) GetBlockQueue() *bqueue.Queue { + return s.bQueue +} + // AddExtensibleService register a service that handles an extensible payload of some kind. func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { s.serviceLock.Lock()