From c531dc0bdeb3306b6d46658086fc1d0f2b78380d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 25 Sep 2019 19:54:31 +0300 Subject: [PATCH] network: add block queue This one will replace blockCache in Blockchain itself as it can and should be external from it. The idea is that we only feed successive blocks into the Blockchain and it only stores valid proper Blockchain and nothing else. --- go.mod | 1 + go.sum | 2 + pkg/core/block.go | 14 +++++++ pkg/core/block_test.go | 9 ++++ pkg/network/blockqueue.go | 77 ++++++++++++++++++++++++++++++++++ pkg/network/blockqueue_test.go | 71 +++++++++++++++++++++++++++++++ pkg/network/helper_test.go | 18 +++++--- pkg/network/server.go | 9 ++-- 8 files changed, 191 insertions(+), 10 deletions(-) create mode 100644 pkg/network/blockqueue.go create mode 100644 pkg/network/blockqueue_test.go diff --git a/go.mod b/go.mod index 5aa463fdb..6f1e947d8 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/CityOfZion/neo-go require ( + github.com/Workiva/go-datastructures v1.0.50 github.com/abiosoft/ishell v2.0.0+incompatible // indirect github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db // indirect github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect diff --git a/go.sum b/go.sum index 4ba89b8ea..3ec2e92ef 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/abiosoft/ishell v2.0.0+incompatible h1:zpwIuEHc37EzrsIYah3cpevrIc8Oma7oZPxr03tlmmw= github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg= github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db h1:CjPUSXOiYptLbTdr1RceuZgSFDQ7U15ITERUGrUORx8= diff --git a/pkg/core/block.go b/pkg/core/block.go index 27ec6b225..97fdeabce 100644 --- a/pkg/core/block.go +++ b/pkg/core/block.go @@ -5,6 +5,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/crypto" "github.com/CityOfZion/neo-go/pkg/io" "github.com/CityOfZion/neo-go/pkg/util" + "github.com/Workiva/go-datastructures/queue" log "github.com/sirupsen/logrus" ) @@ -132,3 +133,16 @@ func (b *Block) EncodeBinary(bw *io.BinWriter) { tx.EncodeBinary(bw) } } + +// Compare implements the queue Item interface. +func (b *Block) Compare(item queue.Item) int { + other := item.(*Block) + switch { + case b.Index > other.Index: + return 1 + case b.Index == other.Index: + return 0 + default: + return -1 + } +} diff --git a/pkg/core/block_test.go b/pkg/core/block_test.go index a26007d7e..164a0ab88 100644 --- a/pkg/core/block_test.go +++ b/pkg/core/block_test.go @@ -259,3 +259,12 @@ func TestBlockSizeCalculation(t *testing.T) { assert.Equal(t, 7360, len(benc)) assert.Equal(t, rawBlock, hex.EncodeToString(benc)) } + +func TestBlockCompare(t *testing.T) { + b1 := Block{BlockBase: BlockBase{Index: 1}} + b2 := Block{BlockBase: BlockBase{Index: 2}} + b3 := Block{BlockBase: BlockBase{Index: 3}} + assert.Equal(t, 1, b2.Compare(&b1)) + assert.Equal(t, 0, b2.Compare(&b2)) + assert.Equal(t, -1, b2.Compare(&b3)) +} diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go new file mode 100644 index 000000000..501ddd804 --- /dev/null +++ b/pkg/network/blockqueue.go @@ -0,0 +1,77 @@ +package network + +import ( + "github.com/CityOfZion/neo-go/pkg/core" + "github.com/Workiva/go-datastructures/queue" + log "github.com/sirupsen/logrus" +) + +type blockQueue struct { + queue *queue.PriorityQueue + checkBlocks chan struct{} + chain core.Blockchainer +} + +func newBlockQueue(capacity int, bc core.Blockchainer) *blockQueue { + return &blockQueue{ + queue: queue.NewPriorityQueue(capacity, false), + checkBlocks: make(chan struct{}, 1), + chain: bc, + } +} + +func (bq *blockQueue) run() { + for { + _, ok := <-bq.checkBlocks + if !ok { + break + } + for { + item := bq.queue.Peek() + if item == nil { + break + } + minblock := item.(*core.Block) + if minblock.Index <= bq.chain.BlockHeight()+1 { + _, _ = bq.queue.Get(1) + if minblock.Index == bq.chain.BlockHeight()+1 { + err := bq.chain.AddBlock(minblock) + if err != nil { + log.WithFields(log.Fields{ + "error": err.Error(), + "blockHeight": bq.chain.BlockHeight(), + "nextIndex": minblock.Index, + }).Warn("blockQueue: failed adding block into the blockchain") + } + } + } else { + break + } + } + } +} + +func (bq *blockQueue) putBlock(block *core.Block) error { + if bq.chain.BlockHeight() >= block.Index { + // can easily happen when fetching the same blocks from + // different peers, thus not considered as error + return nil + } + err := bq.queue.Put(block) + select { + case bq.checkBlocks <- struct{}{}: + // ok, signalled to goroutine processing queue + default: + // it's already busy processing blocks + } + return err +} + +func (bq *blockQueue) discard() { + close(bq.checkBlocks) + bq.queue.Dispose() +} + +func (bq *blockQueue) length() int { + return bq.queue.Len() +} diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go new file mode 100644 index 000000000..da4124c06 --- /dev/null +++ b/pkg/network/blockqueue_test.go @@ -0,0 +1,71 @@ +package network + +import ( + "testing" + "time" + + "github.com/CityOfZion/neo-go/pkg/core" + "github.com/stretchr/testify/assert" +) + +func TestBlockQueue(t *testing.T) { + chain := &testChain{} + // notice, it's not yet running + bq := newBlockQueue(0, chain) + blocks := make([]*core.Block, 11) + for i := 1; i < 11; i++ { + blocks[i] = &core.Block{BlockBase: core.BlockBase{Index: uint32(i)}} + } + // not the ones expected currently + for i := 3; i < 5; i++ { + assert.NoError(t, bq.putBlock(blocks[i])) + } + // nothing should be put into the blockchain + assert.Equal(t, uint32(0), chain.BlockHeight()) + assert.Equal(t, 2, bq.length()) + // now added expected ones (with duplicates) + for i := 1; i < 5; i++ { + assert.NoError(t, bq.putBlock(blocks[i])) + } + // but they're still not put into the blockchain, because bq isn't running + assert.Equal(t, uint32(0), chain.BlockHeight()) + assert.Equal(t, 4, bq.length()) + go bq.run() + // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one + for i := 0; i < 5; i++ { + if chain.BlockHeight() != 4 { + time.Sleep(time.Second) + } + } + assert.Equal(t, 0, bq.length()) + assert.Equal(t, uint32(4), chain.BlockHeight()) + // put some old blocks + for i := 1; i < 5; i++ { + assert.NoError(t, bq.putBlock(blocks[i])) + } + assert.Equal(t, 0, bq.length()) + assert.Equal(t, uint32(4), chain.BlockHeight()) + // unexpected blocks with run() active + assert.NoError(t, bq.putBlock(blocks[8])) + assert.Equal(t, 1, bq.length()) + assert.Equal(t, uint32(4), chain.BlockHeight()) + assert.NoError(t, bq.putBlock(blocks[7])) + assert.Equal(t, 2, bq.length()) + assert.Equal(t, uint32(4), chain.BlockHeight()) + // sparse put + assert.NoError(t, bq.putBlock(blocks[10])) + assert.Equal(t, 3, bq.length()) + assert.Equal(t, uint32(4), chain.BlockHeight()) + assert.NoError(t, bq.putBlock(blocks[6])) + assert.NoError(t, bq.putBlock(blocks[5])) + // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one + for i := 0; i < 5; i++ { + if chain.BlockHeight() != 8 { + time.Sleep(time.Second) + } + } + assert.Equal(t, 1, bq.length()) + assert.Equal(t, uint32(8), chain.BlockHeight()) + bq.discard() + assert.Equal(t, 0, bq.length()) +} diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index bcbdff5df..5580f3873 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -3,6 +3,7 @@ package network import ( "math/rand" "net" + "sync/atomic" "testing" "time" @@ -13,7 +14,9 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" ) -type testChain struct{} +type testChain struct { + blockheight uint32 +} func (chain testChain) GetConfig() config.ProtocolConfiguration { panic("TODO") @@ -38,11 +41,14 @@ func (chain testChain) NetworkFee(t *transaction.Transaction) util.Fixed8 { func (chain testChain) AddHeaders(...*core.Header) error { panic("TODO") } -func (chain testChain) AddBlock(*core.Block) error { - panic("TODO") +func (chain *testChain) AddBlock(block *core.Block) error { + if block.Index == chain.blockheight+1 { + atomic.StoreUint32(&chain.blockheight, block.Index) + } + return nil } -func (chain testChain) BlockHeight() uint32 { - return 0 +func (chain *testChain) BlockHeight() uint32 { + return atomic.LoadUint32(&chain.blockheight) } func (chain testChain) HeaderHeight() uint32 { return 0 @@ -168,7 +174,7 @@ func (p *localPeer) Handshaked() bool { func newTestServer() *Server { return &Server{ ServerConfig: ServerConfig{}, - chain: testChain{}, + chain: &testChain{}, transport: localTransport{}, discovery: testDiscovery{}, id: rand.Uint32(), diff --git a/pkg/network/server.go b/pkg/network/server.go index 1de1b060d..cff5ba21b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -45,6 +45,7 @@ type ( transport Transporter discovery Discoverer chain core.Blockchainer + bQueue *blockQueue lock sync.RWMutex peers map[Peer]bool @@ -66,6 +67,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { s := &Server{ ServerConfig: config, chain: chain, + bQueue: newBlockQueue(maxBlockBatch, chain), id: rand.Uint32(), quit: make(chan struct{}), addrReq: make(chan *Message, minPeers), @@ -97,6 +99,7 @@ func (s *Server) Start(errChan chan error) { s.discovery.BackFill(s.Seeds...) + go s.bQueue.run() go s.transport.Accept() s.run() } @@ -106,6 +109,7 @@ func (s *Server) Shutdown() { log.WithFields(log.Fields{ "peers": s.PeerCount(), }).Info("shutting down server") + s.bQueue.discard() close(s.quit) } @@ -273,10 +277,7 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { // handleBlockCmd processes the received block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *core.Block) error { - if !s.chain.HasBlock(block.Hash()) { - return s.chain.AddBlock(block) - } - return nil + return s.bQueue.putBlock(block) } // handleInvCmd will process the received inventory.