network: make cache size of bqueue configurable
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
Parent: d9a6a7cd3f
Commit: 6f2712ee55
3 changed files with 28 additions and 23 deletions
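The change threads an explicit cache size through bqueue.New. Below is a minimal sketch of how callers can use the new signature, based only on what the diff shows; the helper name, the custom size of 500 and the import paths are assumptions, not part of the commit.

package network // placement is illustrative only

import (
	"github.com/nspcc-dev/neo-go/pkg/core/block"
	"github.com/nspcc-dev/neo-go/pkg/network/bqueue" // assumed import path
	"go.uber.org/zap"
)

// newQueues is a hypothetical helper showing the two typical calls.
func newQueues(chain bqueue.Blockqueuer, log *zap.Logger) (*bqueue.Queue, *bqueue.Queue) {
	// Explicit default: 2000 blocks above the current height, as server.go now passes.
	std := bqueue.New(chain, log, func(b *block.Block) {}, bqueue.DefaultCacheSize, nil)
	// Custom size; per New, any value <= 0 falls back to DefaultCacheSize.
	small := bqueue.New(chain, log, nil, 500, nil)
	return std, small
}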
@@ -27,29 +27,34 @@ type Queue struct {
 	discarded   atomic.Bool
 	len         int
 	lenUpdateF  func(int)
+	cacheSize   int
 }
 
-// CacheSize is the amount of blocks above the current height
+// DefaultCacheSize is the default amount of blocks above the current height
 // which are stored in the queue.
-const CacheSize = 2000
+const DefaultCacheSize = 2000
 
-func indexToPosition(i uint32) int {
-	return int(i) % CacheSize
+func (bq *Queue) indexToPosition(i uint32) int {
+	return int(i) % bq.cacheSize
 }
 
 // New creates an instance of BlockQueue.
-func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue {
+func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int)) *Queue {
 	if log == nil {
 		return nil
 	}
+	if cacheSize <= 0 {
+		cacheSize = DefaultCacheSize
+	}
 
 	return &Queue{
 		log:         log,
-		queue:       make([]*block.Block, CacheSize),
+		queue:       make([]*block.Block, cacheSize),
 		checkBlocks: make(chan struct{}, 1),
 		chain:       bc,
 		relayF:      relayer,
 		lenUpdateF:  lenMetricsUpdater,
+		cacheSize:   cacheSize,
 	}
 }
 
@@ -63,12 +68,12 @@ func (bq *Queue) Run() {
 		}
 		for {
 			h := bq.chain.BlockHeight()
-			pos := indexToPosition(h + 1)
+			pos := bq.indexToPosition(h + 1)
 			bq.queueLock.Lock()
 			b := bq.queue[pos]
 			// The chain moved forward using blocks from other sources (consensus).
 			for i := lastHeight; i < h; i++ {
-				old := indexToPosition(i + 1)
+				old := bq.indexToPosition(i + 1)
 				if bq.queue[old] != nil && bq.queue[old].Index == i {
 					bq.len--
 					bq.queue[old] = nil
@@ -114,17 +119,17 @@ func (bq *Queue) PutBlock(block *block.Block) error {
 	if bq.discarded.Load() {
 		return nil
 	}
-	if block.Index <= h || h+CacheSize < block.Index {
+	if block.Index <= h || h+uint32(bq.cacheSize) < block.Index {
 		// can easily happen when fetching the same blocks from
 		// different peers, thus not considered as error
 		return nil
 	}
-	pos := indexToPosition(block.Index)
+	pos := bq.indexToPosition(block.Index)
 	// If we already have it, keep the old block, throw away the new one.
 	if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
 		bq.len++
 		bq.queue[pos] = block
-		for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
+		for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
 			bq.lastQ = bq.queue[pos].Index
 			pos++
 		}
@@ -147,7 +152,7 @@ func (bq *Queue) PutBlock(block *block.Block) error {
 func (bq *Queue) LastQueued() (uint32, int) {
 	bq.queueLock.RLock()
 	defer bq.queueLock.RUnlock()
-	return bq.lastQ, CacheSize - bq.len
+	return bq.lastQ, bq.cacheSize - bq.len
 }
 
 // Discard stops the queue and prevents it from accepting more blocks to enqueue.

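Conceptually the queue is unchanged: it is a fixed-size ring indexed by block index modulo the cache size, and the hunks above only replace the package-level constant with the per-instance field. A small standalone sketch of that mapping (sizes and indices are illustrative):

package main

import "fmt"

// indexToPosition mirrors the method above: a block index maps to a slot of the
// fixed-size ring buffer by taking it modulo the queue's cache size.
func indexToPosition(i uint32, cacheSize int) int {
	return int(i) % cacheSize
}

func main() {
	const cacheSize = 2000 // DefaultCacheSize from the diff
	for _, idx := range []uint32{1, 1999, 2000, 2001} {
		fmt.Printf("block %d -> slot %d\n", idx, indexToPosition(idx, cacheSize))
	}
	// Blocks 2000 and 0 would share slot 0, which is why PutBlock rejects anything
	// more than cacheSize blocks above the current height.
}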
@@ -13,7 +13,7 @@ import (
 func TestBlockQueue(t *testing.T) {
 	chain := fakechain.NewFakeChain()
 	// notice, it's not yet running
-	bq := New(chain, zaptest.NewLogger(t), nil, nil)
+	bq := New(chain, zaptest.NewLogger(t), nil, 0, nil)
 	blocks := make([]*block.Block, 11)
 	for i := 1; i < 11; i++ {
 		blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}}
@@ -24,7 +24,7 @@ func TestBlockQueue(t *testing.T) {
 	}
 	last, capLeft := bq.LastQueued()
 	assert.Equal(t, uint32(0), last)
-	assert.Equal(t, CacheSize-2, capLeft)
+	assert.Equal(t, DefaultCacheSize-2, capLeft)
 	// nothing should be put into the blockchain
 	assert.Equal(t, uint32(0), chain.BlockHeight())
 	assert.Equal(t, 2, bq.length())
@@ -35,18 +35,18 @@ func TestBlockQueue(t *testing.T) {
 	// but they're still not put into the blockchain, because bq isn't running
 	last, capLeft = bq.LastQueued()
 	assert.Equal(t, uint32(4), last)
-	assert.Equal(t, CacheSize-4, capLeft)
+	assert.Equal(t, DefaultCacheSize-4, capLeft)
 	assert.Equal(t, uint32(0), chain.BlockHeight())
 	assert.Equal(t, 4, bq.length())
 	// block with too big index is dropped
-	assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + CacheSize + 1}}))
+	assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}}))
 	assert.Equal(t, 4, bq.length())
 	go bq.Run()
 	// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
 	assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond)
 	last, capLeft = bq.LastQueued()
 	assert.Equal(t, uint32(4), last)
-	assert.Equal(t, CacheSize, capLeft)
+	assert.Equal(t, DefaultCacheSize, capLeft)
 	assert.Equal(t, 0, bq.length())
 	assert.Equal(t, uint32(4), chain.BlockHeight())
 	// put some old blocks
@@ -55,7 +55,7 @@ func TestBlockQueue(t *testing.T) {
 	}
 	last, capLeft = bq.LastQueued()
 	assert.Equal(t, uint32(4), last)
-	assert.Equal(t, CacheSize, capLeft)
+	assert.Equal(t, DefaultCacheSize, capLeft)
 	assert.Equal(t, 0, bq.length())
 	assert.Equal(t, uint32(4), chain.BlockHeight())
 	// unexpected blocks with run() active
@@ -75,7 +75,7 @@ func TestBlockQueue(t *testing.T) {
 	assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond)
 	last, capLeft = bq.LastQueued()
 	assert.Equal(t, uint32(8), last)
-	assert.Equal(t, CacheSize-1, capLeft)
+	assert.Equal(t, DefaultCacheSize-1, capLeft)
 	assert.Equal(t, 1, bq.length())
 	assert.Equal(t, uint32(8), chain.BlockHeight())
 	bq.Discard()

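Not part of the commit: a hypothetical extra test in the same file and style, exercising a non-default cache size through the behaviour visible above (the size 16 and the block indices are arbitrary):

func TestBlockQueueCustomCacheSize(t *testing.T) {
	chain := fakechain.NewFakeChain()
	// A 16-slot queue instead of the default 2000.
	bq := New(chain, zaptest.NewLogger(t), nil, 16, nil)
	// Two blocks just above the current height (0) are accepted.
	assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: 1}}))
	assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: 2}}))
	_, capLeft := bq.LastQueued()
	assert.Equal(t, 16-2, capLeft)
	// A block more than cacheSize above the current height is silently dropped.
	assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: 16 + 1}}))
	assert.Equal(t, 2, bq.length())
}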
@@ -216,9 +216,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
 	}
 	s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
 		s.tryStartServices()
-	}, updateBlockQueueLenMetric)
+	}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric)
 
-	s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)
+	s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric)
 
 	if s.MinPeers < 0 {
 		s.log.Info("bad MinPeers configured, using the default value",
@@ -1322,7 +1322,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
 			if !lastRequestedHeight.CompareAndSwap(old, needHeight) {
 				continue
 			}
-		} else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) {
+		} else if old < currHeight+(bqueue.DefaultCacheSize-payload.MaxHashesCount) {
 			needHeight = currHeight + 1
 			if peerHeight > old+payload.MaxHashesCount {
 				needHeight = old + payload.MaxHashesCount
@@ -1331,7 +1331,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
 			}
 		}
 	} else {
-		index := mrand.IntN(bqueue.CacheSize / payload.MaxHashesCount)
+		index := mrand.IntN(bqueue.DefaultCacheSize / payload.MaxHashesCount)
 		needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount)
 	}
 	break

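For the last two hunks: the block-request logic still uses the package-level DefaultCacheSize, picking one of DefaultCacheSize/MaxHashesCount request windows above the current height in the random branch. A standalone sketch of that arithmetic, assuming payload.MaxHashesCount is 500 (its value is not part of this diff):

package main

import (
	"fmt"
	mrand "math/rand/v2"
)

func main() {
	const (
		defaultCacheSize = 2000 // bqueue.DefaultCacheSize
		maxHashesCount   = 500  // assumed value of payload.MaxHashesCount
	)
	currHeight := uint32(10000)
	// One of 2000/500 = 4 windows, each maxHashesCount blocks wide.
	index := mrand.IntN(defaultCacheSize / maxHashesCount)
	needHeight := currHeight + 1 + uint32(index*maxHashesCount)
	fmt.Println(needHeight) // one of 10001, 10501, 11001, 11501
}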