diff --git a/pkg/network/blockqueue.go b/pkg/network/bqueue/queue.go similarity index 69% rename from pkg/network/blockqueue.go rename to pkg/network/bqueue/queue.go index 15e325dfd..1cf8ec548 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/bqueue/queue.go @@ -1,4 +1,4 @@ -package network +package bqueue import ( "sync" @@ -15,7 +15,8 @@ type Blockqueuer interface { BlockHeight() uint32 } -type blockQueue struct { +// Queue is the block queue. +type Queue struct { log *zap.Logger queueLock sync.RWMutex queue []*block.Block @@ -25,34 +26,36 @@ type blockQueue struct { relayF func(*block.Block) discarded *atomic.Bool len int + lenUpdateF func(int) } -const ( - // blockCacheSize is the amount of blocks above the current height - // which are stored in the queue. - blockCacheSize = 2000 -) +// CacheSize is the amount of blocks above the current height +// which are stored in the queue. +const CacheSize = 2000 func indexToPosition(i uint32) int { - return int(i) % blockCacheSize + return int(i) % CacheSize } -func newBlockQueue(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { +// New creates an instance of BlockQueue. +func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue { if log == nil { return nil } - return &blockQueue{ + return &Queue{ log: log, - queue: make([]*block.Block, blockCacheSize), + queue: make([]*block.Block, CacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, discarded: atomic.NewBool(false), + lenUpdateF: lenMetricsUpdater, } } -func (bq *blockQueue) run() { +// Run runs the BlockQueue queueing loop. It must be called in a separate routine. +func (bq *Queue) Run() { var lastHeight = bq.chain.BlockHeight() for { _, ok := <-bq.checkBlocks @@ -97,19 +100,22 @@ func (bq *blockQueue) run() { bq.queue[pos] = nil } bq.queueLock.Unlock() - updateBlockQueueLenMetric(l) + if bq.lenUpdateF != nil { + bq.lenUpdateF(l) + } } } } -func (bq *blockQueue) putBlock(block *block.Block) error { +// PutBlock enqueues block to be added to the chain. +func (bq *Queue) PutBlock(block *block.Block) error { h := bq.chain.BlockHeight() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { return nil } - if block.Index <= h || h+blockCacheSize < block.Index { + if block.Index <= h || h+CacheSize < block.Index { // can easily happen when fetching the same blocks from // different peers, thus not considered as error return nil @@ -119,14 +125,15 @@ func (bq *blockQueue) putBlock(block *block.Block) error { if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ bq.queue[pos] = block - for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { + for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { bq.lastQ = bq.queue[pos].Index pos++ } } - l := bq.len // update metrics - updateBlockQueueLenMetric(l) + if bq.lenUpdateF != nil { + bq.lenUpdateF(bq.len) + } select { case bq.checkBlocks <- struct{}{}: // ok, signalled to goroutine processing queue @@ -136,20 +143,21 @@ func (bq *blockQueue) putBlock(block *block.Block) error { return nil } -// lastQueued returns the index of the last queued block and the queue's capacity +// LastQueued returns the index of the last queued block and the queue's capacity // left. -func (bq *blockQueue) lastQueued() (uint32, int) { +func (bq *Queue) LastQueued() (uint32, int) { bq.queueLock.RLock() defer bq.queueLock.RUnlock() - return bq.lastQ, blockCacheSize - bq.len + return bq.lastQ, CacheSize - bq.len } -func (bq *blockQueue) discard() { +// Discard stops the queue and prevents it from accepting more blocks to enqueue. +func (bq *Queue) Discard() { if bq.discarded.CAS(false, true) { bq.queueLock.Lock() close(bq.checkBlocks) // Technically we could bq.queue = nil, but this would cost - // another if in run(). + // another if in Run(). for i := 0; i < len(bq.queue); i++ { bq.queue[i] = nil } diff --git a/pkg/network/blockqueue_test.go b/pkg/network/bqueue/queue_test.go similarity index 69% rename from pkg/network/blockqueue_test.go rename to pkg/network/bqueue/queue_test.go index 2427356b5..34eabc4b6 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/bqueue/queue_test.go @@ -1,4 +1,4 @@ -package network +package bqueue import ( "testing" @@ -13,77 +13,77 @@ import ( func TestBlockQueue(t *testing.T) { chain := fakechain.NewFakeChain() // notice, it's not yet running - bq := newBlockQueue(chain, zaptest.NewLogger(t), nil) + bq := New(chain, zaptest.NewLogger(t), nil, nil) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}} } // not the ones expected currently for i := 3; i < 5; i++ { - assert.NoError(t, bq.putBlock(blocks[i])) + assert.NoError(t, bq.PutBlock(blocks[i])) } - last, capLeft := bq.lastQueued() + last, capLeft := bq.LastQueued() assert.Equal(t, uint32(0), last) - assert.Equal(t, blockCacheSize-2, capLeft) + assert.Equal(t, CacheSize-2, capLeft) // nothing should be put into the blockchain assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 2, bq.length()) // now added the expected ones (with duplicates) for i := 1; i < 5; i++ { - assert.NoError(t, bq.putBlock(blocks[i])) + assert.NoError(t, bq.PutBlock(blocks[i])) } // but they're still not put into the blockchain, because bq isn't running - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, blockCacheSize-4, capLeft) + assert.Equal(t, CacheSize-4, capLeft) assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped - assert.NoError(t, bq.putBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + blockCacheSize + 1}})) + assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + CacheSize + 1}})) assert.Equal(t, 4, bq.length()) - go bq.run() + go bq.Run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, blockCacheSize, capLeft) + assert.Equal(t, CacheSize, capLeft) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks for i := 1; i < 5; i++ { - assert.NoError(t, bq.putBlock(blocks[i])) + assert.NoError(t, bq.PutBlock(blocks[i])) } - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, blockCacheSize, capLeft) + assert.Equal(t, CacheSize, capLeft) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active - assert.NoError(t, bq.putBlock(blocks[8])) + assert.NoError(t, bq.PutBlock(blocks[8])) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.putBlock(blocks[7])) + assert.NoError(t, bq.PutBlock(blocks[7])) assert.Equal(t, 2, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // sparse put - assert.NoError(t, bq.putBlock(blocks[10])) + assert.NoError(t, bq.PutBlock(blocks[10])) assert.Equal(t, 3, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.putBlock(blocks[6])) - assert.NoError(t, bq.putBlock(blocks[5])) + assert.NoError(t, bq.PutBlock(blocks[6])) + assert.NoError(t, bq.PutBlock(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) - last, capLeft = bq.lastQueued() + last, capLeft = bq.LastQueued() assert.Equal(t, uint32(8), last) - assert.Equal(t, blockCacheSize-1, capLeft) + assert.Equal(t, CacheSize-1, capLeft) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(8), chain.BlockHeight()) - bq.discard() + bq.Discard() assert.Equal(t, 0, bq.length()) } // length wraps len access for tests to make them thread-safe. -func (bq *blockQueue) length() int { +func (bq *Queue) length() int { bq.queueLock.Lock() defer bq.queueLock.Unlock() return bq.len diff --git a/pkg/network/server.go b/pkg/network/server.go index f7d94b95e..5598ec661 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -24,6 +24,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -57,7 +58,7 @@ type ( Ledger interface { extpool.Ledger mempool.Feer - Blockqueuer + bqueue.Blockqueuer GetBlock(hash util.Uint256) (*block.Block, error) GetConfig() config.Blockchain GetHeader(hash util.Uint256) (*block.Header, error) @@ -100,8 +101,8 @@ type ( transports []Transporter discovery Discoverer chain Ledger - bQueue *blockQueue - bSyncQueue *blockQueue + bQueue *bqueue.Queue + bSyncQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool @@ -204,11 +205,11 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy }, s.notaryFeer) }) } - s.bQueue = newBlockQueue(chain, log, func(b *block.Block) { + s.bQueue = bqueue.New(chain, log, func(b *block.Block) { s.tryStartServices() - }) + }, updateBlockQueueLenMetric) - s.bSyncQueue = newBlockQueue(s.stateSync, log, nil) + s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", @@ -278,8 +279,8 @@ func (s *Server) Start(errChan chan error) { } go s.broadcastTxLoop() go s.relayBlocksLoop() - go s.bQueue.run() - go s.bSyncQueue.run() + go s.bQueue.Run() + go s.bSyncQueue.Run() for _, tr := range s.transports { go tr.Accept() } @@ -297,8 +298,8 @@ func (s *Server) Shutdown() { for _, p := range s.getPeers(nil) { p.Disconnect(errServerShutdown) } - s.bQueue.discard() - s.bSyncQueue.discard() + s.bQueue.Discard() + s.bSyncQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -723,9 +724,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { if s.stateSync.IsActive() { - return s.bSyncQueue.putBlock(block) + return s.bSyncQueue.PutBlock(block) } - return s.bQueue.putBlock(block) + return s.bQueue.PutBlock(block) } // handlePing processes a ping request. @@ -749,7 +750,7 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error { return nil } var ( - bq Blockqueuer = s.chain + bq bqueue.Blockqueuer = s.chain requestMPTNodes bool ) if s.stateSync.IsActive() { @@ -1247,9 +1248,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 1. Block range is divided into chunks of payload.MaxHashesCount. // 2. Send requests for chunk in increasing order. // 3. After all requests have been sent, request random height. -func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { +func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error { pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) - lq, capLeft := s.bQueue.lastQueued() + lq, capLeft := s.bQueue.LastQueued() if capLeft == 0 { // No more blocks will fit into the queue. return nil @@ -1274,7 +1275,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato if !lastRequestedHeight.CAS(old, needHeight) { continue } - } else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) { + } else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) { needHeight = currHeight + 1 if peerHeight > old+payload.MaxHashesCount { needHeight = old + payload.MaxHashesCount @@ -1283,7 +1284,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato } } } else { - index := mrand.Intn(blockCacheSize / payload.MaxHashesCount) + index := mrand.Intn(bqueue.CacheSize / payload.MaxHashesCount) needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount) } break @@ -1381,7 +1382,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { func (s *Server) tryInitStateSync() { if !s.stateSync.IsActive() { - s.bSyncQueue.discard() + s.bSyncQueue.Discard() return } @@ -1421,7 +1422,7 @@ func (s *Server) tryInitStateSync() { // module can be inactive after init (i.e. full state is collected and ordinary block processing is needed) if !s.stateSync.IsActive() { - s.bSyncQueue.discard() + s.bSyncQueue.Discard() } } } diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index 8e6392e7a..b7e5a3d3b 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -2,13 +2,14 @@ package network import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/util" ) // StateSync represents state sync module. type StateSync interface { AddMPTNodes([][]byte) error - Blockqueuer + bqueue.Blockqueuer Init(currChainHeight uint32) error IsActive() bool IsInitialized() bool