Merge pull request #2930 from nspcc-dev/addblock-lock

consensus: prevent AddBlock lock caused by consensus service
This commit is contained in:
Roman Khimov 2023-03-15 17:59:48 +03:00 committed by GitHub
commit a8aa29727b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 159 additions and 88 deletions

View file

@ -384,6 +384,7 @@ func mkConsensus(config config.Consensus, tpb time.Duration, chain *core.Blockch
Logger: log,
Broadcast: serv.BroadcastExtensible,
Chain: chain,
BlockQueue: serv.GetBlockQueue(),
ProtocolConfiguration: chain.GetConfig().ProtocolConfiguration,
RequestTx: serv.RequestTx,
StopTxFlow: serv.StopTxFlow,

2
go.mod
View file

@ -10,7 +10,7 @@ require (
github.com/holiman/uint256 v1.2.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/mr-tron/base58 v1.2.0
github.com/nspcc-dev/dbft v0.0.0-20221020093431-31c1bbdc74f2
github.com/nspcc-dev/dbft v0.0.0-20230314095711-114b11c42cf6
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20220113123743-7f3162110659

9
go.sum
View file

@ -251,8 +251,8 @@ github.com/nspcc-dev/dbft v0.0.0-20191209120240-0d6b7568d9ae/go.mod h1:3FjXOoHmA
github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk=
github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ=
github.com/nspcc-dev/dbft v0.0.0-20210721160347-1b03241391ac/go.mod h1:U8MSnEShH+o5hexfWJdze6uMFJteP0ko7J2frO7Yu1Y=
github.com/nspcc-dev/dbft v0.0.0-20221020093431-31c1bbdc74f2 h1:2soBy8en5W4/1Gvbog8RyVpEbarGWZwPxppZjffWzZE=
github.com/nspcc-dev/dbft v0.0.0-20221020093431-31c1bbdc74f2/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98=
github.com/nspcc-dev/dbft v0.0.0-20230314095711-114b11c42cf6 h1:K4AcGPB0JDLloGwRY2BPJJeWyNQw+n0BnrdgFjSEBIs=
github.com/nspcc-dev/dbft v0.0.0-20230314095711-114b11c42cf6/go.mod h1:IsUsZqxQkav23pFrCyZEibz0VukpI787XnbcsFkekI8=
github.com/nspcc-dev/go-ordered-json v0.0.0-20210915112629-e1b6cce73d02/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 h1:n4ZaFCKt1pQJd7PXoMJabZWK9ejjbLOVrkl/lOUmshg=
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
@ -403,6 +403,7 @@ golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.4.0 h1:UVQgzMY87xqpKNgb+kDsll2Igd33HszWHFLmpaRMq/8=
golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -477,6 +478,7 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
@ -557,12 +559,14 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210429154555-c04ba851c2a4/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@ -573,6 +577,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=

View file

@ -155,6 +155,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
Logger: zap.NewNop(),
Broadcast: netSrv.BroadcastExtensible,
Chain: chain,
BlockQueue: netSrv.GetBlockQueue(),
ProtocolConfiguration: cfg.ProtocolConfiguration,
RequestTx: netSrv.RequestTx,
StopTxFlow: netSrv.StopTxFlow,

View file

@ -42,7 +42,6 @@ const nsInMs = 1000000
// Ledger is the interface to Blockchain sufficient for Service.
type Ledger interface {
AddBlock(block *coreb.Block) error
ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction
GetConfig() config.Blockchain
GetMemPool() *mempool.Pool
@ -58,6 +57,11 @@ type Ledger interface {
mempool.Feer
}
// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
PutBlock(block *coreb.Block) error
}
// Service represents a consensus instance.
type Service interface {
// Name returns service name.
@ -89,7 +93,8 @@ type service struct {
messages chan Payload
transactions chan *transaction.Transaction
// blockEvents is used to pass a new block event to the consensus
// process.
// process. It has a tiny buffer in order to avoid Blockchain blocking
// on block addition under the high load.
blockEvents chan *coreb.Block
lastProposal []util.Uint256
wallet *wallet.Wallet
@ -114,6 +119,8 @@ type Config struct {
Broadcast func(p *npayload.Extensible)
// Chain is a Ledger instance.
Chain Ledger
// BlockQueue is a BlockQueuer instance.
BlockQueue BlockQueuer
// ProtocolConfiguration contains protocol settings.
ProtocolConfiguration config.ProtocolConfiguration
// RequestTx is a callback to which will be called
@ -336,11 +343,19 @@ events:
case b := <-s.blockEvents:
s.handleChainBlock(b)
}
// Always process block event if there is any, we can add one above.
// Always process block event if there is any, we can add one above or external
// services can add several blocks during message processing.
var latestBlock *coreb.Block
syncLoop:
for {
select {
case b := <-s.blockEvents:
s.handleChainBlock(b)
case latestBlock = <-s.blockEvents:
default:
break syncLoop
}
}
if latestBlock != nil {
s.handleChainBlock(latestBlock)
}
}
drainLoop:
@ -564,11 +579,11 @@ func (s *service) processBlock(b block.Block) {
bb := &b.(*neoBlock).Block
bb.Script = *(s.getBlockWitness(bb))
if err := s.Chain.AddBlock(bb); err != nil {
if err := s.BlockQueue.PutBlock(bb); err != nil {
// The block might already be added via the regular network
// interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
s.log.Warn("error on add block", zap.Error(err))
s.log.Warn("error on enqueue block", zap.Error(err))
}
}
s.postBlock(bb)

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core"
coreb "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/fee"
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
@ -50,6 +51,7 @@ func TestNewWatchingService(t *testing.T) {
Logger: zaptest.NewLogger(t),
Broadcast: func(*npayload.Extensible) {},
Chain: bc,
BlockQueue: testBlockQueuer{bc: bc},
ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration,
RequestTx: func(...util.Uint256) {},
StopTxFlow: func() {},
@ -62,6 +64,14 @@ func TestNewWatchingService(t *testing.T) {
require.NotPanics(t, srv.Shutdown)
}
func collectBlock(t *testing.T, bc *core.Blockchain, srv *service) {
h := bc.BlockHeight()
srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.Context.BlockIndex}) // Collect and add block to the chain.
header, err := bc.GetHeader(bc.GetHeaderHash(h + 1))
require.NoError(t, err)
srv.dbft.InitializeConsensus(0, header.Timestamp*nsInMs) // Init consensus manually at the next height, as we don't run the consensus service.
}
func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint32) (*service, *wallet.Account) {
acc, err := wallet.NewAccountFromWIF(testchain.WIF(testchain.IDToOrder(0)))
require.NoError(t, err)
@ -89,7 +99,11 @@ func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint3
require.NoError(t, bc.PoolTx(tx))
srv := newTestServiceWithChain(t, bc)
h := bc.BlockHeight()
srv.dbft.Start(0)
header, err := bc.GetHeader(bc.GetHeaderHash(h + 1))
require.NoError(t, err)
srv.dbft.InitializeConsensus(0, header.Timestamp*nsInMs) // Init consensus manually at the next height, as we don't run the consensus service.
// Register new candidate.
b.Reset()
@ -104,11 +118,11 @@ func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint3
require.NoError(t, newAcc.SignTx(netmode.UnitTestNet, tx))
require.NoError(t, bc.PoolTx(tx))
srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.Context.BlockIndex})
collectBlock(t, bc, srv)
cfg := bc.GetConfig()
for i := srv.dbft.BlockIndex; !cfg.ShouldUpdateCommitteeAt(i + offset); i++ {
srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.Context.BlockIndex})
collectBlock(t, bc, srv)
}
// Vote for new candidate.
@ -125,7 +139,7 @@ func initServiceNextConsensus(t *testing.T, newAcc *wallet.Account, offset uint3
require.NoError(t, newAcc.SignTx(netmode.UnitTestNet, tx))
require.NoError(t, bc.PoolTx(tx))
srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.BlockIndex})
collectBlock(t, bc, srv)
return srv, acc
}
@ -153,7 +167,7 @@ func TestService_NextConsensus(t *testing.T) {
// OnPersist <- update committee
// Block <-
srv.dbft.OnTimeout(timer.HV{Height: srv.dbft.BlockIndex})
collectBlock(t, bc, srv)
checkNextConsensus(t, bc, height+1, hash.Hash160(script))
})
/*
@ -495,6 +509,7 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service {
Logger: zaptest.NewLogger(t),
Broadcast: func(*npayload.Extensible) {},
Chain: bc,
BlockQueue: testBlockQueuer{bc: bc},
ProtocolConfiguration: bc.GetConfig().ProtocolConfiguration,
RequestTx: func(...util.Uint256) {},
StopTxFlow: func() {},
@ -509,6 +524,17 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service {
return srv.(*service)
}
type testBlockQueuer struct {
bc *core.Blockchain
}
var _ = BlockQueuer(testBlockQueuer{})
// PutBlock implements BlockQueuer interface.
func (bq testBlockQueuer) PutBlock(b *coreb.Block) error {
return bq.bc.AddBlock(b)
}
func getTestValidator(i int) (*privateKey, *publicKey) {
key := testchain.PrivateKey(i)
return &privateKey{PrivateKey: key}, &publicKey{PublicKey: key.PublicKey()}

View file

@ -1,4 +1,4 @@
package network
package bqueue
import (
"sync"
@ -15,7 +15,8 @@ type Blockqueuer interface {
BlockHeight() uint32
}
type blockQueue struct {
// Queue is the block queue.
type Queue struct {
log *zap.Logger
queueLock sync.RWMutex
queue []*block.Block
@ -25,34 +26,36 @@ type blockQueue struct {
relayF func(*block.Block)
discarded *atomic.Bool
len int
lenUpdateF func(int)
}
const (
// blockCacheSize is the amount of blocks above the current height
// which are stored in the queue.
blockCacheSize = 2000
)
// CacheSize is the amount of blocks above the current height
// which are stored in the queue.
const CacheSize = 2000
func indexToPosition(i uint32) int {
return int(i) % blockCacheSize
return int(i) % CacheSize
}
func newBlockQueue(capacity int, bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue {
// New creates an instance of BlockQueue.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue {
if log == nil {
return nil
}
return &blockQueue{
return &Queue{
log: log,
queue: make([]*block.Block, blockCacheSize),
queue: make([]*block.Block, CacheSize),
checkBlocks: make(chan struct{}, 1),
chain: bc,
relayF: relayer,
discarded: atomic.NewBool(false),
lenUpdateF: lenMetricsUpdater,
}
}
func (bq *blockQueue) run() {
// Run runs the BlockQueue queueing loop. It must be called in a separate routine.
func (bq *Queue) Run() {
var lastHeight = bq.chain.BlockHeight()
for {
_, ok := <-bq.checkBlocks
@ -97,19 +100,22 @@ func (bq *blockQueue) run() {
bq.queue[pos] = nil
}
bq.queueLock.Unlock()
updateBlockQueueLenMetric(l)
if bq.lenUpdateF != nil {
bq.lenUpdateF(l)
}
}
}
}
func (bq *blockQueue) putBlock(block *block.Block) error {
// PutBlock enqueues block to be added to the chain.
func (bq *Queue) PutBlock(block *block.Block) error {
h := bq.chain.BlockHeight()
bq.queueLock.Lock()
defer bq.queueLock.Unlock()
if bq.discarded.Load() {
return nil
}
if block.Index <= h || h+blockCacheSize < block.Index {
if block.Index <= h || h+CacheSize < block.Index {
// can easily happen when fetching the same blocks from
// different peers, thus not considered as error
return nil
@ -119,14 +125,15 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
bq.len++
bq.queue[pos] = block
for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
bq.lastQ = bq.queue[pos].Index
pos++
}
}
l := bq.len
// update metrics
updateBlockQueueLenMetric(l)
if bq.lenUpdateF != nil {
bq.lenUpdateF(bq.len)
}
select {
case bq.checkBlocks <- struct{}{}:
// ok, signalled to goroutine processing queue
@ -136,20 +143,21 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
return nil
}
// lastQueued returns the index of the last queued block and the queue's capacity
// LastQueued returns the index of the last queued block and the queue's capacity
// left.
func (bq *blockQueue) lastQueued() (uint32, int) {
func (bq *Queue) LastQueued() (uint32, int) {
bq.queueLock.RLock()
defer bq.queueLock.RUnlock()
return bq.lastQ, blockCacheSize - bq.len
return bq.lastQ, CacheSize - bq.len
}
func (bq *blockQueue) discard() {
// Discard stops the queue and prevents it from accepting more blocks to enqueue.
func (bq *Queue) Discard() {
if bq.discarded.CAS(false, true) {
bq.queueLock.Lock()
close(bq.checkBlocks)
// Technically we could bq.queue = nil, but this would cost
// another if in run().
// another if in Run().
for i := 0; i < len(bq.queue); i++ {
bq.queue[i] = nil
}

View file

@ -1,4 +1,4 @@
package network
package bqueue
import (
"testing"
@ -13,77 +13,77 @@ import (
func TestBlockQueue(t *testing.T) {
chain := fakechain.NewFakeChain()
// notice, it's not yet running
bq := newBlockQueue(0, chain, zaptest.NewLogger(t), nil)
bq := New(chain, zaptest.NewLogger(t), nil, nil)
blocks := make([]*block.Block, 11)
for i := 1; i < 11; i++ {
blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}}
}
// not the ones expected currently
for i := 3; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i]))
assert.NoError(t, bq.PutBlock(blocks[i]))
}
last, capLeft := bq.lastQueued()
last, capLeft := bq.LastQueued()
assert.Equal(t, uint32(0), last)
assert.Equal(t, blockCacheSize-2, capLeft)
assert.Equal(t, CacheSize-2, capLeft)
// nothing should be put into the blockchain
assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 2, bq.length())
// now added the expected ones (with duplicates)
for i := 1; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i]))
assert.NoError(t, bq.PutBlock(blocks[i]))
}
// but they're still not put into the blockchain, because bq isn't running
last, capLeft = bq.lastQueued()
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last)
assert.Equal(t, blockCacheSize-4, capLeft)
assert.Equal(t, CacheSize-4, capLeft)
assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 4, bq.length())
// block with too big index is dropped
assert.NoError(t, bq.putBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + blockCacheSize + 1}}))
assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + CacheSize + 1}}))
assert.Equal(t, 4, bq.length())
go bq.run()
go bq.Run()
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond)
last, capLeft = bq.lastQueued()
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last)
assert.Equal(t, blockCacheSize, capLeft)
assert.Equal(t, CacheSize, capLeft)
assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// put some old blocks
for i := 1; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i]))
assert.NoError(t, bq.PutBlock(blocks[i]))
}
last, capLeft = bq.lastQueued()
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last)
assert.Equal(t, blockCacheSize, capLeft)
assert.Equal(t, CacheSize, capLeft)
assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// unexpected blocks with run() active
assert.NoError(t, bq.putBlock(blocks[8]))
assert.NoError(t, bq.PutBlock(blocks[8]))
assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.putBlock(blocks[7]))
assert.NoError(t, bq.PutBlock(blocks[7]))
assert.Equal(t, 2, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// sparse put
assert.NoError(t, bq.putBlock(blocks[10]))
assert.NoError(t, bq.PutBlock(blocks[10]))
assert.Equal(t, 3, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.putBlock(blocks[6]))
assert.NoError(t, bq.putBlock(blocks[5]))
assert.NoError(t, bq.PutBlock(blocks[6]))
assert.NoError(t, bq.PutBlock(blocks[5]))
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond)
last, capLeft = bq.lastQueued()
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(8), last)
assert.Equal(t, blockCacheSize-1, capLeft)
assert.Equal(t, CacheSize-1, capLeft)
assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(8), chain.BlockHeight())
bq.discard()
bq.Discard()
assert.Equal(t, 0, bq.length())
}
// length wraps len access for tests to make them thread-safe.
func (bq *blockQueue) length() int {
func (bq *Queue) length() int {
bq.queueLock.Lock()
defer bq.queueLock.Unlock()
return bq.len

View file

@ -24,6 +24,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
@ -57,7 +58,7 @@ type (
Ledger interface {
extpool.Ledger
mempool.Feer
Blockqueuer
bqueue.Blockqueuer
GetBlock(hash util.Uint256) (*block.Block, error)
GetConfig() config.Blockchain
GetHeader(hash util.Uint256) (*block.Header, error)
@ -100,8 +101,8 @@ type (
transports []Transporter
discovery Discoverer
chain Ledger
bQueue *blockQueue
bSyncQueue *blockQueue
bQueue *bqueue.Queue
bSyncQueue *bqueue.Queue
mempool *mempool.Pool
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
@ -204,11 +205,11 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}, s.notaryFeer)
})
}
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
s.tryStartServices()
})
}, updateBlockQueueLenMetric)
s.bSyncQueue = newBlockQueue(maxBlockBatch, s.stateSync, log, nil)
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)
if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
@ -278,8 +279,8 @@ func (s *Server) Start(errChan chan error) {
}
go s.broadcastTxLoop()
go s.relayBlocksLoop()
go s.bQueue.run()
go s.bSyncQueue.run()
go s.bQueue.Run()
go s.bSyncQueue.Run()
for _, tr := range s.transports {
go tr.Accept()
}
@ -297,8 +298,8 @@ func (s *Server) Shutdown() {
for _, p := range s.getPeers(nil) {
p.Disconnect(errServerShutdown)
}
s.bQueue.discard()
s.bSyncQueue.discard()
s.bQueue.Discard()
s.bSyncQueue.Discard()
s.serviceLock.RLock()
for _, svc := range s.services {
svc.Shutdown()
@ -323,6 +324,11 @@ func (s *Server) addService(svc Service) {
s.services[svc.Name()] = svc
}
// GetBlockQueue returns the block queue instance managed by Server.
func (s *Server) GetBlockQueue() *bqueue.Queue {
return s.bQueue
}
// AddExtensibleService register a service that handles an extensible payload of some kind.
func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) {
s.serviceLock.Lock()
@ -723,9 +729,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.stateSync.IsActive() {
return s.bSyncQueue.putBlock(block)
return s.bSyncQueue.PutBlock(block)
}
return s.bQueue.putBlock(block)
return s.bQueue.PutBlock(block)
}
// handlePing processes a ping request.
@ -749,7 +755,7 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {
return nil
}
var (
bq Blockqueuer = s.chain
bq bqueue.Blockqueuer = s.chain
requestMPTNodes bool
)
if s.stateSync.IsActive() {
@ -1247,9 +1253,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunk in increasing order.
// 3. After all requests have been sent, request random height.
func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error {
func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
lq, capLeft := s.bQueue.lastQueued()
lq, capLeft := s.bQueue.LastQueued()
if capLeft == 0 {
// No more blocks will fit into the queue.
return nil
@ -1274,7 +1280,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
if !lastRequestedHeight.CAS(old, needHeight) {
continue
}
} else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) {
} else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) {
needHeight = currHeight + 1
if peerHeight > old+payload.MaxHashesCount {
needHeight = old + payload.MaxHashesCount
@ -1283,7 +1289,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
}
}
} else {
index := mrand.Intn(blockCacheSize / payload.MaxHashesCount)
index := mrand.Intn(bqueue.CacheSize / payload.MaxHashesCount)
needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount)
}
break
@ -1381,7 +1387,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
func (s *Server) tryInitStateSync() {
if !s.stateSync.IsActive() {
s.bSyncQueue.discard()
s.bSyncQueue.Discard()
return
}
@ -1421,7 +1427,7 @@ func (s *Server) tryInitStateSync() {
// module can be inactive after init (i.e. full state is collected and ordinary block processing is needed)
if !s.stateSync.IsActive() {
s.bSyncQueue.discard()
s.bSyncQueue.Discard()
}
}
}

View file

@ -2,13 +2,14 @@ package network
import (
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// StateSync represents state sync module.
type StateSync interface {
AddMPTNodes([][]byte) error
Blockqueuer
bqueue.Blockqueuer
Init(currChainHeight uint32) error
IsActive() bool
IsInitialized() bool

View file

@ -65,6 +65,11 @@ type (
mp *mempool.Pool
// requests channel
reqCh chan mempoolevent.Event
// blocksCh is a channel used to receive block notifications from the
// Blockchain. It is not buffered intentionally, as it's important to keep
// the notary request pool in sync with the current blockchain heigh, thus,
// it's not recommended to use a large size of notary requests pool as it may
// slow down the block processing.
blocksCh chan *block.Block
stopCh chan struct{}
done chan struct{}

View file

@ -63,6 +63,9 @@ type (
timePerBlock time.Duration
maxRetries int
relayExtensible RelayCallback
// blockCh is a channel used to receive block notifications from the
// Blockchain. It has a tiny buffer in order to avoid Blockchain blocking
// on block addition under the high load.
blockCh chan *block.Block
stopCh chan struct{}
done chan struct{}
@ -84,7 +87,7 @@ func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger,
chain: bc,
log: log,
incompleteRoots: make(map[uint32]*incompleteRoot),
blockCh: make(chan *block.Block),
blockCh: make(chan *block.Block, 1),
stopCh: make(chan struct{}),
done: make(chan struct{}),
timePerBlock: bcConf.TimePerBlock,