package bqueue import ( "sync" "sync/atomic" "time" "github.com/nspcc-dev/neo-go/pkg/core/block" "go.uber.org/zap" ) // Blockqueuer is an interface for a block queue. type Blockqueuer interface { AddBlock(block *block.Block) error AddHeaders(...*block.Header) error BlockHeight() uint32 } // OperationMode is the mode of operation for the block queue. // Could be either Blocking or NonBlocking. type OperationMode byte const ( // NonBlocking means that PutBlock will return immediately if the queue is full. NonBlocking OperationMode = 0 // Blocking means that PutBlock will wait until there is enough space in the queue. Blocking OperationMode = 1 ) // Queue is the block queue. type Queue struct { log *zap.Logger queueLock sync.RWMutex queue []*block.Block lastQ uint32 checkBlocks chan struct{} chain Blockqueuer relayF func(*block.Block) discarded atomic.Bool len int lenUpdateF func(int) cacheSize int mode OperationMode } // DefaultCacheSize is the default amount of blocks above the current height // which are stored in the queue. const DefaultCacheSize = 2000 func (bq *Queue) indexToPosition(i uint32) int { return int(i) % bq.cacheSize } // New creates an instance of BlockQueue. func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue { if log == nil { return nil } if cacheSize <= 0 { cacheSize = DefaultCacheSize } return &Queue{ log: log, queue: make([]*block.Block, cacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, lenUpdateF: lenMetricsUpdater, cacheSize: cacheSize, mode: mode, } } // Run runs the BlockQueue queueing loop. It must be called in a separate routine. func (bq *Queue) Run() { var lastHeight = bq.chain.BlockHeight() for { _, ok := <-bq.checkBlocks if !ok { break } for { h := bq.chain.BlockHeight() pos := bq.indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] // The chain moved forward using blocks from other sources (consensus). for i := lastHeight; i < h; i++ { old := bq.indexToPosition(i + 1) if bq.queue[old] != nil && bq.queue[old].Index == i { bq.len-- bq.queue[old] = nil } } bq.queueLock.Unlock() lastHeight = h if b == nil { break } err := bq.chain.AddBlock(b) if err != nil { // The block might already be added by the consensus. if bq.chain.BlockHeight() < b.Index { bq.log.Warn("blockQueue: failed adding block into the blockchain", zap.String("error", err.Error()), zap.Uint32("blockHeight", bq.chain.BlockHeight()), zap.Uint32("nextIndex", b.Index)) } } else if bq.relayF != nil { bq.relayF(b) } bq.queueLock.Lock() bq.len-- l := bq.len if bq.queue[pos] == b { bq.queue[pos] = nil } bq.queueLock.Unlock() if bq.lenUpdateF != nil { bq.lenUpdateF(l) } } } } // PutBlock enqueues block to be added to the chain. func (bq *Queue) PutBlock(block *block.Block) error { h := bq.chain.BlockHeight() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { return nil } // Can easily happen when fetching the same blocks from // different peers, thus not considered as error. if block.Index <= h { return nil } if h+uint32(bq.cacheSize) < block.Index { switch bq.mode { case NonBlocking: return nil case Blocking: bq.queueLock.Unlock() t := time.NewTicker(time.Second) defer t.Stop() for range t.C { if bq.discarded.Load() { bq.queueLock.Lock() return nil } h = bq.chain.BlockHeight() if h+uint32(bq.cacheSize) >= block.Index { bq.queueLock.Lock() break } } } } pos := bq.indexToPosition(block.Index) // If we already have it, keep the old block, throw away the new one. if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ bq.queue[pos] = block for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { bq.lastQ = bq.queue[pos].Index pos++ } } // update metrics if bq.lenUpdateF != nil { bq.lenUpdateF(bq.len) } select { case bq.checkBlocks <- struct{}{}: // ok, signalled to goroutine processing queue default: // it's already busy processing blocks } return nil } // LastQueued returns the index of the last queued block and the queue's capacity // left. func (bq *Queue) LastQueued() (uint32, int) { bq.queueLock.RLock() defer bq.queueLock.RUnlock() return bq.lastQ, bq.cacheSize - bq.len } // Discard stops the queue and prevents it from accepting more blocks to enqueue. func (bq *Queue) Discard() { if bq.discarded.CompareAndSwap(false, true) { bq.queueLock.Lock() close(bq.checkBlocks) // Technically we could bq.queue = nil, but this would cost // another if in Run(). clear(bq.queue) bq.len = 0 bq.queueLock.Unlock() } }