From e337a2f8aba039f43feb1055d5c90a207547bca7 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 10 Jan 2024 12:51:02 +0300 Subject: [PATCH] blockchain: make SubscribeFor* channel arguments send-only There is no need to accept rw channel. Strengthening the type to send-only will allow the caller to ensure control of reading from the provided channel. Closes #2885 Signed-off-by: Ekaterina Pavlova --- internal/fakechain/fakechain.go | 6 +- pkg/consensus/consensus.go | 4 +- pkg/core/blockchain.go | 95 ++++++++++--------------------- pkg/network/server.go | 4 +- pkg/services/notary/notary.go | 4 +- pkg/services/rpcsrv/server.go | 20 +++---- pkg/services/stateroot/service.go | 4 +- 7 files changed, 51 insertions(+), 86 deletions(-) diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 46034b48f..db79e6d84 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -24,7 +24,7 @@ import ( type FakeChain struct { config.Blockchain *mempool.Pool - blocksCh []chan *block.Block + blocksCh []chan<- *block.Block Blockheight atomic.Uint32 PoolTxF func(*transaction.Transaction) error poolTxWithData func(*transaction.Transaction, any, *mempool.Pool) error @@ -346,7 +346,7 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool) } // SubscribeForBlocks implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) { +func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) { chain.blocksCh = append(chain.blocksCh, ch) } @@ -379,7 +379,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction. } // UnsubscribeFromBlocks implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) { +func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { for i, c := range chain.blocksCh { if c == ch { if i < len(chain.blocksCh) { diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index c40e72b16..787ad9463 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -50,8 +50,8 @@ type Ledger interface { GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) ComputeNextBlockValidators() []*keys.PublicKey PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error - SubscribeForBlocks(ch chan *coreb.Block) - UnsubscribeFromBlocks(ch chan *coreb.Block) + SubscribeForBlocks(ch chan<- *coreb.Block) + UnsubscribeFromBlocks(ch chan<- *coreb.Block) GetBaseExecFee() int64 CalculateAttributesFee(tx *transaction.Transaction) int64 interop.Ledger diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 73f95b95c..81a6fa4ce 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1327,11 +1327,11 @@ func (bc *Blockchain) notificationDispatcher() { // These are just sets of subscribers, though modelled as maps // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). - blockFeed = make(map[chan *block.Block]bool) - headerFeed = make(map[chan *block.Header]bool) - txFeed = make(map[chan *transaction.Transaction]bool) - notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool) - executionFeed = make(map[chan *state.AppExecResult]bool) + blockFeed = make(map[chan<- *block.Block]bool) + headerFeed = make(map[chan<- *block.Header]bool) + txFeed = make(map[chan<- *transaction.Transaction]bool) + notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool) + executionFeed = make(map[chan<- *state.AppExecResult]bool) ) for { select { @@ -1339,30 +1339,30 @@ func (bc *Blockchain) notificationDispatcher() { return case sub := <-bc.subCh: switch ch := sub.(type) { - case chan *block.Header: + case chan<- *block.Header: headerFeed[ch] = true - case chan *block.Block: + case chan<- *block.Block: blockFeed[ch] = true - case chan *transaction.Transaction: + case chan<- *transaction.Transaction: txFeed[ch] = true - case chan *state.ContainedNotificationEvent: + case chan<- *state.ContainedNotificationEvent: notificationFeed[ch] = true - case chan *state.AppExecResult: + case chan<- *state.AppExecResult: executionFeed[ch] = true default: panic(fmt.Sprintf("bad subscription: %T", sub)) } case unsub := <-bc.unsubCh: switch ch := unsub.(type) { - case chan *block.Header: + case chan<- *block.Header: delete(headerFeed, ch) - case chan *block.Block: + case chan<- *block.Block: delete(blockFeed, ch) - case chan *transaction.Transaction: + case chan<- *transaction.Transaction: delete(txFeed, ch) - case chan *state.ContainedNotificationEvent: + case chan<- *state.ContainedNotificationEvent: delete(notificationFeed, ch) - case chan *state.AppExecResult: + case chan<- *state.AppExecResult: delete(executionFeed, ch) default: panic(fmt.Sprintf("bad unsubscription: %T", unsub)) @@ -2285,7 +2285,7 @@ func (bc *Blockchain) GetConfig() config.Blockchain { // Make sure it's read from regularly as not reading these events might affect // other Blockchain functions. Make sure you're not changing the received blocks, // as it may affect the functionality of Blockchain and other subscribers. -func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) { +func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { bc.subCh <- ch } @@ -2295,7 +2295,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) { // affect other Blockchain functions. Make sure you're not changing the received // headers, as it may affect the functionality of Blockchain and other // subscribers. -func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) { +func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header) { bc.subCh <- ch } @@ -2305,7 +2305,7 @@ func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) { // as not reading these events might affect other Blockchain functions. Make sure // you're not changing the received transactions, as it may affect the // functionality of Blockchain and other subscribers. -func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) { +func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { bc.subCh <- ch } @@ -2317,7 +2317,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) // read from regularly as not reading these events might affect other Blockchain // functions. Make sure you're not changing the received notification events, as // it may affect the functionality of Blockchain and other subscribers. -func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { +func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { bc.subCh <- ch } @@ -2327,80 +2327,45 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotifica // reading these events might affect other Blockchain functions. Make sure you're // not changing the received execution results, as it may affect the // functionality of Blockchain and other subscribers. -func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) { +func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { bc.subCh <- ch } // UnsubscribeFromBlocks unsubscribes given channel from new block notifications, // you can close it afterwards. Passing non-subscribed channel is a no-op, but // the method can read from this channel (discarding any read data). -func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) { -unsubloop: - for { - select { - case <-ch: - case bc.unsubCh <- ch: - break unsubloop - } - } +func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { + bc.unsubCh <- ch } // UnsubscribeFromHeadersOfAddedBlocks unsubscribes given channel from new // block's header notifications, you can close it afterwards. Passing // non-subscribed channel is a no-op, but the method can read from this // channel (discarding any read data). -func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) { -unsubloop: - for { - select { - case <-ch: - case bc.unsubCh <- ch: - break unsubloop - } - } +func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan<- *block.Header) { + bc.unsubCh <- ch } // UnsubscribeFromTransactions unsubscribes given channel from new transaction // notifications, you can close it afterwards. Passing non-subscribed channel is // a no-op, but the method can read from this channel (discarding any read data). -func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { -unsubloop: - for { - select { - case <-ch: - case bc.unsubCh <- ch: - break unsubloop - } - } +func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { + bc.unsubCh <- ch } // UnsubscribeFromNotifications unsubscribes given channel from new // execution-generated notifications, you can close it afterwards. Passing // non-subscribed channel is a no-op, but the method can read from this channel // (discarding any read data). -func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { -unsubloop: - for { - select { - case <-ch: - case bc.unsubCh <- ch: - break unsubloop - } - } +func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { + bc.unsubCh <- ch } // UnsubscribeFromExecutions unsubscribes given channel from new execution // notifications, you can close it afterwards. Passing non-subscribed channel is // a no-op, but the method can read from this channel (discarding any read data). -func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { -unsubloop: - for { - select { - case <-ch: - case bc.unsubCh <- ch: - break unsubloop - } - } +func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { + bc.unsubCh <- ch } // CalculateClaimable calculates the amount of GAS generated by owning specified diff --git a/pkg/network/server.go b/pkg/network/server.go index fdce2f41b..7012a29fe 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -74,8 +74,8 @@ type ( PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTxWithData(t *transaction.Transaction, data any, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data any) error) error RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) - SubscribeForBlocks(ch chan *block.Block) - UnsubscribeFromBlocks(ch chan *block.Block) + SubscribeForBlocks(ch chan<- *block.Block) + UnsubscribeFromBlocks(ch chan<- *block.Block) } // Service is a service abstraction (oracle, state root, consensus, etc). diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 55ba8748d..a6fd8fe0c 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -33,8 +33,8 @@ type ( BlockHeight() uint32 GetMaxVerificationGAS() int64 GetNotaryContractScriptHash() util.Uint160 - SubscribeForBlocks(ch chan *block.Block) - UnsubscribeFromBlocks(ch chan *block.Block) + SubscribeForBlocks(ch chan<- *block.Block) + UnsubscribeFromBlocks(ch chan<- *block.Block) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 5c4e2e9f6..29a02b6c5 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -99,16 +99,16 @@ type ( HeaderHeight() uint32 InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error P2PSigExtensionsEnabled() bool - SubscribeForBlocks(ch chan *block.Block) - SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) - SubscribeForExecutions(ch chan *state.AppExecResult) - SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) - SubscribeForTransactions(ch chan *transaction.Transaction) - UnsubscribeFromBlocks(ch chan *block.Block) - UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) - UnsubscribeFromExecutions(ch chan *state.AppExecResult) - UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) - UnsubscribeFromTransactions(ch chan *transaction.Transaction) + SubscribeForBlocks(ch chan<- *block.Block) + SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header) + SubscribeForExecutions(ch chan<- *state.AppExecResult) + SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) + SubscribeForTransactions(ch chan<- *transaction.Transaction) + UnsubscribeFromBlocks(ch chan<- *block.Block) + UnsubscribeFromHeadersOfAddedBlocks(ch chan<- *block.Header) + UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) + UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) + UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) VerifyTx(*transaction.Transaction) error VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) mempool.Feer // fee interface diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index e7123e88c..d1769f75b 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -26,8 +26,8 @@ type ( GetConfig() config.Blockchain GetDesignatedByRole(role noderoles.Role) (keys.PublicKeys, uint32, error) HeaderHeight() uint32 - SubscribeForBlocks(ch chan *block.Block) - UnsubscribeFromBlocks(ch chan *block.Block) + SubscribeForBlocks(ch chan<- *block.Block) + UnsubscribeFromBlocks(ch chan<- *block.Block) } // Service represents a state root service.