queue: add Blocking OperationMode

If Blocking mode is on PutBlock will block until there is enough space
in the queue.

Co-authored-by: Anna Shaleva <shaleva.ann@nspcc.ru>
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-09-05 08:13:39 +04:00
parent 6f2712ee55
commit 69b655ec7a
3 changed files with 42 additions and 7 deletions

View file

@ -3,6 +3,7 @@ package bqueue
import (
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"go.uber.org/zap"
@ -15,6 +16,17 @@ type Blockqueuer interface {
BlockHeight() uint32
}
// OperationMode is the mode of operation for the block queue.
// Could be either Blocking or NonBlocking.
type OperationMode byte
const (
// NonBlocking means that PutBlock will return immediately if the queue is full.
NonBlocking OperationMode = 0
// Blocking means that PutBlock will wait until there is enough space in the queue.
Blocking OperationMode = 1
)
// Queue is the block queue.
type Queue struct {
log *zap.Logger
@ -28,6 +40,7 @@ type Queue struct {
len int
lenUpdateF func(int)
cacheSize int
mode OperationMode
}
// DefaultCacheSize is the default amount of blocks above the current height
@ -39,7 +52,7 @@ func (bq *Queue) indexToPosition(i uint32) int {
}
// New creates an instance of BlockQueue.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int)) *Queue {
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue {
if log == nil {
return nil
}
@ -55,6 +68,7 @@ func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize
relayF: relayer,
lenUpdateF: lenMetricsUpdater,
cacheSize: cacheSize,
mode: mode,
}
}
@ -119,11 +133,32 @@ func (bq *Queue) PutBlock(block *block.Block) error {
if bq.discarded.Load() {
return nil
}
if block.Index <= h || h+uint32(bq.cacheSize) < block.Index {
// can easily happen when fetching the same blocks from
// different peers, thus not considered as error
// Can easily happen when fetching the same blocks from
// different peers, thus not considered as error.
if block.Index <= h {
return nil
}
if h+uint32(bq.cacheSize) < block.Index {
switch bq.mode {
case NonBlocking:
return nil
case Blocking:
bq.queueLock.Unlock()
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
if bq.discarded.Load() {
bq.queueLock.Lock()
return nil
}
h = bq.chain.BlockHeight()
if h+uint32(bq.cacheSize) >= block.Index {
bq.queueLock.Lock()
break
}
}
}
}
pos := bq.indexToPosition(block.Index)
// If we already have it, keep the old block, throw away the new one.
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {

View file

@ -13,7 +13,7 @@ import (
func TestBlockQueue(t *testing.T) {
chain := fakechain.NewFakeChain()
// notice, it's not yet running
bq := New(chain, zaptest.NewLogger(t), nil, 0, nil)
bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking)
blocks := make([]*block.Block, 11)
for i := 1; i < 11; i++ {
blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}}

View file

@ -216,9 +216,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}
s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
s.tryStartServices()
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric)
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric)
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",