network: add block queue

This one will replace blockCache in Blockchain itself as it can and should be
external from it. The idea is that we only feed successive blocks into the
Blockchain and it only stores valid proper Blockchain and nothing else.
This commit is contained in:
Roman Khimov 2019-09-25 19:54:31 +03:00
parent 903173c991
commit c531dc0bde
8 changed files with 191 additions and 10 deletions

1
go.mod
View file

@ -1,6 +1,7 @@
module github.com/CityOfZion/neo-go
require (
github.com/Workiva/go-datastructures v1.0.50
github.com/abiosoft/ishell v2.0.0+incompatible // indirect
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db // indirect
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect

2
go.sum
View file

@ -1,3 +1,5 @@
github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo=
github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/abiosoft/ishell v2.0.0+incompatible h1:zpwIuEHc37EzrsIYah3cpevrIc8Oma7oZPxr03tlmmw=
github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg=
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db h1:CjPUSXOiYptLbTdr1RceuZgSFDQ7U15ITERUGrUORx8=

View file

@ -5,6 +5,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/crypto"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/Workiva/go-datastructures/queue"
log "github.com/sirupsen/logrus"
)
@ -132,3 +133,16 @@ func (b *Block) EncodeBinary(bw *io.BinWriter) {
tx.EncodeBinary(bw)
}
}
// Compare implements the queue Item interface.
func (b *Block) Compare(item queue.Item) int {
other := item.(*Block)
switch {
case b.Index > other.Index:
return 1
case b.Index == other.Index:
return 0
default:
return -1
}
}

View file

@ -259,3 +259,12 @@ func TestBlockSizeCalculation(t *testing.T) {
assert.Equal(t, 7360, len(benc))
assert.Equal(t, rawBlock, hex.EncodeToString(benc))
}
func TestBlockCompare(t *testing.T) {
b1 := Block{BlockBase: BlockBase{Index: 1}}
b2 := Block{BlockBase: BlockBase{Index: 2}}
b3 := Block{BlockBase: BlockBase{Index: 3}}
assert.Equal(t, 1, b2.Compare(&b1))
assert.Equal(t, 0, b2.Compare(&b2))
assert.Equal(t, -1, b2.Compare(&b3))
}

77
pkg/network/blockqueue.go Normal file
View file

@ -0,0 +1,77 @@
package network
import (
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/Workiva/go-datastructures/queue"
log "github.com/sirupsen/logrus"
)
type blockQueue struct {
queue *queue.PriorityQueue
checkBlocks chan struct{}
chain core.Blockchainer
}
func newBlockQueue(capacity int, bc core.Blockchainer) *blockQueue {
return &blockQueue{
queue: queue.NewPriorityQueue(capacity, false),
checkBlocks: make(chan struct{}, 1),
chain: bc,
}
}
func (bq *blockQueue) run() {
for {
_, ok := <-bq.checkBlocks
if !ok {
break
}
for {
item := bq.queue.Peek()
if item == nil {
break
}
minblock := item.(*core.Block)
if minblock.Index <= bq.chain.BlockHeight()+1 {
_, _ = bq.queue.Get(1)
if minblock.Index == bq.chain.BlockHeight()+1 {
err := bq.chain.AddBlock(minblock)
if err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
"blockHeight": bq.chain.BlockHeight(),
"nextIndex": minblock.Index,
}).Warn("blockQueue: failed adding block into the blockchain")
}
}
} else {
break
}
}
}
}
func (bq *blockQueue) putBlock(block *core.Block) error {
if bq.chain.BlockHeight() >= block.Index {
// can easily happen when fetching the same blocks from
// different peers, thus not considered as error
return nil
}
err := bq.queue.Put(block)
select {
case bq.checkBlocks <- struct{}{}:
// ok, signalled to goroutine processing queue
default:
// it's already busy processing blocks
}
return err
}
func (bq *blockQueue) discard() {
close(bq.checkBlocks)
bq.queue.Dispose()
}
func (bq *blockQueue) length() int {
return bq.queue.Len()
}

View file

@ -0,0 +1,71 @@
package network
import (
"testing"
"time"
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/stretchr/testify/assert"
)
func TestBlockQueue(t *testing.T) {
chain := &testChain{}
// notice, it's not yet running
bq := newBlockQueue(0, chain)
blocks := make([]*core.Block, 11)
for i := 1; i < 11; i++ {
blocks[i] = &core.Block{BlockBase: core.BlockBase{Index: uint32(i)}}
}
// not the ones expected currently
for i := 3; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i]))
}
// nothing should be put into the blockchain
assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 2, bq.length())
// now added expected ones (with duplicates)
for i := 1; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i]))
}
// but they're still not put into the blockchain, because bq isn't running
assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 4, bq.length())
go bq.run()
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
for i := 0; i < 5; i++ {
if chain.BlockHeight() != 4 {
time.Sleep(time.Second)
}
}
assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// put some old blocks
for i := 1; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i]))
}
assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// unexpected blocks with run() active
assert.NoError(t, bq.putBlock(blocks[8]))
assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.putBlock(blocks[7]))
assert.Equal(t, 2, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// sparse put
assert.NoError(t, bq.putBlock(blocks[10]))
assert.Equal(t, 3, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.putBlock(blocks[6]))
assert.NoError(t, bq.putBlock(blocks[5]))
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
for i := 0; i < 5; i++ {
if chain.BlockHeight() != 8 {
time.Sleep(time.Second)
}
}
assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(8), chain.BlockHeight())
bq.discard()
assert.Equal(t, 0, bq.length())
}

View file

@ -3,6 +3,7 @@ package network
import (
"math/rand"
"net"
"sync/atomic"
"testing"
"time"
@ -13,7 +14,9 @@ import (
"github.com/CityOfZion/neo-go/pkg/util"
)
type testChain struct{}
type testChain struct {
blockheight uint32
}
func (chain testChain) GetConfig() config.ProtocolConfiguration {
panic("TODO")
@ -38,11 +41,14 @@ func (chain testChain) NetworkFee(t *transaction.Transaction) util.Fixed8 {
func (chain testChain) AddHeaders(...*core.Header) error {
panic("TODO")
}
func (chain testChain) AddBlock(*core.Block) error {
panic("TODO")
func (chain *testChain) AddBlock(block *core.Block) error {
if block.Index == chain.blockheight+1 {
atomic.StoreUint32(&chain.blockheight, block.Index)
}
func (chain testChain) BlockHeight() uint32 {
return 0
return nil
}
func (chain *testChain) BlockHeight() uint32 {
return atomic.LoadUint32(&chain.blockheight)
}
func (chain testChain) HeaderHeight() uint32 {
return 0
@ -168,7 +174,7 @@ func (p *localPeer) Handshaked() bool {
func newTestServer() *Server {
return &Server{
ServerConfig: ServerConfig{},
chain: testChain{},
chain: &testChain{},
transport: localTransport{},
discovery: testDiscovery{},
id: rand.Uint32(),

View file

@ -45,6 +45,7 @@ type (
transport Transporter
discovery Discoverer
chain core.Blockchainer
bQueue *blockQueue
lock sync.RWMutex
peers map[Peer]bool
@ -66,6 +67,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server {
s := &Server{
ServerConfig: config,
chain: chain,
bQueue: newBlockQueue(maxBlockBatch, chain),
id: rand.Uint32(),
quit: make(chan struct{}),
addrReq: make(chan *Message, minPeers),
@ -97,6 +99,7 @@ func (s *Server) Start(errChan chan error) {
s.discovery.BackFill(s.Seeds...)
go s.bQueue.run()
go s.transport.Accept()
s.run()
}
@ -106,6 +109,7 @@ func (s *Server) Shutdown() {
log.WithFields(log.Fields{
"peers": s.PeerCount(),
}).Info("shutting down server")
s.bQueue.discard()
close(s.quit)
}
@ -273,10 +277,7 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) {
// handleBlockCmd processes the received block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *core.Block) error {
if !s.chain.HasBlock(block.Hash()) {
return s.chain.AddBlock(block)
}
return nil
return s.bQueue.putBlock(block)
}
// handleInvCmd will process the received inventory.