Merge pull request #2238 from nspcc-dev/fix-block-queue

network: remove priority queue from block queue
This commit is contained in:
Roman Khimov 2021-11-01 12:03:45 +03:00 committed by GitHub
commit 328f8b7954
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 70 additions and 63 deletions

1
go.mod
View file

@ -1,7 +1,6 @@
module github.com/nspcc-dev/neo-go
require (
github.com/Workiva/go-datastructures v1.0.53
github.com/abiosoft/ishell/v2 v2.0.2
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db
github.com/btcsuite/btcd v0.22.0-beta

5
go.sum
View file

@ -8,8 +8,6 @@ github.com/CityOfZion/neo-go v0.71.1-pre.0.20200129171427-f773ec69fb84/go.mod h1
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig=
github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A=
github.com/abiosoft/ishell v2.0.0+incompatible h1:zpwIuEHc37EzrsIYah3cpevrIc8Oma7oZPxr03tlmmw=
github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg=
github.com/abiosoft/ishell/v2 v2.0.2 h1:5qVfGiQISaYM8TkbBl7RFO6MddABoXpATrsFbVI+SNo=
@ -215,7 +213,6 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
@ -276,8 +273,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/syndtr/goleveldb v0.0.0-20180307113352-169b1b37be73/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/twmb/murmur3 v1.1.5 h1:i9OLS9fkuLzBXjt6dptlAEyk58fJsSTXbRg3SgVyqgk=
github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=

View file

@ -484,7 +484,7 @@ func (s *FakeStateSync) AddMPTNodes(nodes [][]byte) error {
// BlockHeight implements StateSync interface.
func (s *FakeStateSync) BlockHeight() uint32 {
panic("TODO")
return 0
}
// IsActive implements StateSync interface.

View file

@ -5,7 +5,6 @@ import (
"errors"
"math"
"github.com/Workiva/go-datastructures/queue"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/io"
@ -155,19 +154,6 @@ func (b *Block) EncodeBinary(bw *io.BinWriter) {
}
}
// Compare implements the queue Item interface.
func (b *Block) Compare(item queue.Item) int {
other := item.(*Block)
switch {
case b.Index > other.Index:
return 1
case b.Index == other.Index:
return 0
default:
return -1
}
}
// MarshalJSON implements json.Marshaler interface.
func (b Block) MarshalJSON() ([]byte, error) {
auxb, err := json.Marshal(auxBlockOut{

View file

@ -204,15 +204,6 @@ func TestBlockSizeCalculation(t *testing.T) {
assert.Equal(t, rawBlock, base64.StdEncoding.EncodeToString(benc))
}
func TestBlockCompare(t *testing.T) {
b1 := Block{Header: Header{Index: 1}}
b2 := Block{Header: Header{Index: 2}}
b3 := Block{Header: Header{Index: 3}}
assert.Equal(t, 1, b2.Compare(&b1))
assert.Equal(t, 0, b2.Compare(&b2))
assert.Equal(t, -1, b2.Compare(&b3))
}
func TestBlockEncodeDecode(t *testing.T) {
t.Run("positive", func(t *testing.T) {
b := newDumbBlock()

View file

@ -1,7 +1,8 @@
package network
import (
"github.com/Workiva/go-datastructures/queue"
"sync"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"go.uber.org/atomic"
@ -10,11 +11,13 @@ import (
type blockQueue struct {
log *zap.Logger
queue *queue.PriorityQueue
queueLock sync.Mutex
queue []*block.Block
checkBlocks chan struct{}
chain blockchainer.Blockqueuer
relayF func(*block.Block)
discarded *atomic.Bool
len int
}
const (
@ -30,7 +33,7 @@ func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, r
return &blockQueue{
log: log,
queue: queue.NewPriorityQueue(capacity, false),
queue: make([]*block.Block, blockCacheSize),
checkBlocks: make(chan struct{}, 1),
chain: bc,
relayF: relayer,
@ -39,67 +42,93 @@ func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, r
}
func (bq *blockQueue) run() {
var lastHeight = bq.chain.BlockHeight()
for {
_, ok := <-bq.checkBlocks
if !ok {
break
}
for {
item := bq.queue.Peek()
if item == nil {
break
}
minblock := item.(*block.Block)
if minblock.Index <= bq.chain.BlockHeight()+1 {
_, _ = bq.queue.Get(1)
updateBlockQueueLenMetric(bq.length())
if minblock.Index == bq.chain.BlockHeight()+1 {
err := bq.chain.AddBlock(minblock)
if err != nil {
// The block might already be added by consensus.
if bq.chain.BlockHeight() < minblock.Index {
bq.log.Warn("blockQueue: failed adding block into the blockchain",
zap.String("error", err.Error()),
zap.Uint32("blockHeight", bq.chain.BlockHeight()),
zap.Uint32("nextIndex", minblock.Index))
}
} else if bq.relayF != nil {
bq.relayF(minblock)
}
h := bq.chain.BlockHeight()
pos := int(h+1) % blockCacheSize
bq.queueLock.Lock()
b := bq.queue[pos]
// The chain moved forward using blocks from other sources (consensus).
for i := lastHeight; i < h; i++ {
old := int(i+1) % blockCacheSize
if bq.queue[old] != nil && bq.queue[old].Index == i {
bq.len--
bq.queue[old] = nil
}
} else {
}
bq.queueLock.Unlock()
lastHeight = h
if b == nil {
break
}
err := bq.chain.AddBlock(b)
if err != nil {
// The block might already be added by consensus.
if bq.chain.BlockHeight() < b.Index {
bq.log.Warn("blockQueue: failed adding block into the blockchain",
zap.String("error", err.Error()),
zap.Uint32("blockHeight", bq.chain.BlockHeight()),
zap.Uint32("nextIndex", b.Index))
}
} else if bq.relayF != nil {
bq.relayF(b)
}
bq.queueLock.Lock()
bq.len--
l := bq.len
if bq.queue[pos] == b {
bq.queue[pos] = nil
}
bq.queueLock.Unlock()
updateBlockQueueLenMetric(l)
}
}
}
func (bq *blockQueue) putBlock(block *block.Block) error {
h := bq.chain.BlockHeight()
bq.queueLock.Lock()
if block.Index <= h || h+blockCacheSize < block.Index {
// can easily happen when fetching the same blocks from
// different peers, thus not considered as error
bq.queueLock.Unlock()
return nil
}
err := bq.queue.Put(block)
pos := block.Index % blockCacheSize
// If we already have it, keep the old block, throw away new one.
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
bq.len++
bq.queue[pos] = block
}
l := bq.len
bq.queueLock.Unlock()
// update metrics
updateBlockQueueLenMetric(bq.length())
updateBlockQueueLenMetric(l)
select {
case bq.checkBlocks <- struct{}{}:
// ok, signalled to goroutine processing queue
default:
// it's already busy processing blocks
}
return err
return nil
}
func (bq *blockQueue) discard() {
if bq.discarded.CAS(false, true) {
close(bq.checkBlocks)
bq.queue.Dispose()
bq.queueLock.Lock()
// Technically we could bq.queue = nil, but this would cost
// another if in run().
for i := 0; i < len(bq.queue); i++ {
bq.queue[i] = nil
}
bq.len = 0
bq.queueLock.Unlock()
}
}
func (bq *blockQueue) length() int {
return bq.queue.Len()
}

View file

@ -74,3 +74,10 @@ func TestBlockQueue(t *testing.T) {
bq.discard()
assert.Equal(t, 0, bq.length())
}
// length wraps len access for tests to make them thread-safe.
func (bq *blockQueue) length() int {
bq.queueLock.Lock()
defer bq.queueLock.Unlock()
return bq.len
}