network: move blockqueue to a separate package

This commit is contained in:
Anna Shaleva 2023-03-07 11:36:42 +03:00
parent 91a77c25a2
commit 04d0b45ceb
4 changed files with 77 additions and 67 deletions

View file

@ -1,4 +1,4 @@
package network package bqueue
import ( import (
"sync" "sync"
@ -15,7 +15,8 @@ type Blockqueuer interface {
BlockHeight() uint32 BlockHeight() uint32
} }
type blockQueue struct { // Queue is the block queue.
type Queue struct {
log *zap.Logger log *zap.Logger
queueLock sync.RWMutex queueLock sync.RWMutex
queue []*block.Block queue []*block.Block
@ -25,34 +26,36 @@ type blockQueue struct {
relayF func(*block.Block) relayF func(*block.Block)
discarded *atomic.Bool discarded *atomic.Bool
len int len int
lenUpdateF func(int)
} }
const ( // CacheSize is the amount of blocks above the current height
// blockCacheSize is the amount of blocks above the current height // which are stored in the queue.
// which are stored in the queue. const CacheSize = 2000
blockCacheSize = 2000
)
func indexToPosition(i uint32) int { func indexToPosition(i uint32) int {
return int(i) % blockCacheSize return int(i) % CacheSize
} }
func newBlockQueue(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { // New creates an instance of BlockQueue.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue {
if log == nil { if log == nil {
return nil return nil
} }
return &blockQueue{ return &Queue{
log: log, log: log,
queue: make([]*block.Block, blockCacheSize), queue: make([]*block.Block, CacheSize),
checkBlocks: make(chan struct{}, 1), checkBlocks: make(chan struct{}, 1),
chain: bc, chain: bc,
relayF: relayer, relayF: relayer,
discarded: atomic.NewBool(false), discarded: atomic.NewBool(false),
lenUpdateF: lenMetricsUpdater,
} }
} }
func (bq *blockQueue) run() { // Run runs the BlockQueue queueing loop. It must be called in a separate routine.
func (bq *Queue) Run() {
var lastHeight = bq.chain.BlockHeight() var lastHeight = bq.chain.BlockHeight()
for { for {
_, ok := <-bq.checkBlocks _, ok := <-bq.checkBlocks
@ -97,19 +100,22 @@ func (bq *blockQueue) run() {
bq.queue[pos] = nil bq.queue[pos] = nil
} }
bq.queueLock.Unlock() bq.queueLock.Unlock()
updateBlockQueueLenMetric(l) if bq.lenUpdateF != nil {
bq.lenUpdateF(l)
}
} }
} }
} }
func (bq *blockQueue) putBlock(block *block.Block) error { // PutBlock enqueues block to be added to the chain.
func (bq *Queue) PutBlock(block *block.Block) error {
h := bq.chain.BlockHeight() h := bq.chain.BlockHeight()
bq.queueLock.Lock() bq.queueLock.Lock()
defer bq.queueLock.Unlock() defer bq.queueLock.Unlock()
if bq.discarded.Load() { if bq.discarded.Load() {
return nil return nil
} }
if block.Index <= h || h+blockCacheSize < block.Index { if block.Index <= h || h+CacheSize < block.Index {
// can easily happen when fetching the same blocks from // can easily happen when fetching the same blocks from
// different peers, thus not considered as error // different peers, thus not considered as error
return nil return nil
@ -119,14 +125,15 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
bq.len++ bq.len++
bq.queue[pos] = block bq.queue[pos] = block
for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
bq.lastQ = bq.queue[pos].Index bq.lastQ = bq.queue[pos].Index
pos++ pos++
} }
} }
l := bq.len
// update metrics // update metrics
updateBlockQueueLenMetric(l) if bq.lenUpdateF != nil {
bq.lenUpdateF(bq.len)
}
select { select {
case bq.checkBlocks <- struct{}{}: case bq.checkBlocks <- struct{}{}:
// ok, signalled to goroutine processing queue // ok, signalled to goroutine processing queue
@ -136,20 +143,21 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
return nil return nil
} }
// lastQueued returns the index of the last queued block and the queue's capacity // LastQueued returns the index of the last queued block and the queue's capacity
// left. // left.
func (bq *blockQueue) lastQueued() (uint32, int) { func (bq *Queue) LastQueued() (uint32, int) {
bq.queueLock.RLock() bq.queueLock.RLock()
defer bq.queueLock.RUnlock() defer bq.queueLock.RUnlock()
return bq.lastQ, blockCacheSize - bq.len return bq.lastQ, CacheSize - bq.len
} }
func (bq *blockQueue) discard() { // Discard stops the queue and prevents it from accepting more blocks to enqueue.
func (bq *Queue) Discard() {
if bq.discarded.CAS(false, true) { if bq.discarded.CAS(false, true) {
bq.queueLock.Lock() bq.queueLock.Lock()
close(bq.checkBlocks) close(bq.checkBlocks)
// Technically we could bq.queue = nil, but this would cost // Technically we could bq.queue = nil, but this would cost
// another if in run(). // another if in Run().
for i := 0; i < len(bq.queue); i++ { for i := 0; i < len(bq.queue); i++ {
bq.queue[i] = nil bq.queue[i] = nil
} }

View file

@ -1,4 +1,4 @@
package network package bqueue
import ( import (
"testing" "testing"
@ -13,77 +13,77 @@ import (
func TestBlockQueue(t *testing.T) { func TestBlockQueue(t *testing.T) {
chain := fakechain.NewFakeChain() chain := fakechain.NewFakeChain()
// notice, it's not yet running // notice, it's not yet running
bq := newBlockQueue(chain, zaptest.NewLogger(t), nil) bq := New(chain, zaptest.NewLogger(t), nil, nil)
blocks := make([]*block.Block, 11) blocks := make([]*block.Block, 11)
for i := 1; i < 11; i++ { for i := 1; i < 11; i++ {
blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}} blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}}
} }
// not the ones expected currently // not the ones expected currently
for i := 3; i < 5; i++ { for i := 3; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i])) assert.NoError(t, bq.PutBlock(blocks[i]))
} }
last, capLeft := bq.lastQueued() last, capLeft := bq.LastQueued()
assert.Equal(t, uint32(0), last) assert.Equal(t, uint32(0), last)
assert.Equal(t, blockCacheSize-2, capLeft) assert.Equal(t, CacheSize-2, capLeft)
// nothing should be put into the blockchain // nothing should be put into the blockchain
assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 2, bq.length()) assert.Equal(t, 2, bq.length())
// now added the expected ones (with duplicates) // now added the expected ones (with duplicates)
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i])) assert.NoError(t, bq.PutBlock(blocks[i]))
} }
// but they're still not put into the blockchain, because bq isn't running // but they're still not put into the blockchain, because bq isn't running
last, capLeft = bq.lastQueued() last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last) assert.Equal(t, uint32(4), last)
assert.Equal(t, blockCacheSize-4, capLeft) assert.Equal(t, CacheSize-4, capLeft)
assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 4, bq.length()) assert.Equal(t, 4, bq.length())
// block with too big index is dropped // block with too big index is dropped
assert.NoError(t, bq.putBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + blockCacheSize + 1}})) assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + CacheSize + 1}}))
assert.Equal(t, 4, bq.length()) assert.Equal(t, 4, bq.length())
go bq.run() go bq.Run()
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond)
last, capLeft = bq.lastQueued() last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last) assert.Equal(t, uint32(4), last)
assert.Equal(t, blockCacheSize, capLeft) assert.Equal(t, CacheSize, capLeft)
assert.Equal(t, 0, bq.length()) assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight()) assert.Equal(t, uint32(4), chain.BlockHeight())
// put some old blocks // put some old blocks
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i])) assert.NoError(t, bq.PutBlock(blocks[i]))
} }
last, capLeft = bq.lastQueued() last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last) assert.Equal(t, uint32(4), last)
assert.Equal(t, blockCacheSize, capLeft) assert.Equal(t, CacheSize, capLeft)
assert.Equal(t, 0, bq.length()) assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight()) assert.Equal(t, uint32(4), chain.BlockHeight())
// unexpected blocks with run() active // unexpected blocks with run() active
assert.NoError(t, bq.putBlock(blocks[8])) assert.NoError(t, bq.PutBlock(blocks[8]))
assert.Equal(t, 1, bq.length()) assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight()) assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.putBlock(blocks[7])) assert.NoError(t, bq.PutBlock(blocks[7]))
assert.Equal(t, 2, bq.length()) assert.Equal(t, 2, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight()) assert.Equal(t, uint32(4), chain.BlockHeight())
// sparse put // sparse put
assert.NoError(t, bq.putBlock(blocks[10])) assert.NoError(t, bq.PutBlock(blocks[10]))
assert.Equal(t, 3, bq.length()) assert.Equal(t, 3, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight()) assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.putBlock(blocks[6])) assert.NoError(t, bq.PutBlock(blocks[6]))
assert.NoError(t, bq.putBlock(blocks[5])) assert.NoError(t, bq.PutBlock(blocks[5]))
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond)
last, capLeft = bq.lastQueued() last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(8), last) assert.Equal(t, uint32(8), last)
assert.Equal(t, blockCacheSize-1, capLeft) assert.Equal(t, CacheSize-1, capLeft)
assert.Equal(t, 1, bq.length()) assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(8), chain.BlockHeight()) assert.Equal(t, uint32(8), chain.BlockHeight())
bq.discard() bq.Discard()
assert.Equal(t, 0, bq.length()) assert.Equal(t, 0, bq.length())
} }
// length wraps len access for tests to make them thread-safe. // length wraps len access for tests to make them thread-safe.
func (bq *blockQueue) length() int { func (bq *Queue) length() int {
bq.queueLock.Lock() bq.queueLock.Lock()
defer bq.queueLock.Unlock() defer bq.queueLock.Unlock()
return bq.len return bq.len

View file

@ -24,6 +24,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/network/payload"
@ -57,7 +58,7 @@ type (
Ledger interface { Ledger interface {
extpool.Ledger extpool.Ledger
mempool.Feer mempool.Feer
Blockqueuer bqueue.Blockqueuer
GetBlock(hash util.Uint256) (*block.Block, error) GetBlock(hash util.Uint256) (*block.Block, error)
GetConfig() config.Blockchain GetConfig() config.Blockchain
GetHeader(hash util.Uint256) (*block.Header, error) GetHeader(hash util.Uint256) (*block.Header, error)
@ -100,8 +101,8 @@ type (
transports []Transporter transports []Transporter
discovery Discoverer discovery Discoverer
chain Ledger chain Ledger
bQueue *blockQueue bQueue *bqueue.Queue
bSyncQueue *blockQueue bSyncQueue *bqueue.Queue
mempool *mempool.Pool mempool *mempool.Pool
notaryRequestPool *mempool.Pool notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool extensiblePool *extpool.Pool
@ -204,11 +205,11 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}, s.notaryFeer) }, s.notaryFeer)
}) })
} }
s.bQueue = newBlockQueue(chain, log, func(b *block.Block) { s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
s.tryStartServices() s.tryStartServices()
}) }, updateBlockQueueLenMetric)
s.bSyncQueue = newBlockQueue(s.stateSync, log, nil) s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)
if s.MinPeers < 0 { if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value", s.log.Info("bad MinPeers configured, using the default value",
@ -278,8 +279,8 @@ func (s *Server) Start(errChan chan error) {
} }
go s.broadcastTxLoop() go s.broadcastTxLoop()
go s.relayBlocksLoop() go s.relayBlocksLoop()
go s.bQueue.run() go s.bQueue.Run()
go s.bSyncQueue.run() go s.bSyncQueue.Run()
for _, tr := range s.transports { for _, tr := range s.transports {
go tr.Accept() go tr.Accept()
} }
@ -297,8 +298,8 @@ func (s *Server) Shutdown() {
for _, p := range s.getPeers(nil) { for _, p := range s.getPeers(nil) {
p.Disconnect(errServerShutdown) p.Disconnect(errServerShutdown)
} }
s.bQueue.discard() s.bQueue.Discard()
s.bSyncQueue.discard() s.bSyncQueue.Discard()
s.serviceLock.RLock() s.serviceLock.RLock()
for _, svc := range s.services { for _, svc := range s.services {
svc.Shutdown() svc.Shutdown()
@ -723,9 +724,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
// handleBlockCmd processes the block received from its peer. // handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.stateSync.IsActive() { if s.stateSync.IsActive() {
return s.bSyncQueue.putBlock(block) return s.bSyncQueue.PutBlock(block)
} }
return s.bQueue.putBlock(block) return s.bQueue.PutBlock(block)
} }
// handlePing processes a ping request. // handlePing processes a ping request.
@ -749,7 +750,7 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {
return nil return nil
} }
var ( var (
bq Blockqueuer = s.chain bq bqueue.Blockqueuer = s.chain
requestMPTNodes bool requestMPTNodes bool
) )
if s.stateSync.IsActive() { if s.stateSync.IsActive() {
@ -1247,9 +1248,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 1. Block range is divided into chunks of payload.MaxHashesCount. // 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunk in increasing order. // 2. Send requests for chunk in increasing order.
// 3. After all requests have been sent, request random height. // 3. After all requests have been sent, request random height.
func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
lq, capLeft := s.bQueue.lastQueued() lq, capLeft := s.bQueue.LastQueued()
if capLeft == 0 { if capLeft == 0 {
// No more blocks will fit into the queue. // No more blocks will fit into the queue.
return nil return nil
@ -1274,7 +1275,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
if !lastRequestedHeight.CAS(old, needHeight) { if !lastRequestedHeight.CAS(old, needHeight) {
continue continue
} }
} else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) { } else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) {
needHeight = currHeight + 1 needHeight = currHeight + 1
if peerHeight > old+payload.MaxHashesCount { if peerHeight > old+payload.MaxHashesCount {
needHeight = old + payload.MaxHashesCount needHeight = old + payload.MaxHashesCount
@ -1283,7 +1284,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
} }
} }
} else { } else {
index := mrand.Intn(blockCacheSize / payload.MaxHashesCount) index := mrand.Intn(bqueue.CacheSize / payload.MaxHashesCount)
needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount) needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount)
} }
break break
@ -1381,7 +1382,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
func (s *Server) tryInitStateSync() { func (s *Server) tryInitStateSync() {
if !s.stateSync.IsActive() { if !s.stateSync.IsActive() {
s.bSyncQueue.discard() s.bSyncQueue.Discard()
return return
} }
@ -1421,7 +1422,7 @@ func (s *Server) tryInitStateSync() {
// module can be inactive after init (i.e. full state is collected and ordinary block processing is needed) // module can be inactive after init (i.e. full state is collected and ordinary block processing is needed)
if !s.stateSync.IsActive() { if !s.stateSync.IsActive() {
s.bSyncQueue.discard() s.bSyncQueue.Discard()
} }
} }
} }

View file

@ -2,13 +2,14 @@ package network
import ( import (
"github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
) )
// StateSync represents state sync module. // StateSync represents state sync module.
type StateSync interface { type StateSync interface {
AddMPTNodes([][]byte) error AddMPTNodes([][]byte) error
Blockqueuer bqueue.Blockqueuer
Init(currChainHeight uint32) error Init(currChainHeight uint32) error
IsActive() bool IsActive() bool
IsInitialized() bool IsInitialized() bool