From 1ac4f8528d644c4086274e7e6e09f515e3679e4f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 12 May 2020 17:20:41 +0300 Subject: [PATCH] core: add Blockchain event subscription mechanism A deep internal part of #895. Blockchainer interface is also extended for various uses of these methods. --- pkg/core/blockchain.go | 164 ++++++++++++++++++++++++++++++++++++ pkg/core/blockchain_test.go | 111 ++++++++++++++++++++++++ pkg/core/blockchainer.go | 8 ++ pkg/core/doc.go | 24 ++++++ pkg/network/helper_test.go | 26 ++++++ 5 files changed, 333 insertions(+) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index a48ef00d6..ab54b8d09 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -124,6 +124,20 @@ type Blockchain struct { log *zap.Logger lastBatch *storage.MemBatch + + // Notification subsystem. + events chan bcEvent + subCh chan interface{} + unsubCh chan interface{} +} + +// bcEvent is an internal event generated by the Blockchain and then +// broadcasted to other parties. It joins the new block and associated +// invocation logs, all the other events visible from outside can be produced +// from this combination. +type bcEvent struct { + block *block.Block + appExecResults []*state.AppExecResult } type headersOpFunc func(headerList *HeaderHashList) @@ -166,6 +180,9 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L memPool: mempool.NewMemPool(cfg.MemPoolSize), keyCache: make(map[util.Uint160]map[string]*keys.PublicKey), log: log, + events: make(chan bcEvent), + subCh: make(chan interface{}), + unsubCh: make(chan interface{}), generationAmount: genAmount, decrementInterval: decrementInterval, @@ -281,6 +298,7 @@ func (bc *Blockchain) Run() { } close(bc.runToExitCh) }() + go bc.notificationDispatcher() for { select { case <-bc.stopCh: @@ -300,6 +318,82 @@ func (bc *Blockchain) Run() { } } +// notificationDispatcher manages subscription to events and broadcasts new events. +func (bc *Blockchain) notificationDispatcher() { + var ( + // These are just sets of subscribers, though modelled as maps + // for ease of management (not a lot of subscriptions is really + // expected, but maps are convenient for adding/deleting elements). + blockFeed = make(map[chan<- *block.Block]bool) + txFeed = make(map[chan<- *transaction.Transaction]bool) + notificationFeed = make(map[chan<- *state.NotificationEvent]bool) + executionFeed = make(map[chan<- *state.AppExecResult]bool) + ) + for { + select { + case <-bc.stopCh: + return + case sub := <-bc.subCh: + switch ch := sub.(type) { + case chan<- *block.Block: + blockFeed[ch] = true + case chan<- *transaction.Transaction: + txFeed[ch] = true + case chan<- *state.NotificationEvent: + notificationFeed[ch] = true + case chan<- *state.AppExecResult: + executionFeed[ch] = true + default: + panic(fmt.Sprintf("bad subscription: %T", sub)) + } + case unsub := <-bc.unsubCh: + switch ch := unsub.(type) { + case chan<- *block.Block: + delete(blockFeed, ch) + case chan<- *transaction.Transaction: + delete(txFeed, ch) + case chan<- *state.NotificationEvent: + delete(notificationFeed, ch) + case chan<- *state.AppExecResult: + delete(executionFeed, ch) + default: + panic(fmt.Sprintf("bad unsubscription: %T", unsub)) + } + case event := <-bc.events: + // We don't want to waste time looping through transactions when there are no + // subscribers. + if len(txFeed) != 0 || len(notificationFeed) != 0 || len(executionFeed) != 0 { + var aerIdx int + for _, tx := range event.block.Transactions { + if tx.Type == transaction.InvocationType { + aer := event.appExecResults[aerIdx] + if !aer.TxHash.Equals(tx.Hash()) { + panic("inconsistent application execution results") + } + aerIdx++ + for ch := range executionFeed { + ch <- aer + } + if aer.VMState == "HALT" { + for i := range aer.Events { + for ch := range notificationFeed { + ch <- &aer.Events[i] + } + } + } + } + for ch := range txFeed { + ch <- tx + } + } + } + for ch := range blockFeed { + ch <- event.block + } + } + } +} + // Close stops Blockchain's internal loop, syncs changes to persistent storage // and closes it. The Blockchain is no longer functional after the call to Close. func (bc *Blockchain) Close() { @@ -463,6 +557,7 @@ func (bc *Blockchain) getSystemFeeAmount(h util.Uint256) uint32 { // and all tests are in place, we can make a more optimized and cleaner implementation. func (bc *Blockchain) storeBlock(block *block.Block) error { cache := dao.NewCached(bc.dao) + appExecResults := make([]*state.AppExecResult, 0, len(block.Transactions)) fee := bc.getSystemFeeAmount(block.PrevHash) for _, tx := range block.Transactions { fee += uint32(bc.SystemFee(tx).IntegralValue()) @@ -716,6 +811,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { Stack: v.Estack().ToContractParameters(), Events: systemInterop.notifications, } + appExecResults = append(appExecResults, aer) err = cache.PutAppExecResult(aer) if err != nil { return errors.Wrap(err, "failed to Store notifications") @@ -737,6 +833,12 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { atomic.StoreUint32(&bc.blockHeight, block.Index) updateBlockHeightMetric(block.Index) bc.memPool.RemoveStale(bc.isTxStillRelevant) + // Genesis block is stored when Blockchain is not yet running, so there + // is no one to read this event. And it doesn't make much sense as event + // anyway. + if block.Index != 0 { + bc.events <- bcEvent{block, appExecResults} + } return nil } @@ -1183,6 +1285,68 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration { return bc.config } +// SubscribeForBlocks adds given channel to new block event broadcasting, so when +// there is a new block added to the chain you'll receive it via this channel. +// Make sure it's read from regularly as not reading these events might affect +// other Blockchain functions. +func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { + bc.subCh <- ch +} + +// SubscribeForTransactions adds given channel to new transaction event +// broadcasting, so when there is a new transaction added to the chain (in a +// block) you'll receive it via this channel. Make sure it's read from regularly +// as not reading these events might affect other Blockchain functions. +func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { + bc.subCh <- ch +} + +// SubscribeForNotifications adds given channel to new notifications event +// broadcasting, so when an in-block transaction execution generates a +// notification you'll receive it via this channel. Only notifications from +// successful transactions are broadcasted, if you're interested in failed +// transactions use SubscribeForExecutions instead. Make sure this channel is +// read from regularly as not reading these events might affect other Blockchain +// functions. +func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) { + bc.subCh <- ch +} + +// SubscribeForExecutions adds given channel to new transaction execution event +// broadcasting, so when an in-block transaction execution happens you'll receive +// the result of it via this channel. Make sure it's read from regularly as not +// reading these events might affect other Blockchain functions. +func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { + bc.subCh <- ch +} + +// UnsubscribeFromBlocks unsubscribes given channel from new block notifications, +// you can close it afterwards. Passing non-subscribed channel is a no-op. +func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { + bc.unsubCh <- ch +} + +// UnsubscribeFromTransactions unsubscribes given channel from new transaction +// notifications, you can close it afterwards. Passing non-subscribed channel is +// a no-op. +func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { + bc.unsubCh <- ch +} + +// UnsubscribeFromNotifications unsubscribes given channel from new +// execution-generated notifications, you can close it afterwards. Passing +// non-subscribed channel is a no-op. +func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) { + bc.unsubCh <- ch +} + +// UnsubscribeFromExecutions unsubscribes given channel from new execution +// notifications, you can close it afterwards. Passing non-subscribed channel is +// a no-op. +func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { + bc.unsubCh <- ch +} + // CalculateClaimable calculates the amount of GAS which can be claimed for a transaction with value. // First return value is GAS generated between startHeight and endHeight. // Second return value is GAS returned from accumulated SystemFees between startHeight and endHeight. diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 0313aa63e..d344671b5 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -2,13 +2,19 @@ package core import ( "testing" + "time" "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" + "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm/emit" + "github.com/nspcc-dev/neo-go/pkg/vm/opcode" + "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -229,3 +235,108 @@ func TestClose(t *testing.T) { // This should never be executed. assert.Nil(t, t) } + +func TestSubscriptions(t *testing.T) { + // We use buffering here as a substitute for reader goroutines, events + // get queued up and we read them one by one here. + const chBufSize = 16 + blockCh := make(chan *block.Block, chBufSize) + txCh := make(chan *transaction.Transaction, chBufSize) + notificationCh := make(chan *state.NotificationEvent, chBufSize) + executionCh := make(chan *state.AppExecResult, chBufSize) + + bc := newTestChain(t) + bc.SubscribeForBlocks(blockCh) + bc.SubscribeForTransactions(txCh) + bc.SubscribeForNotifications(notificationCh) + bc.SubscribeForExecutions(executionCh) + + assert.Empty(t, notificationCh) + assert.Empty(t, executionCh) + assert.Empty(t, blockCh) + assert.Empty(t, txCh) + + blocks, err := bc.genBlocks(1) + require.NoError(t, err) + assert.Eventually(t, func() bool { return len(blockCh) != 0 && len(txCh) != 0 }, time.Second, 10*time.Millisecond) + assert.Empty(t, notificationCh) + assert.Empty(t, executionCh) + + b := <-blockCh + tx := <-txCh + assert.Equal(t, blocks[0], b) + assert.Equal(t, blocks[0].Transactions[0], tx) + assert.Empty(t, blockCh) + assert.Empty(t, txCh) + + acc0, err := wallet.NewAccountFromWIF(privNetKeys[0]) + require.NoError(t, err) + addr0, err := address.StringToUint160(acc0.Address) + require.NoError(t, err) + + script := io.NewBufBinWriter() + emit.Bytes(script.BinWriter, []byte("yay!")) + emit.Syscall(script.BinWriter, "Neo.Runtime.Notify") + require.NoError(t, script.Err) + txGood1 := transaction.NewInvocationTX(script.Bytes(), 0) + txGood1.AddVerificationHash(addr0) + require.NoError(t, acc0.SignTx(txGood1)) + + // Reset() reuses the script buffer and we need to keep scripts. + script = io.NewBufBinWriter() + emit.Bytes(script.BinWriter, []byte("nay!")) + emit.Syscall(script.BinWriter, "Neo.Runtime.Notify") + emit.Opcode(script.BinWriter, opcode.THROW) + require.NoError(t, script.Err) + txBad := transaction.NewInvocationTX(script.Bytes(), 0) + txBad.AddVerificationHash(addr0) + require.NoError(t, acc0.SignTx(txBad)) + + script = io.NewBufBinWriter() + emit.Bytes(script.BinWriter, []byte("yay! yay! yay!")) + emit.Syscall(script.BinWriter, "Neo.Runtime.Notify") + require.NoError(t, script.Err) + txGood2 := transaction.NewInvocationTX(script.Bytes(), 0) + txGood2.AddVerificationHash(addr0) + require.NoError(t, acc0.SignTx(txGood2)) + + txMiner := newMinerTX() + invBlock := newBlock(bc.config, bc.BlockHeight()+1, bc.CurrentHeaderHash(), txMiner, txGood1, txBad, txGood2) + require.NoError(t, bc.AddBlock(invBlock)) + + require.Eventually(t, func() bool { + return len(blockCh) != 0 && len(txCh) != 0 && + len(notificationCh) != 0 && len(executionCh) != 0 + }, time.Second, 10*time.Millisecond) + + b = <-blockCh + require.Equal(t, invBlock, b) + assert.Empty(t, blockCh) + + // Follow in-block transaction order. + for _, txExpected := range invBlock.Transactions { + tx = <-txCh + require.Equal(t, txExpected, tx) + if txExpected.Type == transaction.InvocationType { + exec := <-executionCh + require.Equal(t, tx.Hash(), exec.TxHash) + if exec.VMState == "HALT" { + notif := <-notificationCh + inv := tx.Data.(*transaction.InvocationTX) + require.Equal(t, hash.Hash160(inv.Script), notif.ScriptHash) + } + } + } + assert.Empty(t, txCh) + assert.Empty(t, notificationCh) + assert.Empty(t, executionCh) + + bc.UnsubscribeFromBlocks(blockCh) + bc.UnsubscribeFromTransactions(txCh) + bc.UnsubscribeFromNotifications(notificationCh) + bc.UnsubscribeFromExecutions(executionCh) + + // Ensure that new blocks are processed correctly after unsubscription. + _, err = bc.genBlocks(2 * chBufSize) + require.NoError(t, err) +} diff --git a/pkg/core/blockchainer.go b/pkg/core/blockchainer.go index a633bb523..d3e0309de 100644 --- a/pkg/core/blockchainer.go +++ b/pkg/core/blockchainer.go @@ -46,6 +46,14 @@ type Blockchainer interface { References(t *transaction.Transaction) ([]transaction.InOut, error) mempool.Feer // fee interface PoolTx(*transaction.Transaction) error + SubscribeForBlocks(ch chan<- *block.Block) + SubscribeForExecutions(ch chan<- *state.AppExecResult) + SubscribeForNotifications(ch chan<- *state.NotificationEvent) + SubscribeForTransactions(ch chan<- *transaction.Transaction) VerifyTx(*transaction.Transaction, *block.Block) error GetMemPool() *mempool.Pool + UnsubscribeFromBlocks(ch chan<- *block.Block) + UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) + UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) + UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) } diff --git a/pkg/core/doc.go b/pkg/core/doc.go index b0364db7f..c459ea132 100644 --- a/pkg/core/doc.go +++ b/pkg/core/doc.go @@ -1,5 +1,29 @@ /* Package core implements Neo ledger functionality. It's built around the Blockchain structure that maintains state of the ledger. + +Events + +You can subscribe to Blockchain events using a set of Subscribe and Unsubscribe +methods. These methods accept channels that will be used to send appropriate +events, so you can control buffering. Channels are never closed by Blockchain, +you can close them after unsubscription. + +Unlike RPC-level subscriptions these don't allow event filtering because it +doesn't improve overall efficiency much (when you're using Blockchain you're +in the same process with it and filtering on your side is not that different +from filtering on Blockchain side). + +The same level of ordering guarantees as with RPC subscriptions is provided, +albeit for a set of event channels, so at first transaction execution is +announced via appropriate channels, then followed by notifications generated +during this execution, then followed by transaction announcement and then +followed by block announcement. Transaction announcements are ordered the same +way they're stored in the block. + +Be careful using these subscriptions, this mechanism is not intended to be used +by lots of subscribers and failing to read from event channels can affect +other Blockchain operations. + */ package core diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 25e1fed7d..a719d012d 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -146,10 +146,36 @@ func (chain testChain) PoolTx(*transaction.Transaction) error { panic("TODO") } +func (chain testChain) SubscribeForBlocks(ch chan<- *block.Block) { + panic("TODO") +} +func (chain testChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { + panic("TODO") +} +func (chain testChain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) { + panic("TODO") +} +func (chain testChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { + panic("TODO") +} + func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { panic("TODO") } +func (chain testChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { + panic("TODO") +} +func (chain testChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { + panic("TODO") +} +func (chain testChain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) { + panic("TODO") +} +func (chain testChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { + panic("TODO") +} + type testDiscovery struct{} func (d testDiscovery) BackFill(addrs ...string) {}