From 8bb1ecb45a5b52aa24a2ca2e7e3708e24419cf46 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sun, 31 Oct 2021 12:23:07 +0300 Subject: [PATCH] network: remove priority queue from block queue Use circular buffer which is a bit more appropriate. The problem is that priority queue accepts and stores equal items which wastes memory even in normal usage scenario, but it's especially dangerous if the node is stuck for some reason. In this case it'll accept from peers and put into queue the same blocks again and again leaking memory up to OOM condition. Notice that queue length calculation might be wrong in case circular buffer wraps, but it's not very likely to happen (usually blocks not coming from the queue are added by consensus and it's not very fast in doing so). --- go.mod | 1 - go.sum | 5 -- internal/fakechain/fakechain.go | 2 +- pkg/core/block/block.go | 14 ----- pkg/core/block/block_test.go | 9 ---- pkg/network/blockqueue.go | 95 +++++++++++++++++++++------------ pkg/network/blockqueue_test.go | 7 +++ 7 files changed, 70 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index 26b6d1790..5f33c028a 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,6 @@ module github.com/nspcc-dev/neo-go require ( - github.com/Workiva/go-datastructures v1.0.53 github.com/abiosoft/ishell/v2 v2.0.2 github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db github.com/alicebob/miniredis/v2 v2.15.1 diff --git a/go.sum b/go.sum index 15deb0983..09bdec7a4 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,6 @@ github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= -github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig= -github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/abiosoft/ishell v2.0.0+incompatible h1:zpwIuEHc37EzrsIYah3cpevrIc8Oma7oZPxr03tlmmw= github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg= github.com/abiosoft/ishell/v2 v2.0.2 h1:5qVfGiQISaYM8TkbBl7RFO6MddABoXpATrsFbVI+SNo= @@ -228,7 +226,6 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -289,8 +286,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/syndtr/goleveldb v0.0.0-20180307113352-169b1b37be73/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs= github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= -github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= -github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/twmb/murmur3 v1.1.5 h1:i9OLS9fkuLzBXjt6dptlAEyk58fJsSTXbRg3SgVyqgk= github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 748315754..10e762dcc 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -484,7 +484,7 @@ func (s *FakeStateSync) AddMPTNodes(nodes [][]byte) error { // BlockHeight implements StateSync interface. func (s *FakeStateSync) BlockHeight() uint32 { - panic("TODO") + return 0 } // IsActive implements StateSync interface. diff --git a/pkg/core/block/block.go b/pkg/core/block/block.go index 240fe816b..a7bf0770c 100644 --- a/pkg/core/block/block.go +++ b/pkg/core/block/block.go @@ -5,7 +5,6 @@ import ( "errors" "math" - "github.com/Workiva/go-datastructures/queue" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/io" @@ -155,19 +154,6 @@ func (b *Block) EncodeBinary(bw *io.BinWriter) { } } -// Compare implements the queue Item interface. -func (b *Block) Compare(item queue.Item) int { - other := item.(*Block) - switch { - case b.Index > other.Index: - return 1 - case b.Index == other.Index: - return 0 - default: - return -1 - } -} - // MarshalJSON implements json.Marshaler interface. func (b Block) MarshalJSON() ([]byte, error) { auxb, err := json.Marshal(auxBlockOut{ diff --git a/pkg/core/block/block_test.go b/pkg/core/block/block_test.go index d604aa4d7..a5b178db9 100644 --- a/pkg/core/block/block_test.go +++ b/pkg/core/block/block_test.go @@ -204,15 +204,6 @@ func TestBlockSizeCalculation(t *testing.T) { assert.Equal(t, rawBlock, base64.StdEncoding.EncodeToString(benc)) } -func TestBlockCompare(t *testing.T) { - b1 := Block{Header: Header{Index: 1}} - b2 := Block{Header: Header{Index: 2}} - b3 := Block{Header: Header{Index: 3}} - assert.Equal(t, 1, b2.Compare(&b1)) - assert.Equal(t, 0, b2.Compare(&b2)) - assert.Equal(t, -1, b2.Compare(&b3)) -} - func TestBlockEncodeDecode(t *testing.T) { t.Run("positive", func(t *testing.T) { b := newDumbBlock() diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index 7cc796886..3b92e62f6 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -1,7 +1,8 @@ package network import ( - "github.com/Workiva/go-datastructures/queue" + "sync" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "go.uber.org/atomic" @@ -10,11 +11,13 @@ import ( type blockQueue struct { log *zap.Logger - queue *queue.PriorityQueue + queueLock sync.Mutex + queue []*block.Block checkBlocks chan struct{} chain blockchainer.Blockqueuer relayF func(*block.Block) discarded *atomic.Bool + len int } const ( @@ -30,7 +33,7 @@ func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, r return &blockQueue{ log: log, - queue: queue.NewPriorityQueue(capacity, false), + queue: make([]*block.Block, blockCacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, @@ -39,67 +42,93 @@ func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, r } func (bq *blockQueue) run() { + var lastHeight = bq.chain.BlockHeight() for { _, ok := <-bq.checkBlocks if !ok { break } for { - item := bq.queue.Peek() - if item == nil { - break - } - minblock := item.(*block.Block) - if minblock.Index <= bq.chain.BlockHeight()+1 { - _, _ = bq.queue.Get(1) - updateBlockQueueLenMetric(bq.length()) - if minblock.Index == bq.chain.BlockHeight()+1 { - err := bq.chain.AddBlock(minblock) - if err != nil { - // The block might already be added by consensus. - if bq.chain.BlockHeight() < minblock.Index { - bq.log.Warn("blockQueue: failed adding block into the blockchain", - zap.String("error", err.Error()), - zap.Uint32("blockHeight", bq.chain.BlockHeight()), - zap.Uint32("nextIndex", minblock.Index)) - } - } else if bq.relayF != nil { - bq.relayF(minblock) - } + h := bq.chain.BlockHeight() + pos := int(h+1) % blockCacheSize + bq.queueLock.Lock() + b := bq.queue[pos] + // The chain moved forward using blocks from other sources (consensus). + for i := lastHeight; i < h; i++ { + old := int(i+1) % blockCacheSize + if bq.queue[old] != nil && bq.queue[old].Index == i { + bq.len-- + bq.queue[old] = nil } - } else { + } + bq.queueLock.Unlock() + lastHeight = h + if b == nil { break } + + err := bq.chain.AddBlock(b) + if err != nil { + // The block might already be added by consensus. + if bq.chain.BlockHeight() < b.Index { + bq.log.Warn("blockQueue: failed adding block into the blockchain", + zap.String("error", err.Error()), + zap.Uint32("blockHeight", bq.chain.BlockHeight()), + zap.Uint32("nextIndex", b.Index)) + } + } else if bq.relayF != nil { + bq.relayF(b) + } + bq.queueLock.Lock() + bq.len-- + l := bq.len + if bq.queue[pos] == b { + bq.queue[pos] = nil + } + bq.queueLock.Unlock() + updateBlockQueueLenMetric(l) } } } func (bq *blockQueue) putBlock(block *block.Block) error { h := bq.chain.BlockHeight() + bq.queueLock.Lock() if block.Index <= h || h+blockCacheSize < block.Index { // can easily happen when fetching the same blocks from // different peers, thus not considered as error + bq.queueLock.Unlock() return nil } - err := bq.queue.Put(block) + pos := block.Index % blockCacheSize + // If we already have it, keep the old block, throw away new one. + if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { + bq.len++ + bq.queue[pos] = block + } + l := bq.len + bq.queueLock.Unlock() // update metrics - updateBlockQueueLenMetric(bq.length()) + updateBlockQueueLenMetric(l) select { case bq.checkBlocks <- struct{}{}: // ok, signalled to goroutine processing queue default: // it's already busy processing blocks } - return err + return nil } func (bq *blockQueue) discard() { if bq.discarded.CAS(false, true) { close(bq.checkBlocks) - bq.queue.Dispose() + bq.queueLock.Lock() + // Technically we could bq.queue = nil, but this would cost + // another if in run(). + for i := 0; i < len(bq.queue); i++ { + bq.queue[i] = nil + } + bq.len = 0 + bq.queueLock.Unlock() } } - -func (bq *blockQueue) length() int { - return bq.queue.Len() -} diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index 1fb442434..cffdb1873 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -74,3 +74,10 @@ func TestBlockQueue(t *testing.T) { bq.discard() assert.Equal(t, 0, bq.length()) } + +// length wraps len access for tests to make them thread-safe. +func (bq *blockQueue) length() int { + bq.queueLock.Lock() + defer bq.queueLock.Unlock() + return bq.len +}