diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index f7c93eb9d..54de3f1bd 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -127,6 +127,20 @@ type Blockchain struct { lastBatch *storage.MemBatch contracts native.Contracts + + // Notification subsystem. + events chan bcEvent + subCh chan interface{} + unsubCh chan interface{} +} + +// bcEvent is an internal event generated by the Blockchain and then +// broadcasted to other parties. It joins the new block and associated +// invocation logs, all the other events visible from outside can be produced +// from this combination. +type bcEvent struct { + block *block.Block + appExecResults []*state.AppExecResult } type headersOpFunc func(headerList *HeaderHashList) @@ -169,6 +183,9 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L memPool: mempool.NewMemPool(cfg.MemPoolSize), keyCache: make(map[util.Uint160]map[string]*keys.PublicKey), log: log, + events: make(chan bcEvent), + subCh: make(chan interface{}), + unsubCh: make(chan interface{}), generationAmount: genAmount, decrementInterval: decrementInterval, @@ -286,6 +303,7 @@ func (bc *Blockchain) Run() { } close(bc.runToExitCh) }() + go bc.notificationDispatcher() for { select { case <-bc.stopCh: @@ -305,6 +323,82 @@ func (bc *Blockchain) Run() { } } +// notificationDispatcher manages subscription to events and broadcasts new events. +func (bc *Blockchain) notificationDispatcher() { + var ( + // These are just sets of subscribers, though modelled as maps + // for ease of management (not a lot of subscriptions is really + // expected, but maps are convenient for adding/deleting elements). + blockFeed = make(map[chan<- *block.Block]bool) + txFeed = make(map[chan<- *transaction.Transaction]bool) + notificationFeed = make(map[chan<- *state.NotificationEvent]bool) + executionFeed = make(map[chan<- *state.AppExecResult]bool) + ) + for { + select { + case <-bc.stopCh: + return + case sub := <-bc.subCh: + switch ch := sub.(type) { + case chan<- *block.Block: + blockFeed[ch] = true + case chan<- *transaction.Transaction: + txFeed[ch] = true + case chan<- *state.NotificationEvent: + notificationFeed[ch] = true + case chan<- *state.AppExecResult: + executionFeed[ch] = true + default: + panic(fmt.Sprintf("bad subscription: %T", sub)) + } + case unsub := <-bc.unsubCh: + switch ch := unsub.(type) { + case chan<- *block.Block: + delete(blockFeed, ch) + case chan<- *transaction.Transaction: + delete(txFeed, ch) + case chan<- *state.NotificationEvent: + delete(notificationFeed, ch) + case chan<- *state.AppExecResult: + delete(executionFeed, ch) + default: + panic(fmt.Sprintf("bad unsubscription: %T", unsub)) + } + case event := <-bc.events: + // We don't want to waste time looping through transactions when there are no + // subscribers. + if len(txFeed) != 0 || len(notificationFeed) != 0 || len(executionFeed) != 0 { + var aerIdx int + for _, tx := range event.block.Transactions { + if tx.Type == transaction.InvocationType { + aer := event.appExecResults[aerIdx] + if !aer.TxHash.Equals(tx.Hash()) { + panic("inconsistent application execution results") + } + aerIdx++ + for ch := range executionFeed { + ch <- aer + } + if aer.VMState == "HALT" { + for i := range aer.Events { + for ch := range notificationFeed { + ch <- &aer.Events[i] + } + } + } + } + for ch := range txFeed { + ch <- tx + } + } + } + for ch := range blockFeed { + ch <- event.block + } + } + } +} + // Close stops Blockchain's internal loop, syncs changes to persistent storage // and closes it. The Blockchain is no longer functional after the call to Close. func (bc *Blockchain) Close() { @@ -468,6 +562,7 @@ func (bc *Blockchain) getSystemFeeAmount(h util.Uint256) uint32 { // and all tests are in place, we can make a more optimized and cleaner implementation. func (bc *Blockchain) storeBlock(block *block.Block) error { cache := dao.NewCached(bc.dao) + appExecResults := make([]*state.AppExecResult, 0, len(block.Transactions)) fee := bc.getSystemFeeAmount(block.PrevHash) for _, tx := range block.Transactions { fee += uint32(tx.SystemFee.IntegralValue()) @@ -700,6 +795,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { Stack: v.Estack().ToContractParameters(), Events: systemInterop.Notifications, } + appExecResults = append(appExecResults, aer) err = cache.PutAppExecResult(aer) if err != nil { return errors.Wrap(err, "failed to Store notifications") @@ -728,6 +824,12 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { atomic.StoreUint32(&bc.blockHeight, block.Index) updateBlockHeightMetric(block.Index) bc.memPool.RemoveStale(bc.isTxStillRelevant, bc) + // Genesis block is stored when Blockchain is not yet running, so there + // is no one to read this event. And it doesn't make much sense as event + // anyway. + if block.Index != 0 { + bc.events <- bcEvent{block, appExecResults} + } return nil } @@ -1054,6 +1156,68 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration { return bc.config } +// SubscribeForBlocks adds given channel to new block event broadcasting, so when +// there is a new block added to the chain you'll receive it via this channel. +// Make sure it's read from regularly as not reading these events might affect +// other Blockchain functions. +func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { + bc.subCh <- ch +} + +// SubscribeForTransactions adds given channel to new transaction event +// broadcasting, so when there is a new transaction added to the chain (in a +// block) you'll receive it via this channel. Make sure it's read from regularly +// as not reading these events might affect other Blockchain functions. +func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { + bc.subCh <- ch +} + +// SubscribeForNotifications adds given channel to new notifications event +// broadcasting, so when an in-block transaction execution generates a +// notification you'll receive it via this channel. Only notifications from +// successful transactions are broadcasted, if you're interested in failed +// transactions use SubscribeForExecutions instead. Make sure this channel is +// read from regularly as not reading these events might affect other Blockchain +// functions. +func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) { + bc.subCh <- ch +} + +// SubscribeForExecutions adds given channel to new transaction execution event +// broadcasting, so when an in-block transaction execution happens you'll receive +// the result of it via this channel. Make sure it's read from regularly as not +// reading these events might affect other Blockchain functions. +func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { + bc.subCh <- ch +} + +// UnsubscribeFromBlocks unsubscribes given channel from new block notifications, +// you can close it afterwards. Passing non-subscribed channel is a no-op. +func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { + bc.unsubCh <- ch +} + +// UnsubscribeFromTransactions unsubscribes given channel from new transaction +// notifications, you can close it afterwards. Passing non-subscribed channel is +// a no-op. +func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { + bc.unsubCh <- ch +} + +// UnsubscribeFromNotifications unsubscribes given channel from new +// execution-generated notifications, you can close it afterwards. Passing +// non-subscribed channel is a no-op. +func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) { + bc.unsubCh <- ch +} + +// UnsubscribeFromExecutions unsubscribes given channel from new execution +// notifications, you can close it afterwards. Passing non-subscribed channel is +// a no-op. +func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { + bc.unsubCh <- ch +} + // CalculateClaimable calculates the amount of GAS which can be claimed for a transaction with value. // First return value is GAS generated between startHeight and endHeight. // Second return value is GAS returned from accumulated SystemFees between startHeight and endHeight. diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index a106d4fba..77a5d697e 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -2,12 +2,17 @@ package core import ( "testing" + "time" "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" + "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm/emit" + "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -234,3 +239,106 @@ func TestClose(t *testing.T) { // This should never be executed. assert.Nil(t, t) } + +func TestSubscriptions(t *testing.T) { + // We use buffering here as a substitute for reader goroutines, events + // get queued up and we read them one by one here. + const chBufSize = 16 + blockCh := make(chan *block.Block, chBufSize) + txCh := make(chan *transaction.Transaction, chBufSize) + notificationCh := make(chan *state.NotificationEvent, chBufSize) + executionCh := make(chan *state.AppExecResult, chBufSize) + + bc := newTestChain(t) + bc.SubscribeForBlocks(blockCh) + bc.SubscribeForTransactions(txCh) + bc.SubscribeForNotifications(notificationCh) + bc.SubscribeForExecutions(executionCh) + + assert.Empty(t, notificationCh) + assert.Empty(t, executionCh) + assert.Empty(t, blockCh) + assert.Empty(t, txCh) + + blocks, err := bc.genBlocks(1) + require.NoError(t, err) + require.Eventually(t, func() bool { return len(blockCh) != 0 }, time.Second, 10*time.Millisecond) + assert.Empty(t, notificationCh) + assert.Empty(t, executionCh) + assert.Empty(t, txCh) + + b := <-blockCh + assert.Equal(t, blocks[0], b) + assert.Empty(t, blockCh) + + script := io.NewBufBinWriter() + emit.Bytes(script.BinWriter, []byte("yay!")) + emit.Syscall(script.BinWriter, "Neo.Runtime.Notify") + require.NoError(t, script.Err) + txGood1 := transaction.NewInvocationTX(script.Bytes(), 0) + txGood1.Sender = neoOwner + txGood1.Nonce = 1 + txGood1.ValidUntilBlock = 100500 + require.NoError(t, signTx(bc, txGood1)) + + // Reset() reuses the script buffer and we need to keep scripts. + script = io.NewBufBinWriter() + emit.Bytes(script.BinWriter, []byte("nay!")) + emit.Syscall(script.BinWriter, "Neo.Runtime.Notify") + emit.Opcode(script.BinWriter, opcode.THROW) + require.NoError(t, script.Err) + txBad := transaction.NewInvocationTX(script.Bytes(), 0) + txBad.Sender = neoOwner + txBad.Nonce = 2 + txBad.ValidUntilBlock = 100500 + require.NoError(t, signTx(bc, txBad)) + + script = io.NewBufBinWriter() + emit.Bytes(script.BinWriter, []byte("yay! yay! yay!")) + emit.Syscall(script.BinWriter, "Neo.Runtime.Notify") + require.NoError(t, script.Err) + txGood2 := transaction.NewInvocationTX(script.Bytes(), 0) + txGood2.Sender = neoOwner + txGood2.Nonce = 3 + txGood2.ValidUntilBlock = 100500 + require.NoError(t, signTx(bc, txGood2)) + + invBlock := newBlock(bc.config, bc.BlockHeight()+1, bc.CurrentHeaderHash(), txGood1, txBad, txGood2) + require.NoError(t, bc.AddBlock(invBlock)) + + require.Eventually(t, func() bool { + return len(blockCh) != 0 && len(txCh) != 0 && + len(notificationCh) != 0 && len(executionCh) != 0 + }, time.Second, 10*time.Millisecond) + + b = <-blockCh + require.Equal(t, invBlock, b) + assert.Empty(t, blockCh) + + // Follow in-block transaction order. + for _, txExpected := range invBlock.Transactions { + tx := <-txCh + require.Equal(t, txExpected, tx) + if txExpected.Type == transaction.InvocationType { + exec := <-executionCh + require.Equal(t, tx.Hash(), exec.TxHash) + if exec.VMState == "HALT" { + notif := <-notificationCh + inv := tx.Data.(*transaction.InvocationTX) + require.Equal(t, hash.Hash160(inv.Script), notif.ScriptHash) + } + } + } + assert.Empty(t, txCh) + assert.Empty(t, notificationCh) + assert.Empty(t, executionCh) + + bc.UnsubscribeFromBlocks(blockCh) + bc.UnsubscribeFromTransactions(txCh) + bc.UnsubscribeFromNotifications(notificationCh) + bc.UnsubscribeFromExecutions(executionCh) + + // Ensure that new blocks are processed correctly after unsubscription. + _, err = bc.genBlocks(2 * chBufSize) + require.NoError(t, err) +} diff --git a/pkg/core/blockchainer/blockchainer.go b/pkg/core/blockchainer/blockchainer.go index d749ec995..dacb0f2fd 100644 --- a/pkg/core/blockchainer/blockchainer.go +++ b/pkg/core/blockchainer/blockchainer.go @@ -47,6 +47,14 @@ type Blockchainer interface { References(t *transaction.Transaction) ([]transaction.InOut, error) mempool.Feer // fee interface PoolTx(*transaction.Transaction) error + SubscribeForBlocks(ch chan<- *block.Block) + SubscribeForExecutions(ch chan<- *state.AppExecResult) + SubscribeForNotifications(ch chan<- *state.NotificationEvent) + SubscribeForTransactions(ch chan<- *transaction.Transaction) VerifyTx(*transaction.Transaction, *block.Block) error GetMemPool() *mempool.Pool + UnsubscribeFromBlocks(ch chan<- *block.Block) + UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) + UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) + UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) } diff --git a/pkg/core/doc.go b/pkg/core/doc.go index b0364db7f..c459ea132 100644 --- a/pkg/core/doc.go +++ b/pkg/core/doc.go @@ -1,5 +1,29 @@ /* Package core implements Neo ledger functionality. It's built around the Blockchain structure that maintains state of the ledger. + +Events + +You can subscribe to Blockchain events using a set of Subscribe and Unsubscribe +methods. These methods accept channels that will be used to send appropriate +events, so you can control buffering. Channels are never closed by Blockchain, +you can close them after unsubscription. + +Unlike RPC-level subscriptions these don't allow event filtering because it +doesn't improve overall efficiency much (when you're using Blockchain you're +in the same process with it and filtering on your side is not that different +from filtering on Blockchain side). + +The same level of ordering guarantees as with RPC subscriptions is provided, +albeit for a set of event channels, so at first transaction execution is +announced via appropriate channels, then followed by notifications generated +during this execution, then followed by transaction announcement and then +followed by block announcement. Transaction announcements are ordered the same +way they're stored in the block. + +Be careful using these subscriptions, this mechanism is not intended to be used +by lots of subscribers and failing to read from event channels can affect +other Blockchain operations. + */ package core diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 411afc88d..b29764c5b 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -145,10 +145,36 @@ func (chain testChain) PoolTx(*transaction.Transaction) error { panic("TODO") } +func (chain testChain) SubscribeForBlocks(ch chan<- *block.Block) { + panic("TODO") +} +func (chain testChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { + panic("TODO") +} +func (chain testChain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) { + panic("TODO") +} +func (chain testChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { + panic("TODO") +} + func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { panic("TODO") } +func (chain testChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { + panic("TODO") +} +func (chain testChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { + panic("TODO") +} +func (chain testChain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) { + panic("TODO") +} +func (chain testChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { + panic("TODO") +} + type testDiscovery struct{} func (d testDiscovery) BackFill(addrs ...string) {}