diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 0ac04a4c5..3ae130061 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -26,7 +26,7 @@ import ( type FakeChain struct { config.ProtocolConfiguration *mempool.Pool - blocksCh []chan<- *block.Block + blocksCh []chan *block.Block Blockheight uint32 PoolTxF func(*transaction.Transaction) error poolTxWithData func(*transaction.Transaction, interface{}, *mempool.Pool) error @@ -351,22 +351,22 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool) } // SubscribeForBlocks implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) { +func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) { chain.blocksCh = append(chain.blocksCh, ch) } // SubscribeForExecutions implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { +func (chain *FakeChain) SubscribeForExecutions(ch chan *state.AppExecResult) { panic("TODO") } // SubscribeForNotifications implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (chain *FakeChain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { panic("TODO") } // SubscribeForTransactions implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { +func (chain *FakeChain) SubscribeForTransactions(ch chan *transaction.Transaction) { panic("TODO") } @@ -384,7 +384,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction. } // UnsubscribeFromBlocks implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { +func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) { for i, c := range chain.blocksCh { if c == ch { if i < len(chain.blocksCh) { @@ -396,17 +396,17 @@ func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { } // UnsubscribeFromExecutions implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { +func (chain *FakeChain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { panic("TODO") } // UnsubscribeFromNotifications implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (chain *FakeChain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { panic("TODO") } // UnsubscribeFromTransactions implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { +func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { panic("TODO") } diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index dca2b80e6..f4cec74fb 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -51,8 +51,8 @@ type Ledger interface { GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) GetValidators() ([]*keys.PublicKey, error) PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error - SubscribeForBlocks(ch chan<- *coreb.Block) - UnsubscribeFromBlocks(ch chan<- *coreb.Block) + SubscribeForBlocks(ch chan *coreb.Block) + UnsubscribeFromBlocks(ch chan *coreb.Block) GetBaseExecFee() int64 interop.Ledger mempool.Feer diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 73bb05915..17fb7b3db 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -784,10 +784,10 @@ func (bc *Blockchain) notificationDispatcher() { // These are just sets of subscribers, though modelled as maps // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). - blockFeed = make(map[chan<- *block.Block]bool) - txFeed = make(map[chan<- *transaction.Transaction]bool) - notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool) - executionFeed = make(map[chan<- *state.AppExecResult]bool) + blockFeed = make(map[chan *block.Block]bool) + txFeed = make(map[chan *transaction.Transaction]bool) + notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool) + executionFeed = make(map[chan *state.AppExecResult]bool) ) for { select { @@ -795,26 +795,26 @@ func (bc *Blockchain) notificationDispatcher() { return case sub := <-bc.subCh: switch ch := sub.(type) { - case chan<- *block.Block: + case chan *block.Block: blockFeed[ch] = true - case chan<- *transaction.Transaction: + case chan *transaction.Transaction: txFeed[ch] = true - case chan<- *state.ContainedNotificationEvent: + case chan *state.ContainedNotificationEvent: notificationFeed[ch] = true - case chan<- *state.AppExecResult: + case chan *state.AppExecResult: executionFeed[ch] = true default: panic(fmt.Sprintf("bad subscription: %T", sub)) } case unsub := <-bc.unsubCh: switch ch := unsub.(type) { - case chan<- *block.Block: + case chan *block.Block: delete(blockFeed, ch) - case chan<- *transaction.Transaction: + case chan *transaction.Transaction: delete(txFeed, ch) - case chan<- *state.ContainedNotificationEvent: + case chan *state.ContainedNotificationEvent: delete(notificationFeed, ch) - case chan<- *state.AppExecResult: + case chan *state.AppExecResult: delete(executionFeed, ch) default: panic(fmt.Sprintf("bad unsubscription: %T", unsub)) @@ -1799,7 +1799,7 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration { // there is a new block added to the chain you'll receive it via this channel. // Make sure it's read from regularly as not reading these events might affect // other Blockchain functions. -func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { +func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) { bc.subCh <- ch } @@ -1807,7 +1807,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { // broadcasting, so when there is a new transaction added to the chain (in a // block) you'll receive it via this channel. Make sure it's read from regularly // as not reading these events might affect other Blockchain functions. -func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { +func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) { bc.subCh <- ch } @@ -1818,7 +1818,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transactio // transactions use SubscribeForExecutions instead. Make sure this channel is // read from regularly as not reading these events might affect other Blockchain // functions. -func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { bc.subCh <- ch } @@ -1826,35 +1826,65 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotifi // broadcasting, so when an in-block transaction execution happens you'll receive // the result of it via this channel. Make sure it's read from regularly as not // reading these events might affect other Blockchain functions. -func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { +func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) { bc.subCh <- ch } // UnsubscribeFromBlocks unsubscribes given channel from new block notifications, -// you can close it afterwards. Passing non-subscribed channel is a no-op. -func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { - bc.unsubCh <- ch +// you can close it afterwards. Passing non-subscribed channel is a no-op, but +// the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromTransactions unsubscribes given channel from new transaction // notifications, you can close it afterwards. Passing non-subscribed channel is -// a no-op. -func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { - bc.unsubCh <- ch +// a no-op, but the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromNotifications unsubscribes given channel from new // execution-generated notifications, you can close it afterwards. Passing -// non-subscribed channel is a no-op. -func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { - bc.unsubCh <- ch +// non-subscribed channel is a no-op, but the method can read from this channel +// (discarding any read data). +func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromExecutions unsubscribes given channel from new execution // notifications, you can close it afterwards. Passing non-subscribed channel is -// a no-op. -func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { - bc.unsubCh <- ch +// a no-op, but the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // CalculateClaimable calculates the amount of GAS generated by owning specified diff --git a/pkg/network/server.go b/pkg/network/server.go index 00cfe4764..5392071e7 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -69,8 +69,8 @@ type ( PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) } // Service is a service abstraction (oracle, state root, consensus, etc). diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 0fb1a6c90..fa3f3dd3f 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -32,8 +32,8 @@ type ( BlockHeight() uint32 GetMaxVerificationGAS() int64 GetNotaryContractScriptHash() util.Uint160 - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index e67c801a0..36d832809 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -94,14 +94,14 @@ type ( GetValidators() ([]*keys.PublicKey, error) HeaderHeight() uint32 InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error - SubscribeForBlocks(ch chan<- *block.Block) - SubscribeForExecutions(ch chan<- *state.AppExecResult) - SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) - SubscribeForTransactions(ch chan<- *transaction.Transaction) - UnsubscribeFromBlocks(ch chan<- *block.Block) - UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) - UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) - UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) + SubscribeForBlocks(ch chan *block.Block) + SubscribeForExecutions(ch chan *state.AppExecResult) + SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) + SubscribeForTransactions(ch chan *transaction.Transaction) + UnsubscribeFromBlocks(ch chan *block.Block) + UnsubscribeFromExecutions(ch chan *state.AppExecResult) + UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) + UnsubscribeFromTransactions(ch chan *transaction.Transaction) VerifyTx(*transaction.Transaction) error VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) mempool.Feer // fee interface diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 158377ae2..338ad4f3b 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -23,8 +23,8 @@ type ( Ledger interface { GetConfig() config.ProtocolConfiguration HeaderHeight() uint32 - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) } // Service represents a state root service.