core: add Blockchain event subscription mechanism
A deep internal part of #895. Blockchainer interface is also extended for various uses of these methods.
This commit is contained in:
parent
2e58a14978
commit
1ac4f8528d
5 changed files with 333 additions and 0 deletions
|
@ -124,6 +124,20 @@ type Blockchain struct {
|
|||
log *zap.Logger
|
||||
|
||||
lastBatch *storage.MemBatch
|
||||
|
||||
// Notification subsystem.
|
||||
events chan bcEvent
|
||||
subCh chan interface{}
|
||||
unsubCh chan interface{}
|
||||
}
|
||||
|
||||
// bcEvent is an internal event generated by the Blockchain and then
|
||||
// broadcasted to other parties. It joins the new block and associated
|
||||
// invocation logs, all the other events visible from outside can be produced
|
||||
// from this combination.
|
||||
type bcEvent struct {
|
||||
block *block.Block
|
||||
appExecResults []*state.AppExecResult
|
||||
}
|
||||
|
||||
type headersOpFunc func(headerList *HeaderHashList)
|
||||
|
@ -166,6 +180,9 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
|
|||
memPool: mempool.NewMemPool(cfg.MemPoolSize),
|
||||
keyCache: make(map[util.Uint160]map[string]*keys.PublicKey),
|
||||
log: log,
|
||||
events: make(chan bcEvent),
|
||||
subCh: make(chan interface{}),
|
||||
unsubCh: make(chan interface{}),
|
||||
|
||||
generationAmount: genAmount,
|
||||
decrementInterval: decrementInterval,
|
||||
|
@ -281,6 +298,7 @@ func (bc *Blockchain) Run() {
|
|||
}
|
||||
close(bc.runToExitCh)
|
||||
}()
|
||||
go bc.notificationDispatcher()
|
||||
for {
|
||||
select {
|
||||
case <-bc.stopCh:
|
||||
|
@ -300,6 +318,82 @@ func (bc *Blockchain) Run() {
|
|||
}
|
||||
}
|
||||
|
||||
// notificationDispatcher manages subscription to events and broadcasts new events.
|
||||
func (bc *Blockchain) notificationDispatcher() {
|
||||
var (
|
||||
// These are just sets of subscribers, though modelled as maps
|
||||
// for ease of management (not a lot of subscriptions is really
|
||||
// expected, but maps are convenient for adding/deleting elements).
|
||||
blockFeed = make(map[chan<- *block.Block]bool)
|
||||
txFeed = make(map[chan<- *transaction.Transaction]bool)
|
||||
notificationFeed = make(map[chan<- *state.NotificationEvent]bool)
|
||||
executionFeed = make(map[chan<- *state.AppExecResult]bool)
|
||||
)
|
||||
for {
|
||||
select {
|
||||
case <-bc.stopCh:
|
||||
return
|
||||
case sub := <-bc.subCh:
|
||||
switch ch := sub.(type) {
|
||||
case chan<- *block.Block:
|
||||
blockFeed[ch] = true
|
||||
case chan<- *transaction.Transaction:
|
||||
txFeed[ch] = true
|
||||
case chan<- *state.NotificationEvent:
|
||||
notificationFeed[ch] = true
|
||||
case chan<- *state.AppExecResult:
|
||||
executionFeed[ch] = true
|
||||
default:
|
||||
panic(fmt.Sprintf("bad subscription: %T", sub))
|
||||
}
|
||||
case unsub := <-bc.unsubCh:
|
||||
switch ch := unsub.(type) {
|
||||
case chan<- *block.Block:
|
||||
delete(blockFeed, ch)
|
||||
case chan<- *transaction.Transaction:
|
||||
delete(txFeed, ch)
|
||||
case chan<- *state.NotificationEvent:
|
||||
delete(notificationFeed, ch)
|
||||
case chan<- *state.AppExecResult:
|
||||
delete(executionFeed, ch)
|
||||
default:
|
||||
panic(fmt.Sprintf("bad unsubscription: %T", unsub))
|
||||
}
|
||||
case event := <-bc.events:
|
||||
// We don't want to waste time looping through transactions when there are no
|
||||
// subscribers.
|
||||
if len(txFeed) != 0 || len(notificationFeed) != 0 || len(executionFeed) != 0 {
|
||||
var aerIdx int
|
||||
for _, tx := range event.block.Transactions {
|
||||
if tx.Type == transaction.InvocationType {
|
||||
aer := event.appExecResults[aerIdx]
|
||||
if !aer.TxHash.Equals(tx.Hash()) {
|
||||
panic("inconsistent application execution results")
|
||||
}
|
||||
aerIdx++
|
||||
for ch := range executionFeed {
|
||||
ch <- aer
|
||||
}
|
||||
if aer.VMState == "HALT" {
|
||||
for i := range aer.Events {
|
||||
for ch := range notificationFeed {
|
||||
ch <- &aer.Events[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for ch := range txFeed {
|
||||
ch <- tx
|
||||
}
|
||||
}
|
||||
}
|
||||
for ch := range blockFeed {
|
||||
ch <- event.block
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops Blockchain's internal loop, syncs changes to persistent storage
|
||||
// and closes it. The Blockchain is no longer functional after the call to Close.
|
||||
func (bc *Blockchain) Close() {
|
||||
|
@ -463,6 +557,7 @@ func (bc *Blockchain) getSystemFeeAmount(h util.Uint256) uint32 {
|
|||
// and all tests are in place, we can make a more optimized and cleaner implementation.
|
||||
func (bc *Blockchain) storeBlock(block *block.Block) error {
|
||||
cache := dao.NewCached(bc.dao)
|
||||
appExecResults := make([]*state.AppExecResult, 0, len(block.Transactions))
|
||||
fee := bc.getSystemFeeAmount(block.PrevHash)
|
||||
for _, tx := range block.Transactions {
|
||||
fee += uint32(bc.SystemFee(tx).IntegralValue())
|
||||
|
@ -716,6 +811,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
|
|||
Stack: v.Estack().ToContractParameters(),
|
||||
Events: systemInterop.notifications,
|
||||
}
|
||||
appExecResults = append(appExecResults, aer)
|
||||
err = cache.PutAppExecResult(aer)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to Store notifications")
|
||||
|
@ -737,6 +833,12 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
|
|||
atomic.StoreUint32(&bc.blockHeight, block.Index)
|
||||
updateBlockHeightMetric(block.Index)
|
||||
bc.memPool.RemoveStale(bc.isTxStillRelevant)
|
||||
// Genesis block is stored when Blockchain is not yet running, so there
|
||||
// is no one to read this event. And it doesn't make much sense as event
|
||||
// anyway.
|
||||
if block.Index != 0 {
|
||||
bc.events <- bcEvent{block, appExecResults}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1183,6 +1285,68 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration {
|
|||
return bc.config
|
||||
}
|
||||
|
||||
// SubscribeForBlocks adds given channel to new block event broadcasting, so when
|
||||
// there is a new block added to the chain you'll receive it via this channel.
|
||||
// Make sure it's read from regularly as not reading these events might affect
|
||||
// other Blockchain functions.
|
||||
func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
|
||||
bc.subCh <- ch
|
||||
}
|
||||
|
||||
// SubscribeForTransactions adds given channel to new transaction event
|
||||
// broadcasting, so when there is a new transaction added to the chain (in a
|
||||
// block) you'll receive it via this channel. Make sure it's read from regularly
|
||||
// as not reading these events might affect other Blockchain functions.
|
||||
func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
|
||||
bc.subCh <- ch
|
||||
}
|
||||
|
||||
// SubscribeForNotifications adds given channel to new notifications event
|
||||
// broadcasting, so when an in-block transaction execution generates a
|
||||
// notification you'll receive it via this channel. Only notifications from
|
||||
// successful transactions are broadcasted, if you're interested in failed
|
||||
// transactions use SubscribeForExecutions instead. Make sure this channel is
|
||||
// read from regularly as not reading these events might affect other Blockchain
|
||||
// functions.
|
||||
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) {
|
||||
bc.subCh <- ch
|
||||
}
|
||||
|
||||
// SubscribeForExecutions adds given channel to new transaction execution event
|
||||
// broadcasting, so when an in-block transaction execution happens you'll receive
|
||||
// the result of it via this channel. Make sure it's read from regularly as not
|
||||
// reading these events might affect other Blockchain functions.
|
||||
func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
|
||||
bc.subCh <- ch
|
||||
}
|
||||
|
||||
// UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
|
||||
// you can close it afterwards. Passing non-subscribed channel is a no-op.
|
||||
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
|
||||
bc.unsubCh <- ch
|
||||
}
|
||||
|
||||
// UnsubscribeFromTransactions unsubscribes given channel from new transaction
|
||||
// notifications, you can close it afterwards. Passing non-subscribed channel is
|
||||
// a no-op.
|
||||
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
|
||||
bc.unsubCh <- ch
|
||||
}
|
||||
|
||||
// UnsubscribeFromNotifications unsubscribes given channel from new
|
||||
// execution-generated notifications, you can close it afterwards. Passing
|
||||
// non-subscribed channel is a no-op.
|
||||
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) {
|
||||
bc.unsubCh <- ch
|
||||
}
|
||||
|
||||
// UnsubscribeFromExecutions unsubscribes given channel from new execution
|
||||
// notifications, you can close it afterwards. Passing non-subscribed channel is
|
||||
// a no-op.
|
||||
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
|
||||
bc.unsubCh <- ch
|
||||
}
|
||||
|
||||
// CalculateClaimable calculates the amount of GAS which can be claimed for a transaction with value.
|
||||
// First return value is GAS generated between startHeight and endHeight.
|
||||
// Second return value is GAS returned from accumulated SystemFees between startHeight and endHeight.
|
||||
|
|
|
@ -2,13 +2,19 @@ package core
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/storage"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -229,3 +235,108 @@ func TestClose(t *testing.T) {
|
|||
// This should never be executed.
|
||||
assert.Nil(t, t)
|
||||
}
|
||||
|
||||
func TestSubscriptions(t *testing.T) {
|
||||
// We use buffering here as a substitute for reader goroutines, events
|
||||
// get queued up and we read them one by one here.
|
||||
const chBufSize = 16
|
||||
blockCh := make(chan *block.Block, chBufSize)
|
||||
txCh := make(chan *transaction.Transaction, chBufSize)
|
||||
notificationCh := make(chan *state.NotificationEvent, chBufSize)
|
||||
executionCh := make(chan *state.AppExecResult, chBufSize)
|
||||
|
||||
bc := newTestChain(t)
|
||||
bc.SubscribeForBlocks(blockCh)
|
||||
bc.SubscribeForTransactions(txCh)
|
||||
bc.SubscribeForNotifications(notificationCh)
|
||||
bc.SubscribeForExecutions(executionCh)
|
||||
|
||||
assert.Empty(t, notificationCh)
|
||||
assert.Empty(t, executionCh)
|
||||
assert.Empty(t, blockCh)
|
||||
assert.Empty(t, txCh)
|
||||
|
||||
blocks, err := bc.genBlocks(1)
|
||||
require.NoError(t, err)
|
||||
assert.Eventually(t, func() bool { return len(blockCh) != 0 && len(txCh) != 0 }, time.Second, 10*time.Millisecond)
|
||||
assert.Empty(t, notificationCh)
|
||||
assert.Empty(t, executionCh)
|
||||
|
||||
b := <-blockCh
|
||||
tx := <-txCh
|
||||
assert.Equal(t, blocks[0], b)
|
||||
assert.Equal(t, blocks[0].Transactions[0], tx)
|
||||
assert.Empty(t, blockCh)
|
||||
assert.Empty(t, txCh)
|
||||
|
||||
acc0, err := wallet.NewAccountFromWIF(privNetKeys[0])
|
||||
require.NoError(t, err)
|
||||
addr0, err := address.StringToUint160(acc0.Address)
|
||||
require.NoError(t, err)
|
||||
|
||||
script := io.NewBufBinWriter()
|
||||
emit.Bytes(script.BinWriter, []byte("yay!"))
|
||||
emit.Syscall(script.BinWriter, "Neo.Runtime.Notify")
|
||||
require.NoError(t, script.Err)
|
||||
txGood1 := transaction.NewInvocationTX(script.Bytes(), 0)
|
||||
txGood1.AddVerificationHash(addr0)
|
||||
require.NoError(t, acc0.SignTx(txGood1))
|
||||
|
||||
// Reset() reuses the script buffer and we need to keep scripts.
|
||||
script = io.NewBufBinWriter()
|
||||
emit.Bytes(script.BinWriter, []byte("nay!"))
|
||||
emit.Syscall(script.BinWriter, "Neo.Runtime.Notify")
|
||||
emit.Opcode(script.BinWriter, opcode.THROW)
|
||||
require.NoError(t, script.Err)
|
||||
txBad := transaction.NewInvocationTX(script.Bytes(), 0)
|
||||
txBad.AddVerificationHash(addr0)
|
||||
require.NoError(t, acc0.SignTx(txBad))
|
||||
|
||||
script = io.NewBufBinWriter()
|
||||
emit.Bytes(script.BinWriter, []byte("yay! yay! yay!"))
|
||||
emit.Syscall(script.BinWriter, "Neo.Runtime.Notify")
|
||||
require.NoError(t, script.Err)
|
||||
txGood2 := transaction.NewInvocationTX(script.Bytes(), 0)
|
||||
txGood2.AddVerificationHash(addr0)
|
||||
require.NoError(t, acc0.SignTx(txGood2))
|
||||
|
||||
txMiner := newMinerTX()
|
||||
invBlock := newBlock(bc.config, bc.BlockHeight()+1, bc.CurrentHeaderHash(), txMiner, txGood1, txBad, txGood2)
|
||||
require.NoError(t, bc.AddBlock(invBlock))
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(blockCh) != 0 && len(txCh) != 0 &&
|
||||
len(notificationCh) != 0 && len(executionCh) != 0
|
||||
}, time.Second, 10*time.Millisecond)
|
||||
|
||||
b = <-blockCh
|
||||
require.Equal(t, invBlock, b)
|
||||
assert.Empty(t, blockCh)
|
||||
|
||||
// Follow in-block transaction order.
|
||||
for _, txExpected := range invBlock.Transactions {
|
||||
tx = <-txCh
|
||||
require.Equal(t, txExpected, tx)
|
||||
if txExpected.Type == transaction.InvocationType {
|
||||
exec := <-executionCh
|
||||
require.Equal(t, tx.Hash(), exec.TxHash)
|
||||
if exec.VMState == "HALT" {
|
||||
notif := <-notificationCh
|
||||
inv := tx.Data.(*transaction.InvocationTX)
|
||||
require.Equal(t, hash.Hash160(inv.Script), notif.ScriptHash)
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.Empty(t, txCh)
|
||||
assert.Empty(t, notificationCh)
|
||||
assert.Empty(t, executionCh)
|
||||
|
||||
bc.UnsubscribeFromBlocks(blockCh)
|
||||
bc.UnsubscribeFromTransactions(txCh)
|
||||
bc.UnsubscribeFromNotifications(notificationCh)
|
||||
bc.UnsubscribeFromExecutions(executionCh)
|
||||
|
||||
// Ensure that new blocks are processed correctly after unsubscription.
|
||||
_, err = bc.genBlocks(2 * chBufSize)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -46,6 +46,14 @@ type Blockchainer interface {
|
|||
References(t *transaction.Transaction) ([]transaction.InOut, error)
|
||||
mempool.Feer // fee interface
|
||||
PoolTx(*transaction.Transaction) error
|
||||
SubscribeForBlocks(ch chan<- *block.Block)
|
||||
SubscribeForExecutions(ch chan<- *state.AppExecResult)
|
||||
SubscribeForNotifications(ch chan<- *state.NotificationEvent)
|
||||
SubscribeForTransactions(ch chan<- *transaction.Transaction)
|
||||
VerifyTx(*transaction.Transaction, *block.Block) error
|
||||
GetMemPool() *mempool.Pool
|
||||
UnsubscribeFromBlocks(ch chan<- *block.Block)
|
||||
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)
|
||||
UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent)
|
||||
UnsubscribeFromTransactions(ch chan<- *transaction.Transaction)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,29 @@
|
|||
/*
|
||||
Package core implements Neo ledger functionality.
|
||||
It's built around the Blockchain structure that maintains state of the ledger.
|
||||
|
||||
Events
|
||||
|
||||
You can subscribe to Blockchain events using a set of Subscribe and Unsubscribe
|
||||
methods. These methods accept channels that will be used to send appropriate
|
||||
events, so you can control buffering. Channels are never closed by Blockchain,
|
||||
you can close them after unsubscription.
|
||||
|
||||
Unlike RPC-level subscriptions these don't allow event filtering because it
|
||||
doesn't improve overall efficiency much (when you're using Blockchain you're
|
||||
in the same process with it and filtering on your side is not that different
|
||||
from filtering on Blockchain side).
|
||||
|
||||
The same level of ordering guarantees as with RPC subscriptions is provided,
|
||||
albeit for a set of event channels, so at first transaction execution is
|
||||
announced via appropriate channels, then followed by notifications generated
|
||||
during this execution, then followed by transaction announcement and then
|
||||
followed by block announcement. Transaction announcements are ordered the same
|
||||
way they're stored in the block.
|
||||
|
||||
Be careful using these subscriptions, this mechanism is not intended to be used
|
||||
by lots of subscribers and failing to read from event channels can affect
|
||||
other Blockchain operations.
|
||||
|
||||
*/
|
||||
package core
|
||||
|
|
|
@ -146,10 +146,36 @@ func (chain testChain) PoolTx(*transaction.Transaction) error {
|
|||
panic("TODO")
|
||||
}
|
||||
|
||||
func (chain testChain) SubscribeForBlocks(ch chan<- *block.Block) {
|
||||
panic("TODO")
|
||||
}
|
||||
func (chain testChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
|
||||
panic("TODO")
|
||||
}
|
||||
func (chain testChain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) {
|
||||
panic("TODO")
|
||||
}
|
||||
func (chain testChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
func (chain testChain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
|
||||
panic("TODO")
|
||||
}
|
||||
func (chain testChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
|
||||
panic("TODO")
|
||||
}
|
||||
func (chain testChain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) {
|
||||
panic("TODO")
|
||||
}
|
||||
func (chain testChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
type testDiscovery struct{}
|
||||
|
||||
func (d testDiscovery) BackFill(addrs ...string) {}
|
||||
|
|
Loading…
Reference in a new issue