core: add Blockchain event subscription mechanism

A deep internal part of #895. Blockchainer interface is also extended for
various uses of these methods.
This commit is contained in:
Roman Khimov 2020-05-12 17:20:41 +03:00
parent ac40d357f0
commit 3ca71d5e8d
5 changed files with 330 additions and 0 deletions

View file

@ -127,6 +127,20 @@ type Blockchain struct {
lastBatch *storage.MemBatch lastBatch *storage.MemBatch
contracts native.Contracts contracts native.Contracts
// Notification subsystem.
events chan bcEvent
subCh chan interface{}
unsubCh chan interface{}
}
// bcEvent is an internal event generated by the Blockchain and then
// broadcasted to other parties. It joins the new block and associated
// invocation logs, all the other events visible from outside can be produced
// from this combination.
type bcEvent struct {
block *block.Block
appExecResults []*state.AppExecResult
} }
type headersOpFunc func(headerList *HeaderHashList) type headersOpFunc func(headerList *HeaderHashList)
@ -169,6 +183,9 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
memPool: mempool.NewMemPool(cfg.MemPoolSize), memPool: mempool.NewMemPool(cfg.MemPoolSize),
keyCache: make(map[util.Uint160]map[string]*keys.PublicKey), keyCache: make(map[util.Uint160]map[string]*keys.PublicKey),
log: log, log: log,
events: make(chan bcEvent),
subCh: make(chan interface{}),
unsubCh: make(chan interface{}),
generationAmount: genAmount, generationAmount: genAmount,
decrementInterval: decrementInterval, decrementInterval: decrementInterval,
@ -286,6 +303,7 @@ func (bc *Blockchain) Run() {
} }
close(bc.runToExitCh) close(bc.runToExitCh)
}() }()
go bc.notificationDispatcher()
for { for {
select { select {
case <-bc.stopCh: case <-bc.stopCh:
@ -305,6 +323,82 @@ func (bc *Blockchain) Run() {
} }
} }
// notificationDispatcher manages subscription to events and broadcasts new events.
func (bc *Blockchain) notificationDispatcher() {
var (
// These are just sets of subscribers, though modelled as maps
// for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan<- *block.Block]bool)
txFeed = make(map[chan<- *transaction.Transaction]bool)
notificationFeed = make(map[chan<- *state.NotificationEvent]bool)
executionFeed = make(map[chan<- *state.AppExecResult]bool)
)
for {
select {
case <-bc.stopCh:
return
case sub := <-bc.subCh:
switch ch := sub.(type) {
case chan<- *block.Block:
blockFeed[ch] = true
case chan<- *transaction.Transaction:
txFeed[ch] = true
case chan<- *state.NotificationEvent:
notificationFeed[ch] = true
case chan<- *state.AppExecResult:
executionFeed[ch] = true
default:
panic(fmt.Sprintf("bad subscription: %T", sub))
}
case unsub := <-bc.unsubCh:
switch ch := unsub.(type) {
case chan<- *block.Block:
delete(blockFeed, ch)
case chan<- *transaction.Transaction:
delete(txFeed, ch)
case chan<- *state.NotificationEvent:
delete(notificationFeed, ch)
case chan<- *state.AppExecResult:
delete(executionFeed, ch)
default:
panic(fmt.Sprintf("bad unsubscription: %T", unsub))
}
case event := <-bc.events:
// We don't want to waste time looping through transactions when there are no
// subscribers.
if len(txFeed) != 0 || len(notificationFeed) != 0 || len(executionFeed) != 0 {
var aerIdx int
for _, tx := range event.block.Transactions {
if tx.Type == transaction.InvocationType {
aer := event.appExecResults[aerIdx]
if !aer.TxHash.Equals(tx.Hash()) {
panic("inconsistent application execution results")
}
aerIdx++
for ch := range executionFeed {
ch <- aer
}
if aer.VMState == "HALT" {
for i := range aer.Events {
for ch := range notificationFeed {
ch <- &aer.Events[i]
}
}
}
}
for ch := range txFeed {
ch <- tx
}
}
}
for ch := range blockFeed {
ch <- event.block
}
}
}
}
// Close stops Blockchain's internal loop, syncs changes to persistent storage // Close stops Blockchain's internal loop, syncs changes to persistent storage
// and closes it. The Blockchain is no longer functional after the call to Close. // and closes it. The Blockchain is no longer functional after the call to Close.
func (bc *Blockchain) Close() { func (bc *Blockchain) Close() {
@ -468,6 +562,7 @@ func (bc *Blockchain) getSystemFeeAmount(h util.Uint256) uint32 {
// and all tests are in place, we can make a more optimized and cleaner implementation. // and all tests are in place, we can make a more optimized and cleaner implementation.
func (bc *Blockchain) storeBlock(block *block.Block) error { func (bc *Blockchain) storeBlock(block *block.Block) error {
cache := dao.NewCached(bc.dao) cache := dao.NewCached(bc.dao)
appExecResults := make([]*state.AppExecResult, 0, len(block.Transactions))
fee := bc.getSystemFeeAmount(block.PrevHash) fee := bc.getSystemFeeAmount(block.PrevHash)
for _, tx := range block.Transactions { for _, tx := range block.Transactions {
fee += uint32(tx.SystemFee.IntegralValue()) fee += uint32(tx.SystemFee.IntegralValue())
@ -700,6 +795,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
Stack: v.Estack().ToContractParameters(), Stack: v.Estack().ToContractParameters(),
Events: systemInterop.Notifications, Events: systemInterop.Notifications,
} }
appExecResults = append(appExecResults, aer)
err = cache.PutAppExecResult(aer) err = cache.PutAppExecResult(aer)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to Store notifications") return errors.Wrap(err, "failed to Store notifications")
@ -728,6 +824,12 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
atomic.StoreUint32(&bc.blockHeight, block.Index) atomic.StoreUint32(&bc.blockHeight, block.Index)
updateBlockHeightMetric(block.Index) updateBlockHeightMetric(block.Index)
bc.memPool.RemoveStale(bc.isTxStillRelevant, bc) bc.memPool.RemoveStale(bc.isTxStillRelevant, bc)
// Genesis block is stored when Blockchain is not yet running, so there
// is no one to read this event. And it doesn't make much sense as event
// anyway.
if block.Index != 0 {
bc.events <- bcEvent{block, appExecResults}
}
return nil return nil
} }
@ -1054,6 +1156,68 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration {
return bc.config return bc.config
} }
// SubscribeForBlocks adds given channel to new block event broadcasting, so when
// there is a new block added to the chain you'll receive it via this channel.
// Make sure it's read from regularly as not reading these events might affect
// other Blockchain functions.
func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
bc.subCh <- ch
}
// SubscribeForTransactions adds given channel to new transaction event
// broadcasting, so when there is a new transaction added to the chain (in a
// block) you'll receive it via this channel. Make sure it's read from regularly
// as not reading these events might affect other Blockchain functions.
func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
bc.subCh <- ch
}
// SubscribeForNotifications adds given channel to new notifications event
// broadcasting, so when an in-block transaction execution generates a
// notification you'll receive it via this channel. Only notifications from
// successful transactions are broadcasted, if you're interested in failed
// transactions use SubscribeForExecutions instead. Make sure this channel is
// read from regularly as not reading these events might affect other Blockchain
// functions.
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) {
bc.subCh <- ch
}
// SubscribeForExecutions adds given channel to new transaction execution event
// broadcasting, so when an in-block transaction execution happens you'll receive
// the result of it via this channel. Make sure it's read from regularly as not
// reading these events might affect other Blockchain functions.
func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
bc.subCh <- ch
}
// UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
// you can close it afterwards. Passing non-subscribed channel is a no-op.
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
bc.unsubCh <- ch
}
// UnsubscribeFromTransactions unsubscribes given channel from new transaction
// notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op.
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
bc.unsubCh <- ch
}
// UnsubscribeFromNotifications unsubscribes given channel from new
// execution-generated notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op.
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) {
bc.unsubCh <- ch
}
// UnsubscribeFromExecutions unsubscribes given channel from new execution
// notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op.
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
bc.unsubCh <- ch
}
// CalculateClaimable calculates the amount of GAS which can be claimed for a transaction with value. // CalculateClaimable calculates the amount of GAS which can be claimed for a transaction with value.
// First return value is GAS generated between startHeight and endHeight. // First return value is GAS generated between startHeight and endHeight.
// Second return value is GAS returned from accumulated SystemFees between startHeight and endHeight. // Second return value is GAS returned from accumulated SystemFees between startHeight and endHeight.

View file

@ -2,12 +2,17 @@ package core
import ( import (
"testing" "testing"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -234,3 +239,106 @@ func TestClose(t *testing.T) {
// This should never be executed. // This should never be executed.
assert.Nil(t, t) assert.Nil(t, t)
} }
func TestSubscriptions(t *testing.T) {
// We use buffering here as a substitute for reader goroutines, events
// get queued up and we read them one by one here.
const chBufSize = 16
blockCh := make(chan *block.Block, chBufSize)
txCh := make(chan *transaction.Transaction, chBufSize)
notificationCh := make(chan *state.NotificationEvent, chBufSize)
executionCh := make(chan *state.AppExecResult, chBufSize)
bc := newTestChain(t)
bc.SubscribeForBlocks(blockCh)
bc.SubscribeForTransactions(txCh)
bc.SubscribeForNotifications(notificationCh)
bc.SubscribeForExecutions(executionCh)
assert.Empty(t, notificationCh)
assert.Empty(t, executionCh)
assert.Empty(t, blockCh)
assert.Empty(t, txCh)
blocks, err := bc.genBlocks(1)
require.NoError(t, err)
require.Eventually(t, func() bool { return len(blockCh) != 0 }, time.Second, 10*time.Millisecond)
assert.Empty(t, notificationCh)
assert.Empty(t, executionCh)
assert.Empty(t, txCh)
b := <-blockCh
assert.Equal(t, blocks[0], b)
assert.Empty(t, blockCh)
script := io.NewBufBinWriter()
emit.Bytes(script.BinWriter, []byte("yay!"))
emit.Syscall(script.BinWriter, "Neo.Runtime.Notify")
require.NoError(t, script.Err)
txGood1 := transaction.NewInvocationTX(script.Bytes(), 0)
txGood1.Sender = neoOwner
txGood1.Nonce = 1
txGood1.ValidUntilBlock = 100500
require.NoError(t, signTx(bc, txGood1))
// Reset() reuses the script buffer and we need to keep scripts.
script = io.NewBufBinWriter()
emit.Bytes(script.BinWriter, []byte("nay!"))
emit.Syscall(script.BinWriter, "Neo.Runtime.Notify")
emit.Opcode(script.BinWriter, opcode.THROW)
require.NoError(t, script.Err)
txBad := transaction.NewInvocationTX(script.Bytes(), 0)
txBad.Sender = neoOwner
txBad.Nonce = 2
txBad.ValidUntilBlock = 100500
require.NoError(t, signTx(bc, txBad))
script = io.NewBufBinWriter()
emit.Bytes(script.BinWriter, []byte("yay! yay! yay!"))
emit.Syscall(script.BinWriter, "Neo.Runtime.Notify")
require.NoError(t, script.Err)
txGood2 := transaction.NewInvocationTX(script.Bytes(), 0)
txGood2.Sender = neoOwner
txGood2.Nonce = 3
txGood2.ValidUntilBlock = 100500
require.NoError(t, signTx(bc, txGood2))
invBlock := newBlock(bc.config, bc.BlockHeight()+1, bc.CurrentHeaderHash(), txGood1, txBad, txGood2)
require.NoError(t, bc.AddBlock(invBlock))
require.Eventually(t, func() bool {
return len(blockCh) != 0 && len(txCh) != 0 &&
len(notificationCh) != 0 && len(executionCh) != 0
}, time.Second, 10*time.Millisecond)
b = <-blockCh
require.Equal(t, invBlock, b)
assert.Empty(t, blockCh)
// Follow in-block transaction order.
for _, txExpected := range invBlock.Transactions {
tx := <-txCh
require.Equal(t, txExpected, tx)
if txExpected.Type == transaction.InvocationType {
exec := <-executionCh
require.Equal(t, tx.Hash(), exec.TxHash)
if exec.VMState == "HALT" {
notif := <-notificationCh
inv := tx.Data.(*transaction.InvocationTX)
require.Equal(t, hash.Hash160(inv.Script), notif.ScriptHash)
}
}
}
assert.Empty(t, txCh)
assert.Empty(t, notificationCh)
assert.Empty(t, executionCh)
bc.UnsubscribeFromBlocks(blockCh)
bc.UnsubscribeFromTransactions(txCh)
bc.UnsubscribeFromNotifications(notificationCh)
bc.UnsubscribeFromExecutions(executionCh)
// Ensure that new blocks are processed correctly after unsubscription.
_, err = bc.genBlocks(2 * chBufSize)
require.NoError(t, err)
}

View file

@ -47,6 +47,14 @@ type Blockchainer interface {
References(t *transaction.Transaction) ([]transaction.InOut, error) References(t *transaction.Transaction) ([]transaction.InOut, error)
mempool.Feer // fee interface mempool.Feer // fee interface
PoolTx(*transaction.Transaction) error PoolTx(*transaction.Transaction) error
SubscribeForBlocks(ch chan<- *block.Block)
SubscribeForExecutions(ch chan<- *state.AppExecResult)
SubscribeForNotifications(ch chan<- *state.NotificationEvent)
SubscribeForTransactions(ch chan<- *transaction.Transaction)
VerifyTx(*transaction.Transaction, *block.Block) error VerifyTx(*transaction.Transaction, *block.Block) error
GetMemPool() *mempool.Pool GetMemPool() *mempool.Pool
UnsubscribeFromBlocks(ch chan<- *block.Block)
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)
UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent)
UnsubscribeFromTransactions(ch chan<- *transaction.Transaction)
} }

View file

@ -1,5 +1,29 @@
/* /*
Package core implements Neo ledger functionality. Package core implements Neo ledger functionality.
It's built around the Blockchain structure that maintains state of the ledger. It's built around the Blockchain structure that maintains state of the ledger.
Events
You can subscribe to Blockchain events using a set of Subscribe and Unsubscribe
methods. These methods accept channels that will be used to send appropriate
events, so you can control buffering. Channels are never closed by Blockchain,
you can close them after unsubscription.
Unlike RPC-level subscriptions these don't allow event filtering because it
doesn't improve overall efficiency much (when you're using Blockchain you're
in the same process with it and filtering on your side is not that different
from filtering on Blockchain side).
The same level of ordering guarantees as with RPC subscriptions is provided,
albeit for a set of event channels, so at first transaction execution is
announced via appropriate channels, then followed by notifications generated
during this execution, then followed by transaction announcement and then
followed by block announcement. Transaction announcements are ordered the same
way they're stored in the block.
Be careful using these subscriptions, this mechanism is not intended to be used
by lots of subscribers and failing to read from event channels can affect
other Blockchain operations.
*/ */
package core package core

View file

@ -145,10 +145,36 @@ func (chain testChain) PoolTx(*transaction.Transaction) error {
panic("TODO") panic("TODO")
} }
func (chain testChain) SubscribeForBlocks(ch chan<- *block.Block) {
panic("TODO")
}
func (chain testChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
panic("TODO")
}
func (chain testChain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) {
panic("TODO")
}
func (chain testChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
panic("TODO")
}
func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
panic("TODO") panic("TODO")
} }
func (chain testChain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
panic("TODO")
}
func (chain testChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
panic("TODO")
}
func (chain testChain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) {
panic("TODO")
}
func (chain testChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
panic("TODO")
}
type testDiscovery struct{} type testDiscovery struct{}
func (d testDiscovery) BackFill(addrs ...string) {} func (d testDiscovery) BackFill(addrs ...string) {}