2023-03-07 08:36:42 +00:00
|
|
|
package bqueue
|
2019-09-25 16:54:31 +00:00
|
|
|
|
|
|
|
import (
|
2021-10-31 09:23:07 +00:00
|
|
|
"sync"
|
|
|
|
|
2020-03-03 14:21:42 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
2021-07-30 13:57:42 +00:00
|
|
|
"go.uber.org/atomic"
|
2019-12-30 07:43:05 +00:00
|
|
|
"go.uber.org/zap"
|
2019-09-25 16:54:31 +00:00
|
|
|
)
|
|
|
|
|
2022-04-20 18:30:09 +00:00
|
|
|
// Blockqueuer is an interface for a block queue.
|
2022-01-14 01:09:54 +00:00
|
|
|
type Blockqueuer interface {
|
|
|
|
AddBlock(block *block.Block) error
|
|
|
|
AddHeaders(...*block.Header) error
|
|
|
|
BlockHeight() uint32
|
|
|
|
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
// Queue is the block queue.
|
|
|
|
type Queue struct {
|
2019-12-30 07:43:05 +00:00
|
|
|
log *zap.Logger
|
2022-01-17 21:04:41 +00:00
|
|
|
queueLock sync.RWMutex
|
2021-10-31 09:23:07 +00:00
|
|
|
queue []*block.Block
|
2022-01-17 21:04:41 +00:00
|
|
|
lastQ uint32
|
2019-09-25 16:54:31 +00:00
|
|
|
checkBlocks chan struct{}
|
2022-01-14 01:09:54 +00:00
|
|
|
chain Blockqueuer
|
2020-02-04 16:32:29 +00:00
|
|
|
relayF func(*block.Block)
|
2021-07-30 13:57:42 +00:00
|
|
|
discarded *atomic.Bool
|
2021-10-31 09:23:07 +00:00
|
|
|
len int
|
2023-03-07 08:36:42 +00:00
|
|
|
lenUpdateF func(int)
|
2019-09-25 16:54:31 +00:00
|
|
|
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
// CacheSize is the amount of blocks above the current height
// which are stored in the queue.
const CacheSize = 2000

// indexToPosition maps a block index to its slot in the queue buffer.
//
// The remainder is computed in uint32 and only then converted to int, so the
// result is always in [0, CacheSize). Converting first would yield a negative
// value (and thus a negative slot) for indexes >= 2^31 on platforms where int
// is 32 bits wide.
func indexToPosition(i uint32) int {
	return int(i % CacheSize)
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
// New creates an instance of BlockQueue.
|
|
|
|
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue {
|
2019-12-30 07:43:05 +00:00
|
|
|
if log == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
return &Queue{
|
2019-12-30 07:43:05 +00:00
|
|
|
log: log,
|
2023-03-07 08:36:42 +00:00
|
|
|
queue: make([]*block.Block, CacheSize),
|
2019-09-25 16:54:31 +00:00
|
|
|
checkBlocks: make(chan struct{}, 1),
|
|
|
|
chain: bc,
|
2020-02-04 16:32:29 +00:00
|
|
|
relayF: relayer,
|
2021-07-30 13:57:42 +00:00
|
|
|
discarded: atomic.NewBool(false),
|
2023-03-07 08:36:42 +00:00
|
|
|
lenUpdateF: lenMetricsUpdater,
|
2019-09-25 16:54:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
// Run runs the BlockQueue queueing loop. It must be called in a separate routine.
|
|
|
|
func (bq *Queue) Run() {
|
2021-10-31 09:23:07 +00:00
|
|
|
var lastHeight = bq.chain.BlockHeight()
|
2019-09-25 16:54:31 +00:00
|
|
|
for {
|
|
|
|
_, ok := <-bq.checkBlocks
|
|
|
|
if !ok {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
for {
|
2021-10-31 09:23:07 +00:00
|
|
|
h := bq.chain.BlockHeight()
|
2022-01-17 21:01:26 +00:00
|
|
|
pos := indexToPosition(h + 1)
|
2021-10-31 09:23:07 +00:00
|
|
|
bq.queueLock.Lock()
|
|
|
|
b := bq.queue[pos]
|
|
|
|
// The chain moved forward using blocks from other sources (consensus).
|
|
|
|
for i := lastHeight; i < h; i++ {
|
2022-01-17 21:01:26 +00:00
|
|
|
old := indexToPosition(i + 1)
|
2021-10-31 09:23:07 +00:00
|
|
|
if bq.queue[old] != nil && bq.queue[old].Index == i {
|
|
|
|
bq.len--
|
|
|
|
bq.queue[old] = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
bq.queueLock.Unlock()
|
|
|
|
lastHeight = h
|
|
|
|
if b == nil {
|
2019-09-25 16:54:31 +00:00
|
|
|
break
|
|
|
|
}
|
2021-10-31 09:23:07 +00:00
|
|
|
|
|
|
|
err := bq.chain.AddBlock(b)
|
|
|
|
if err != nil {
|
2022-04-20 18:30:09 +00:00
|
|
|
// The block might already be added by the consensus.
|
2021-10-31 09:23:07 +00:00
|
|
|
if bq.chain.BlockHeight() < b.Index {
|
|
|
|
bq.log.Warn("blockQueue: failed adding block into the blockchain",
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
zap.Uint32("blockHeight", bq.chain.BlockHeight()),
|
|
|
|
zap.Uint32("nextIndex", b.Index))
|
2019-09-25 16:54:31 +00:00
|
|
|
}
|
2021-10-31 09:23:07 +00:00
|
|
|
} else if bq.relayF != nil {
|
|
|
|
bq.relayF(b)
|
2019-09-25 16:54:31 +00:00
|
|
|
}
|
2021-10-31 09:23:07 +00:00
|
|
|
bq.queueLock.Lock()
|
|
|
|
bq.len--
|
|
|
|
l := bq.len
|
|
|
|
if bq.queue[pos] == b {
|
|
|
|
bq.queue[pos] = nil
|
|
|
|
}
|
|
|
|
bq.queueLock.Unlock()
|
2023-03-07 08:36:42 +00:00
|
|
|
if bq.lenUpdateF != nil {
|
|
|
|
bq.lenUpdateF(l)
|
|
|
|
}
|
2019-09-25 16:54:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
// PutBlock enqueues block to be added to the chain.
|
|
|
|
func (bq *Queue) PutBlock(block *block.Block) error {
|
2020-09-02 12:00:53 +00:00
|
|
|
h := bq.chain.BlockHeight()
|
2021-10-31 09:23:07 +00:00
|
|
|
bq.queueLock.Lock()
|
2022-04-22 11:45:37 +00:00
|
|
|
defer bq.queueLock.Unlock()
|
|
|
|
if bq.discarded.Load() {
|
|
|
|
return nil
|
|
|
|
}
|
2023-03-07 08:36:42 +00:00
|
|
|
if block.Index <= h || h+CacheSize < block.Index {
|
2019-09-25 16:54:31 +00:00
|
|
|
// can easily happen when fetching the same blocks from
|
|
|
|
// different peers, thus not considered as error
|
|
|
|
return nil
|
|
|
|
}
|
2022-01-17 21:01:26 +00:00
|
|
|
pos := indexToPosition(block.Index)
|
2022-04-20 18:30:09 +00:00
|
|
|
// If we already have it, keep the old block, throw away the new one.
|
2021-10-31 09:23:07 +00:00
|
|
|
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
|
|
|
|
bq.len++
|
|
|
|
bq.queue[pos] = block
|
2023-03-07 08:36:42 +00:00
|
|
|
for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
|
2022-01-17 21:04:41 +00:00
|
|
|
bq.lastQ = bq.queue[pos].Index
|
|
|
|
pos++
|
|
|
|
}
|
2021-10-31 09:23:07 +00:00
|
|
|
}
|
2019-10-29 17:51:17 +00:00
|
|
|
// update metrics
|
2023-03-07 08:36:42 +00:00
|
|
|
if bq.lenUpdateF != nil {
|
|
|
|
bq.lenUpdateF(bq.len)
|
|
|
|
}
|
2019-09-25 16:54:31 +00:00
|
|
|
select {
|
|
|
|
case bq.checkBlocks <- struct{}{}:
|
|
|
|
// ok, signalled to goroutine processing queue
|
|
|
|
default:
|
|
|
|
// it's already busy processing blocks
|
|
|
|
}
|
2021-10-31 09:23:07 +00:00
|
|
|
return nil
|
2019-09-25 16:54:31 +00:00
|
|
|
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
// LastQueued returns the index of the last queued block and the queue's capacity
|
network: do not allow to request invalid block count
The problem is in peer disconnection due to invalid GetBlockByIndex
payload (the logs are from some patched neo-go version):
```
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.490Z INFO new peer connected {"addr": "10.78.69.115:50846", "peerCount": 3}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.490Z WARN peer disconnected {"addr": "10.78.69.115:50846", "error": "invalid block count", "peerCount": 2}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.490Z INFO started protocol {"addr": "10.78.69.115:50846", "userAgent": "/NEO-GO:1.0.0/", "startHeight": 0, "id": 1339571820}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.491Z INFO new peer connected {"addr": "10.78.69.115:50856", "peerCount": 3}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.492Z WARN peer disconnected {"addr": "10.78.69.115:50856", "error": "invalid block count", "peerCount": 2}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.492Z INFO started protocol {"addr": "10.78.69.115:50856", "userAgent": "/NEO-GO:1.0.0/", "startHeight": 0, "id": 1339571820}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.492Z INFO new peer connected {"addr": "10.78.69.115:50858", "peerCount": 3}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.493Z INFO started protocol {"addr": "10.78.69.115:50858", "userAgent": "/NEO-GO:1.0.0/", "startHeight": 0, "id": 1339571820}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.493Z WARN peer disconnected {"addr": "10.78.69.115:50858", "error": "invalid block count", "peerCount": 2}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.494Z INFO new peer connected {"addr": "10.78.69.115:50874", "peerCount": 3}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.494Z INFO started protocol {"addr": "10.78.69.115:50874", "userAgent": "/NEO-GO:1.0.0/", "startHeight": 0, "id": 1339571820}
дек 15 16:02:39 glagoli neo-go[928530]: 2022-12-15T16:02:39.494Z WARN peer disconnected {"addr": "10.78.69.115:50874", "error": "invalid block count", "peerCount": 2}
```
GetBlockByIndex payload can't be decoded, and the only possible cause
is zero (or <-1, but it's probably not the case) block count requested.
The error message is improved as well.
2022-12-15 17:10:09 +00:00
|
|
|
// left.
|
2023-03-07 08:36:42 +00:00
|
|
|
func (bq *Queue) LastQueued() (uint32, int) {
|
2022-01-17 21:04:41 +00:00
|
|
|
bq.queueLock.RLock()
|
|
|
|
defer bq.queueLock.RUnlock()
|
2023-03-07 08:36:42 +00:00
|
|
|
return bq.lastQ, CacheSize - bq.len
|
2022-01-17 21:04:41 +00:00
|
|
|
}
|
|
|
|
|
2023-03-07 08:36:42 +00:00
|
|
|
// Discard stops the queue and prevents it from accepting more blocks to enqueue.
|
|
|
|
func (bq *Queue) Discard() {
|
2021-07-30 13:57:42 +00:00
|
|
|
if bq.discarded.CAS(false, true) {
|
2021-10-31 09:23:07 +00:00
|
|
|
bq.queueLock.Lock()
|
2022-04-22 11:45:37 +00:00
|
|
|
close(bq.checkBlocks)
|
2021-10-31 09:23:07 +00:00
|
|
|
// Technically we could bq.queue = nil, but this would cost
|
2023-03-07 08:36:42 +00:00
|
|
|
// another if in Run().
|
2021-10-31 09:23:07 +00:00
|
|
|
for i := 0; i < len(bq.queue); i++ {
|
|
|
|
bq.queue[i] = nil
|
|
|
|
}
|
|
|
|
bq.len = 0
|
|
|
|
bq.queueLock.Unlock()
|
2021-07-30 13:57:42 +00:00
|
|
|
}
|
2019-09-25 16:54:31 +00:00
|
|
|
}
|