diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 982071613..2899b1ac5 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -3,6 +3,7 @@ package bqueue import ( "sync" "sync/atomic" + "time" "github.com/nspcc-dev/neo-go/pkg/core/block" "go.uber.org/zap" @@ -15,6 +16,17 @@ type Blockqueuer interface { BlockHeight() uint32 } +// OperationMode is the mode of operation for the block queue. +// Could be either Blocking or NonBlocking. +type OperationMode byte + +const ( + // NonBlocking means that PutBlock will return immediately if the queue is full. + NonBlocking OperationMode = 0 + // Blocking means that PutBlock will wait until there is enough space in the queue. + Blocking OperationMode = 1 +) + // Queue is the block queue. type Queue struct { log *zap.Logger @@ -28,6 +40,7 @@ type Queue struct { len int lenUpdateF func(int) cacheSize int + mode OperationMode } // DefaultCacheSize is the default amount of blocks above the current height @@ -39,7 +52,7 @@ func (bq *Queue) indexToPosition(i uint32) int { } // New creates an instance of BlockQueue. -func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int)) *Queue { +func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue { if log == nil { return nil } @@ -55,6 +68,7 @@ func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize relayF: relayer, lenUpdateF: lenMetricsUpdater, cacheSize: cacheSize, + mode: mode, } } @@ -119,11 +133,32 @@ func (bq *Queue) PutBlock(block *block.Block) error { if bq.discarded.Load() { return nil } - if block.Index <= h || h+uint32(bq.cacheSize) < block.Index { - // can easily happen when fetching the same blocks from - // different peers, thus not considered as error + // Can easily happen when fetching the same blocks from + // different peers, thus not considered as error. + if block.Index <= h { return nil } + if h+uint32(bq.cacheSize) < block.Index { + switch bq.mode { + case NonBlocking: + return nil + case Blocking: + bq.queueLock.Unlock() + t := time.NewTicker(time.Second) + defer t.Stop() + for range t.C { + if bq.discarded.Load() { + bq.queueLock.Lock() + return nil + } + h = bq.chain.BlockHeight() + if h+uint32(bq.cacheSize) >= block.Index { + bq.queueLock.Lock() + break + } + } + } + } pos := bq.indexToPosition(block.Index) // If we already have it, keep the old block, throw away the new one. if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { diff --git a/pkg/network/bqueue/queue_test.go b/pkg/network/bqueue/queue_test.go index 78c8402ff..e481fba56 100644 --- a/pkg/network/bqueue/queue_test.go +++ b/pkg/network/bqueue/queue_test.go @@ -13,7 +13,7 @@ import ( func TestBlockQueue(t *testing.T) { chain := fakechain.NewFakeChain() // notice, it's not yet running - bq := New(chain, zaptest.NewLogger(t), nil, 0, nil) + bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}} diff --git a/pkg/network/server.go b/pkg/network/server.go index ef0e33ac7..3bbdceac6 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -216,9 +216,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s.bQueue = bqueue.New(chain, log, func(b *block.Block) { s.tryStartServices() - }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric) + }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) - s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric) + s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value",