diff --git a/go.mod b/go.mod index 2f5d4537b..7e94a0a32 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,6 @@ module github.com/nspcc-dev/neo-go require ( - github.com/Workiva/go-datastructures v1.0.53 github.com/abiosoft/ishell/v2 v2.0.2 github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db github.com/btcsuite/btcd v0.22.0-beta diff --git a/go.sum b/go.sum index 5ca0ba5c2..a38e4b761 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/CityOfZion/neo-go v0.71.1-pre.0.20200129171427-f773ec69fb84/go.mod h1 github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= -github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig= -github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/abiosoft/ishell v2.0.0+incompatible h1:zpwIuEHc37EzrsIYah3cpevrIc8Oma7oZPxr03tlmmw= github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg= github.com/abiosoft/ishell/v2 v2.0.2 h1:5qVfGiQISaYM8TkbBl7RFO6MddABoXpATrsFbVI+SNo= @@ -215,7 +213,6 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -276,8 +273,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/syndtr/goleveldb v0.0.0-20180307113352-169b1b37be73/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs= github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= -github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= -github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/twmb/murmur3 v1.1.5 h1:i9OLS9fkuLzBXjt6dptlAEyk58fJsSTXbRg3SgVyqgk= github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 748315754..10e762dcc 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -484,7 +484,7 @@ func (s *FakeStateSync) AddMPTNodes(nodes [][]byte) error { // BlockHeight implements StateSync interface. func (s *FakeStateSync) BlockHeight() uint32 { - panic("TODO") + return 0 } // IsActive implements StateSync interface. diff --git a/pkg/core/block/block.go b/pkg/core/block/block.go index 240fe816b..a7bf0770c 100644 --- a/pkg/core/block/block.go +++ b/pkg/core/block/block.go @@ -5,7 +5,6 @@ import ( "errors" "math" - "github.com/Workiva/go-datastructures/queue" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/io" @@ -155,19 +154,6 @@ func (b *Block) EncodeBinary(bw *io.BinWriter) { } } -// Compare implements the queue Item interface. -func (b *Block) Compare(item queue.Item) int { - other := item.(*Block) - switch { - case b.Index > other.Index: - return 1 - case b.Index == other.Index: - return 0 - default: - return -1 - } -} - // MarshalJSON implements json.Marshaler interface. func (b Block) MarshalJSON() ([]byte, error) { auxb, err := json.Marshal(auxBlockOut{ diff --git a/pkg/core/block/block_test.go b/pkg/core/block/block_test.go index d604aa4d7..a5b178db9 100644 --- a/pkg/core/block/block_test.go +++ b/pkg/core/block/block_test.go @@ -204,15 +204,6 @@ func TestBlockSizeCalculation(t *testing.T) { assert.Equal(t, rawBlock, base64.StdEncoding.EncodeToString(benc)) } -func TestBlockCompare(t *testing.T) { - b1 := Block{Header: Header{Index: 1}} - b2 := Block{Header: Header{Index: 2}} - b3 := Block{Header: Header{Index: 3}} - assert.Equal(t, 1, b2.Compare(&b1)) - assert.Equal(t, 0, b2.Compare(&b2)) - assert.Equal(t, -1, b2.Compare(&b3)) -} - func TestBlockEncodeDecode(t *testing.T) { t.Run("positive", func(t *testing.T) { b := newDumbBlock() diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index 7cc796886..3b92e62f6 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -1,7 +1,8 @@ package network import ( - "github.com/Workiva/go-datastructures/queue" + "sync" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "go.uber.org/atomic" @@ -10,11 +11,13 @@ import ( type blockQueue struct { log *zap.Logger - queue *queue.PriorityQueue + queueLock sync.Mutex + queue []*block.Block checkBlocks chan struct{} chain blockchainer.Blockqueuer relayF func(*block.Block) discarded *atomic.Bool + len int } const ( @@ -30,7 +33,7 @@ func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, r return &blockQueue{ log: log, - queue: queue.NewPriorityQueue(capacity, false), + queue: make([]*block.Block, blockCacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, @@ -39,67 +42,93 @@ func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, r } func (bq *blockQueue) run() { + var lastHeight = bq.chain.BlockHeight() for { _, ok := <-bq.checkBlocks if !ok { break } for { - item := bq.queue.Peek() - if item == nil { - break - } - minblock := item.(*block.Block) - if minblock.Index <= bq.chain.BlockHeight()+1 { - _, _ = bq.queue.Get(1) - updateBlockQueueLenMetric(bq.length()) - if minblock.Index == bq.chain.BlockHeight()+1 { - err := bq.chain.AddBlock(minblock) - if err != nil { - // The block might already be added by consensus. - if bq.chain.BlockHeight() < minblock.Index { - bq.log.Warn("blockQueue: failed adding block into the blockchain", - zap.String("error", err.Error()), - zap.Uint32("blockHeight", bq.chain.BlockHeight()), - zap.Uint32("nextIndex", minblock.Index)) - } - } else if bq.relayF != nil { - bq.relayF(minblock) - } + h := bq.chain.BlockHeight() + pos := int(h+1) % blockCacheSize + bq.queueLock.Lock() + b := bq.queue[pos] + // The chain moved forward using blocks from other sources (consensus). + for i := lastHeight; i < h; i++ { + old := int(i+1) % blockCacheSize + if bq.queue[old] != nil && bq.queue[old].Index == i { + bq.len-- + bq.queue[old] = nil } - } else { + } + bq.queueLock.Unlock() + lastHeight = h + if b == nil { break } + + err := bq.chain.AddBlock(b) + if err != nil { + // The block might already be added by consensus. + if bq.chain.BlockHeight() < b.Index { + bq.log.Warn("blockQueue: failed adding block into the blockchain", + zap.String("error", err.Error()), + zap.Uint32("blockHeight", bq.chain.BlockHeight()), + zap.Uint32("nextIndex", b.Index)) + } + } else if bq.relayF != nil { + bq.relayF(b) + } + bq.queueLock.Lock() + bq.len-- + l := bq.len + if bq.queue[pos] == b { + bq.queue[pos] = nil + } + bq.queueLock.Unlock() + updateBlockQueueLenMetric(l) } } } func (bq *blockQueue) putBlock(block *block.Block) error { h := bq.chain.BlockHeight() + bq.queueLock.Lock() if block.Index <= h || h+blockCacheSize < block.Index { // can easily happen when fetching the same blocks from // different peers, thus not considered as error + bq.queueLock.Unlock() return nil } - err := bq.queue.Put(block) + pos := block.Index % blockCacheSize + // If we already have it, keep the old block, throw away new one. + if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { + bq.len++ + bq.queue[pos] = block + } + l := bq.len + bq.queueLock.Unlock() // update metrics - updateBlockQueueLenMetric(bq.length()) + updateBlockQueueLenMetric(l) select { case bq.checkBlocks <- struct{}{}: // ok, signalled to goroutine processing queue default: // it's already busy processing blocks } - return err + return nil } func (bq *blockQueue) discard() { if bq.discarded.CAS(false, true) { close(bq.checkBlocks) - bq.queue.Dispose() + bq.queueLock.Lock() + // Technically we could bq.queue = nil, but this would cost + // another if in run(). + for i := 0; i < len(bq.queue); i++ { + bq.queue[i] = nil + } + bq.len = 0 + bq.queueLock.Unlock() } } - -func (bq *blockQueue) length() int { - return bq.queue.Len() -} diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index 1fb442434..cffdb1873 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -74,3 +74,10 @@ func TestBlockQueue(t *testing.T) { bq.discard() assert.Equal(t, 0, bq.length()) } + +// length wraps len access for tests to make them thread-safe. +func (bq *blockQueue) length() int { + bq.queueLock.Lock() + defer bq.queueLock.Unlock() + return bq.len +}