diff --git a/cli/server/server.go b/cli/server/server.go index 399476905..2856c4f57 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -384,6 +384,7 @@ func mkConsensus(config config.Consensus, tpb time.Duration, chain *core.Blockch Logger: log, Broadcast: serv.BroadcastExtensible, Chain: chain, + BlockQueue: serv.GetBlockQueue(), ProtocolConfiguration: chain.GetConfig().ProtocolConfiguration, RequestTx: serv.RequestTx, StopTxFlow: serv.StopTxFlow, diff --git a/go.mod b/go.mod index dc8850413..a26d99e0d 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/holiman/uint256 v1.2.0 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/mr-tron/base58 v1.2.0 - github.com/nspcc-dev/dbft v0.0.0-20221020093431-31c1bbdc74f2 + github.com/nspcc-dev/dbft v0.0.0-20230314095711-114b11c42cf6 github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb github.com/nspcc-dev/neofs-sdk-go v0.0.0-20220113123743-7f3162110659 diff --git a/go.sum b/go.sum index b15846a0c..19945916f 100644 --- a/go.sum +++ b/go.sum @@ -251,8 +251,8 @@ github.com/nspcc-dev/dbft v0.0.0-20191209120240-0d6b7568d9ae/go.mod h1:3FjXOoHmA github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk= github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ= github.com/nspcc-dev/dbft v0.0.0-20210721160347-1b03241391ac/go.mod h1:U8MSnEShH+o5hexfWJdze6uMFJteP0ko7J2frO7Yu1Y= -github.com/nspcc-dev/dbft v0.0.0-20221020093431-31c1bbdc74f2 h1:2soBy8en5W4/1Gvbog8RyVpEbarGWZwPxppZjffWzZE= -github.com/nspcc-dev/dbft v0.0.0-20221020093431-31c1bbdc74f2/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98= +github.com/nspcc-dev/dbft v0.0.0-20230314095711-114b11c42cf6 h1:K4AcGPB0JDLloGwRY2BPJJeWyNQw+n0BnrdgFjSEBIs= +github.com/nspcc-dev/dbft v0.0.0-20230314095711-114b11c42cf6/go.mod h1:IsUsZqxQkav23pFrCyZEibz0VukpI787XnbcsFkekI8= github.com/nspcc-dev/go-ordered-json v0.0.0-20210915112629-e1b6cce73d02/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 h1:n4ZaFCKt1pQJd7PXoMJabZWK9ejjbLOVrkl/lOUmshg= github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= @@ -403,6 +403,7 @@ golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.4.0 h1:UVQgzMY87xqpKNgb+kDsll2Igd33HszWHFLmpaRMq/8= golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -477,6 +478,7 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= @@ -557,12 +559,14 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210429154555-c04ba851c2a4/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -573,6 +577,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= diff --git a/internal/testcli/executor.go b/internal/testcli/executor.go index ea6749059..d65823334 100644 --- a/internal/testcli/executor.go +++ b/internal/testcli/executor.go @@ -155,6 +155,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch Logger: zap.NewNop(), Broadcast: netSrv.BroadcastExtensible, Chain: chain, + BlockQueue: netSrv.GetBlockQueue(), ProtocolConfiguration: cfg.ProtocolConfiguration, RequestTx: netSrv.RequestTx, StopTxFlow: netSrv.StopTxFlow, diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index a2833ac64..915b51629 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -42,7 +42,6 @@ const nsInMs = 1000000 // Ledger is the interface to Blockchain sufficient for Service. type Ledger interface { - AddBlock(block *coreb.Block) error ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction GetConfig() config.Blockchain GetMemPool() *mempool.Pool @@ -58,6 +57,11 @@ type Ledger interface { mempool.Feer } +// BlockQueuer is an interface to the block queue manager sufficient for Service. +type BlockQueuer interface { + PutBlock(block *coreb.Block) error +} + // Service represents a consensus instance. type Service interface { // Name returns service name. @@ -89,7 +93,8 @@ type service struct { messages chan Payload transactions chan *transaction.Transaction // blockEvents is used to pass a new block event to the consensus - // process. + // process. It has a tiny buffer in order to avoid Blockchain blocking + // on block addition under the high load. blockEvents chan *coreb.Block lastProposal []util.Uint256 wallet *wallet.Wallet @@ -114,6 +119,8 @@ type Config struct { Broadcast func(p *npayload.Extensible) // Chain is a Ledger instance. Chain Ledger + // BlockQueue is a BlockQueuer instance. + BlockQueue BlockQueuer // ProtocolConfiguration contains protocol settings. ProtocolConfiguration config.ProtocolConfiguration // RequestTx is a callback to which will be called @@ -336,11 +343,19 @@ events: case b := <-s.blockEvents: s.handleChainBlock(b) } - // Always process block event if there is any, we can add one above. - select { - case b := <-s.blockEvents: - s.handleChainBlock(b) - default: + // Always process block event if there is any, we can add one above or external + // services can add several blocks during message processing. + var latestBlock *coreb.Block + syncLoop: + for { + select { + case latestBlock = <-s.blockEvents: + default: + break syncLoop + } + } + if latestBlock != nil { + s.handleChainBlock(latestBlock) } } drainLoop: @@ -564,11 +579,11 @@ func (s *service) processBlock(b block.Block) { bb := &b.(*neoBlock).Block bb.Script = *(s.getBlockWitness(bb)) - if err := s.Chain.AddBlock(bb); err != nil { + if err := s.BlockQueue.PutBlock(bb); err != nil { // The block might already be added via the regular network // interaction. if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { - s.log.Warn("error on add block", zap.Error(err)) + s.log.Warn("error on enqueue block", zap.Error(err)) } } s.postBlock(bb) diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 8626527e9..469e14767 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core" + coreb "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/fee" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/storage" @@ -50,6 +51,7 @@ func TestNewWatchingService(t *testing.T) { Logger: zaptest.NewLogger(t), Broadcast: func(*npayload.Extensible) {}, Chain: bc, + BlockQueue: testBlockQueuer{bc: bc}, ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration, RequestTx: func(...util.Uint256) {}, StopTxFlow: func() {}, @@ -62,6 +64,14 @@ func TestNewWatchingService(t *testing.T) { require.NotPanics(t, srv.Shutdown) } +func collectBlock(t *testing.T, bc *core.Blockchain, srv *service) { + h := bc.BlockHeight() + srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.Context.BlockIndex}) // Collect and add block to the chain. + header, err := bc.GetHeader(bc.GetHeaderHash(h + 1)) + require.NoError(t, err) + srv.dbft.InitializeConsensus(0, header.Timestamp*nsInMs) // Init consensus manually at the next height, as we don't run the consensus service. +} + func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint32) (*service, *wallet.Account) { acc, err := wallet.NewAccountFromWIF(testchain.WIF(testchain.IDToOrder(0))) require.NoError(t, err) @@ -89,7 +99,11 @@ func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint3 require.NoError(t, bc.PoolTx(tx)) srv := newTestServiceWithChain(t, bc) + h := bc.BlockHeight() srv.dbft.Start(0) + header, err := bc.GetHeader(bc.GetHeaderHash(h + 1)) + require.NoError(t, err) + srv.dbft.InitializeConsensus(0, header.Timestamp*nsInMs) // Init consensus manually at the next height, as we don't run the consensus service. // Register new candidate. b.Reset() @@ -104,11 +118,11 @@ func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint3 require.NoError(t, newAcc.SignTx(netmode.UnitTestNet, tx)) require.NoError(t, bc.PoolTx(tx)) - srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.Context.BlockIndex}) + collectBlock(t, bc, srv) cfg := bc.GetConfig() for i := srv.dbft.BlockIndex; !cfg.ShouldUpdateCommitteeAt(i + offset); i++ { - srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.Context.BlockIndex}) + collectBlock(t, bc, srv) } // Vote for new candidate. @@ -125,7 +139,7 @@ func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint3 require.NoError(t, newAcc.SignTx(netmode.UnitTestNet, tx)) require.NoError(t, bc.PoolTx(tx)) - srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.BlockIndex}) + collectBlock(t, bc, srv) return srv, acc } @@ -153,7 +167,7 @@ func TestService_NextConsensus(t *testing.T) { // OnPersist <- update committee // Block <- - srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.BlockIndex}) + collectBlock(t, bc, srv) checkNextConsensus(t, bc, height+1, hash.Hash160(script)) }) /* @@ -495,6 +509,7 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service { Logger: zaptest.NewLogger(t), Broadcast: func(*npayload.Extensible) {}, Chain: bc, + BlockQueue: testBlockQueuer{bc: bc}, ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration, RequestTx: func(...util.Uint256) {}, StopTxFlow: func() {}, @@ -509,6 +524,17 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service { return srv.(*service) } +type testBlockQueuer struct { + bc *core.Blockchain +} + +var _ = BlockQueuer(testBlockQueuer{}) + +// PutBlock implements BlockQueuer interface. +func (bq testBlockQueuer) PutBlock(b *coreb.Block) error { + return bq.bc.AddBlock(b) +} + func getTestValidator(i int) (*privateKey, *publicKey) { key := testchain.PrivateKey(i) return &privateKey{PrivateKey: key}, &publicKey{PublicKey: key.PublicKey()} diff --git a/pkg/network/blockqueue.go b/pkg/network/bqueue/queue.go similarity index 69% rename from pkg/network/blockqueue.go rename to pkg/network/bqueue/queue.go index 5927b72f7..1cf8ec548 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/bqueue/queue.go @@ -1,4 +1,4 @@ -package network +package bqueue import ( "sync" @@ -15,7 +15,8 @@ type Blockqueuer interface { BlockHeight() uint32 } -type blockQueue struct { +// Queue is the block queue. +type Queue struct { log *zap.Logger queueLock sync.RWMutex queue []*block.Block @@ -25,34 +26,36 @@ type blockQueue struct { relayF func(*block.Block) discarded *atomic.Bool len int + lenUpdateF func(int) } -const ( - // blockCacheSize is the amount of blocks above the current height - // which are stored in the queue. - blockCacheSize = 2000 -) +// CacheSize is the amount of blocks above the current height +// which are stored in the queue. +const CacheSize = 2000 func indexToPosition(i uint32) int { - return int(i) % blockCacheSize + return int(i) % CacheSize } -func newBlockQueue(capacity int, bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { +// New creates an instance of BlockQueue. +func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue { if log == nil { return nil } - return &blockQueue{ + return &Queue{ log: log, - queue: make([]*block.Block, blockCacheSize), + queue: make([]*block.Block, CacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, discarded: atomic.NewBool(false), + lenUpdateF: lenMetricsUpdater, } } -func (bq *blockQueue) run() { +// Run runs the BlockQueue queueing loop. It must be called in a separate routine. +func (bq *Queue) Run() { var lastHeight = bq.chain.BlockHeight() for { _, ok := <-bq.checkBlocks @@ -97,19 +100,22 @@ func (bq *blockQueue) run() { bq.queue[pos] = nil } bq.queueLock.Unlock() - updateBlockQueueLenMetric(l) + if bq.lenUpdateF != nil { + bq.lenUpdateF(l) + } } } } -func (bq *blockQueue) putBlock(block *block.Block) error { +// PutBlock enqueues block to be added to the chain. +func (bq *Queue) PutBlock(block *block.Block) error { h := bq.chain.BlockHeight() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { return nil } - if block.Index <= h || h+blockCacheSize < block.Index { + if block.Index <= h || h+CacheSize < block.Index { // can easily happen when fetching the same blocks from // different peers, thus not considered as error return nil @@ -119,14 +125,15 @@ func (bq *blockQueue) putBlock(block *block.Block) error { if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ bq.queue[pos] = block - for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { + for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { bq.lastQ = bq.queue[pos].Index pos++ } } - l := bq.len // update metrics - updateBlockQueueLenMetric(l) + if bq.lenUpdateF != nil { + bq.lenUpdateF(bq.len) + } select { case bq.checkBlocks <- struct{}{}: // ok, signalled to goroutine processing queue @@ -136,20 +143,21 @@ func (bq *blockQueue) putBlock(block *block.Block) error { return nil } -// lastQueued returns the index of the last queued block and the queue's capacity +// LastQueued returns the index of the last queued block and the queue's capacity // left. -func (bq *blockQueue) lastQueued() (uint32, int) { +func (bq *Queue) LastQueued() (uint32, int) { bq.queueLock.RLock() defer bq.queueLock.RUnlock() - return bq.lastQ, blockCacheSize - bq.len + return bq.lastQ, CacheSize - bq.len } -func (bq *blockQueue) discard() { +// Discard stops the queue and prevents it from accepting more blocks to enqueue. +func (bq *Queue) Discard() { if bq.discarded.CAS(false, true) { bq.queueLock.Lock() close(bq.checkBlocks) // Technically we could bq.queue = nil, but this would cost - // another if in run(). + // another if in Run(). for i := 0; i < len(bq.queue); i++ { bq.queue[i] = nil } diff --git a/pkg/network/blockqueue_test.go b/pkg/network/bqueue/queue_test.go similarity index 69% rename from pkg/network/blockqueue_test.go rename to pkg/network/bqueue/queue_test.go index 642ba8ccd..34eabc4b6 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/bqueue/queue_test.go @@ -1,4 +1,4 @@ -package network +package bqueue import ( "testing" @@ -13,77 +13,77 @@ import ( func TestBlockQueue(t *testing.T) { chain := fakechain.NewFakeChain() // notice, it's not yet running - bq := newBlockQueue(0, chain, zaptest.NewLogger(t), nil) + bq := New(chain, zaptest.NewLogger(t), nil, nil) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}} } // not the ones expected currently for i := 3; i < 5; i++ { - assert.NoError(t, bq.putBlock(blocks[i])) + assert.NoError(t, bq.PutBlock(blocks[i])) } - last, capLeft := bq.lastQueued() + last, capLeft := bq.LastQueued() assert.Equal(t, uint32(0), last) - assert.Equal(t, blockCacheSize-2, capLeft) + assert.Equal(t, CacheSize-2, capLeft) // nothing should be put into the blockchain assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 2, bq.length()) // now added the expected ones (with duplicates) for i := 1; i < 5; i++ { - assert.NoError(t, bq.putBlock(blocks[i])) + assert.NoError(t, bq.PutBlock(blocks[i])) } // but they're still not put into the blockchain, because bq isn't running - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, blockCacheSize-4, capLeft) + assert.Equal(t, CacheSize-4, capLeft) assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped - assert.NoError(t, bq.putBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + blockCacheSize + 1}})) + assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + CacheSize + 1}})) assert.Equal(t, 4, bq.length()) - go bq.run() + go bq.Run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, blockCacheSize, capLeft) + assert.Equal(t, CacheSize, capLeft) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks for i := 1; i < 5; i++ { - assert.NoError(t, bq.putBlock(blocks[i])) + assert.NoError(t, bq.PutBlock(blocks[i])) } - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, blockCacheSize, capLeft) + assert.Equal(t, CacheSize, capLeft) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active - assert.NoError(t, bq.putBlock(blocks[8])) + assert.NoError(t, bq.PutBlock(blocks[8])) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.putBlock(blocks[7])) + assert.NoError(t, bq.PutBlock(blocks[7])) assert.Equal(t, 2, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // sparse put - assert.NoError(t, bq.putBlock(blocks[10])) + assert.NoError(t, bq.PutBlock(blocks[10])) assert.Equal(t, 3, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.putBlock(blocks[6])) - assert.NoError(t, bq.putBlock(blocks[5])) + assert.NoError(t, bq.PutBlock(blocks[6])) + assert.NoError(t, bq.PutBlock(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(8), last) - assert.Equal(t, blockCacheSize-1, capLeft) + assert.Equal(t, CacheSize-1, capLeft) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(8), chain.BlockHeight()) - bq.discard() + bq.Discard() assert.Equal(t, 0, bq.length()) } // length wraps len access for tests to make them thread-safe. -func (bq *blockQueue) length() int { +func (bq *Queue) length() int { bq.queueLock.Lock() defer bq.queueLock.Unlock() return bq.len diff --git a/pkg/network/server.go b/pkg/network/server.go index 112ab56f4..ca37d774f 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -24,6 +24,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -57,7 +58,7 @@ type ( Ledger interface { extpool.Ledger mempool.Feer - Blockqueuer + bqueue.Blockqueuer GetBlock(hash util.Uint256) (*block.Block, error) GetConfig() config.Blockchain GetHeader(hash util.Uint256) (*block.Header, error) @@ -100,8 +101,8 @@ type ( transports []Transporter discovery Discoverer chain Ledger - bQueue *blockQueue - bSyncQueue *blockQueue + bQueue *bqueue.Queue + bSyncQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool @@ -204,11 +205,11 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy }, s.notaryFeer) }) } - s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { + s.bQueue = bqueue.New(chain, log, func(b *block.Block) { s.tryStartServices() - }) + }, updateBlockQueueLenMetric) - s.bSyncQueue = newBlockQueue(maxBlockBatch, s.stateSync, log, nil) + s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", @@ -278,8 +279,8 @@ func (s *Server) Start(errChan chan error) { } go s.broadcastTxLoop() go s.relayBlocksLoop() - go s.bQueue.run() - go s.bSyncQueue.run() + go s.bQueue.Run() + go s.bSyncQueue.Run() for _, tr := range s.transports { go tr.Accept() } @@ -297,8 +298,8 @@ func (s *Server) Shutdown() { for _, p := range s.getPeers(nil) { p.Disconnect(errServerShutdown) } - s.bQueue.discard() - s.bSyncQueue.discard() + s.bQueue.Discard() + s.bSyncQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -323,6 +324,11 @@ func (s *Server) addService(svc Service) { s.services[svc.Name()] = svc } +// GetBlockQueue returns the block queue instance managed by Server. +func (s *Server) GetBlockQueue() *bqueue.Queue { + return s.bQueue +} + // AddExtensibleService register a service that handles an extensible payload of some kind. func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { s.serviceLock.Lock() @@ -723,9 +729,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { if s.stateSync.IsActive() { - return s.bSyncQueue.putBlock(block) + return s.bSyncQueue.PutBlock(block) } - return s.bQueue.putBlock(block) + return s.bQueue.PutBlock(block) } // handlePing processes a ping request. @@ -749,7 +755,7 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error { return nil } var ( - bq Blockqueuer = s.chain + bq bqueue.Blockqueuer = s.chain requestMPTNodes bool ) if s.stateSync.IsActive() { @@ -1247,9 +1253,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 1. Block range is divided into chunks of payload.MaxHashesCount. // 2. Send requests for chunk in increasing order. // 3. After all requests have been sent, request random height. -func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { +func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error { pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) - lq, capLeft := s.bQueue.lastQueued() + lq, capLeft := s.bQueue.LastQueued() if capLeft == 0 { // No more blocks will fit into the queue. return nil @@ -1274,7 +1280,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato if !lastRequestedHeight.CAS(old, needHeight) { continue } - } else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) { + } else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) { needHeight = currHeight + 1 if peerHeight > old+payload.MaxHashesCount { needHeight = old + payload.MaxHashesCount @@ -1283,7 +1289,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato } } } else { - index := mrand.Intn(blockCacheSize / payload.MaxHashesCount) + index := mrand.Intn(bqueue.CacheSize / payload.MaxHashesCount) needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount) } break @@ -1381,7 +1387,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { func (s *Server) tryInitStateSync() { if !s.stateSync.IsActive() { - s.bSyncQueue.discard() + s.bSyncQueue.Discard() return } @@ -1421,7 +1427,7 @@ func (s *Server) tryInitStateSync() { // module can be inactive after init (i.e. full state is collected and ordinary block processing is needed) if !s.stateSync.IsActive() { - s.bSyncQueue.discard() + s.bSyncQueue.Discard() } } } diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index 8e6392e7a..b7e5a3d3b 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -2,13 +2,14 @@ package network import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/util" ) // StateSync represents state sync module. type StateSync interface { AddMPTNodes([][]byte) error - Blockqueuer + bqueue.Blockqueuer Init(currChainHeight uint32) error IsActive() bool IsInitialized() bool diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index dc3d93a85..e3b6bea96 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -64,7 +64,12 @@ type ( mp *mempool.Pool // requests channel - reqCh chan mempoolevent.Event + reqCh chan mempoolevent.Event + // blocksCh is a channel used to receive block notifications from the + // Blockchain. It is not buffered intentionally, as it's important to keep + // the notary request pool in sync with the current blockchain heigh, thus, + // it's not recommended to use a large size of notary requests pool as it may + // slow down the block processing. blocksCh chan *block.Block stopCh chan struct{} done chan struct{} diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 4dd21e42b..32de50124 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -63,9 +63,12 @@ type ( timePerBlock time.Duration maxRetries int relayExtensible RelayCallback - blockCh chan *block.Block - stopCh chan struct{} - done chan struct{} + // blockCh is a channel used to receive block notifications from the + // Blockchain. It has a tiny buffer in order to avoid Blockchain blocking + // on block addition under the high load. + blockCh chan *block.Block + stopCh chan struct{} + done chan struct{} } ) @@ -84,7 +87,7 @@ func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger, chain: bc, log: log, incompleteRoots: make(map[uint32]*incompleteRoot), - blockCh: make(chan *block.Block), + blockCh: make(chan *block.Block, 1), stopCh: make(chan struct{}), done: make(chan struct{}), timePerBlock: bcConf.TimePerBlock,