blockchain: make SubscribeFor* channel arguments send-only

There is no need to accept rw channel. Strengthening the type to
send-only will allow the caller to ensure control of reading from the
provided channel.

Closes #2885

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-01-10 12:51:02 +03:00
parent d901697615
commit e337a2f8ab
7 changed files with 51 additions and 86 deletions

View file

@ -24,7 +24,7 @@ import (
type FakeChain struct { type FakeChain struct {
config.Blockchain config.Blockchain
*mempool.Pool *mempool.Pool
blocksCh []chan *block.Block blocksCh []chan<- *block.Block
Blockheight atomic.Uint32 Blockheight atomic.Uint32
PoolTxF func(*transaction.Transaction) error PoolTxF func(*transaction.Transaction) error
poolTxWithData func(*transaction.Transaction, any, *mempool.Pool) error poolTxWithData func(*transaction.Transaction, any, *mempool.Pool) error
@ -346,7 +346,7 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool)
} }
// SubscribeForBlocks implements the Blockchainer interface. // SubscribeForBlocks implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) { func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) {
chain.blocksCh = append(chain.blocksCh, ch) chain.blocksCh = append(chain.blocksCh, ch)
} }
@ -379,7 +379,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction.
} }
// UnsubscribeFromBlocks implements the Blockchainer interface. // UnsubscribeFromBlocks implements the Blockchainer interface.
func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) { func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
for i, c := range chain.blocksCh { for i, c := range chain.blocksCh {
if c == ch { if c == ch {
if i < len(chain.blocksCh) { if i < len(chain.blocksCh) {

View file

@ -50,8 +50,8 @@ type Ledger interface {
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
ComputeNextBlockValidators() []*keys.PublicKey ComputeNextBlockValidators() []*keys.PublicKey
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan *coreb.Block) SubscribeForBlocks(ch chan<- *coreb.Block)
UnsubscribeFromBlocks(ch chan *coreb.Block) UnsubscribeFromBlocks(ch chan<- *coreb.Block)
GetBaseExecFee() int64 GetBaseExecFee() int64
CalculateAttributesFee(tx *transaction.Transaction) int64 CalculateAttributesFee(tx *transaction.Transaction) int64
interop.Ledger interop.Ledger

View file

@ -1327,11 +1327,11 @@ func (bc *Blockchain) notificationDispatcher() {
// These are just sets of subscribers, though modelled as maps // These are just sets of subscribers, though modelled as maps
// for ease of management (not a lot of subscriptions is really // for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements). // expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan *block.Block]bool) blockFeed = make(map[chan<- *block.Block]bool)
headerFeed = make(map[chan *block.Header]bool) headerFeed = make(map[chan<- *block.Header]bool)
txFeed = make(map[chan *transaction.Transaction]bool) txFeed = make(map[chan<- *transaction.Transaction]bool)
notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool) notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan *state.AppExecResult]bool) executionFeed = make(map[chan<- *state.AppExecResult]bool)
) )
for { for {
select { select {
@ -1339,30 +1339,30 @@ func (bc *Blockchain) notificationDispatcher() {
return return
case sub := <-bc.subCh: case sub := <-bc.subCh:
switch ch := sub.(type) { switch ch := sub.(type) {
case chan *block.Header: case chan<- *block.Header:
headerFeed[ch] = true headerFeed[ch] = true
case chan *block.Block: case chan<- *block.Block:
blockFeed[ch] = true blockFeed[ch] = true
case chan *transaction.Transaction: case chan<- *transaction.Transaction:
txFeed[ch] = true txFeed[ch] = true
case chan *state.ContainedNotificationEvent: case chan<- *state.ContainedNotificationEvent:
notificationFeed[ch] = true notificationFeed[ch] = true
case chan *state.AppExecResult: case chan<- *state.AppExecResult:
executionFeed[ch] = true executionFeed[ch] = true
default: default:
panic(fmt.Sprintf("bad subscription: %T", sub)) panic(fmt.Sprintf("bad subscription: %T", sub))
} }
case unsub := <-bc.unsubCh: case unsub := <-bc.unsubCh:
switch ch := unsub.(type) { switch ch := unsub.(type) {
case chan *block.Header: case chan<- *block.Header:
delete(headerFeed, ch) delete(headerFeed, ch)
case chan *block.Block: case chan<- *block.Block:
delete(blockFeed, ch) delete(blockFeed, ch)
case chan *transaction.Transaction: case chan<- *transaction.Transaction:
delete(txFeed, ch) delete(txFeed, ch)
case chan *state.ContainedNotificationEvent: case chan<- *state.ContainedNotificationEvent:
delete(notificationFeed, ch) delete(notificationFeed, ch)
case chan *state.AppExecResult: case chan<- *state.AppExecResult:
delete(executionFeed, ch) delete(executionFeed, ch)
default: default:
panic(fmt.Sprintf("bad unsubscription: %T", unsub)) panic(fmt.Sprintf("bad unsubscription: %T", unsub))
@ -2285,7 +2285,7 @@ func (bc *Blockchain) GetConfig() config.Blockchain {
// Make sure it's read from regularly as not reading these events might affect // Make sure it's read from regularly as not reading these events might affect
// other Blockchain functions. Make sure you're not changing the received blocks, // other Blockchain functions. Make sure you're not changing the received blocks,
// as it may affect the functionality of Blockchain and other subscribers. // as it may affect the functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) { func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
bc.subCh <- ch bc.subCh <- ch
} }
@ -2295,7 +2295,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
// affect other Blockchain functions. Make sure you're not changing the received // affect other Blockchain functions. Make sure you're not changing the received
// headers, as it may affect the functionality of Blockchain and other // headers, as it may affect the functionality of Blockchain and other
// subscribers. // subscribers.
func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) { func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header) {
bc.subCh <- ch bc.subCh <- ch
} }
@ -2305,7 +2305,7 @@ func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) {
// as not reading these events might affect other Blockchain functions. Make sure // as not reading these events might affect other Blockchain functions. Make sure
// you're not changing the received transactions, as it may affect the // you're not changing the received transactions, as it may affect the
// functionality of Blockchain and other subscribers. // functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) { func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
bc.subCh <- ch bc.subCh <- ch
} }
@ -2317,7 +2317,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction)
// read from regularly as not reading these events might affect other Blockchain // read from regularly as not reading these events might affect other Blockchain
// functions. Make sure you're not changing the received notification events, as // functions. Make sure you're not changing the received notification events, as
// it may affect the functionality of Blockchain and other subscribers. // it may affect the functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) {
bc.subCh <- ch bc.subCh <- ch
} }
@ -2327,80 +2327,45 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotifica
// reading these events might affect other Blockchain functions. Make sure you're // reading these events might affect other Blockchain functions. Make sure you're
// not changing the received execution results, as it may affect the // not changing the received execution results, as it may affect the
// functionality of Blockchain and other subscribers. // functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) { func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
bc.subCh <- ch bc.subCh <- ch
} }
// UnsubscribeFromBlocks unsubscribes given channel from new block notifications, // UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
// you can close it afterwards. Passing non-subscribed channel is a no-op, but // you can close it afterwards. Passing non-subscribed channel is a no-op, but
// the method can read from this channel (discarding any read data). // the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) { func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
unsubloop: bc.unsubCh <- ch
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// UnsubscribeFromHeadersOfAddedBlocks unsubscribes given channel from new // UnsubscribeFromHeadersOfAddedBlocks unsubscribes given channel from new
// block's header notifications, you can close it afterwards. Passing // block's header notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op, but the method can read from this // non-subscribed channel is a no-op, but the method can read from this
// channel (discarding any read data). // channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) { func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan<- *block.Header) {
unsubloop: bc.unsubCh <- ch
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// UnsubscribeFromTransactions unsubscribes given channel from new transaction // UnsubscribeFromTransactions unsubscribes given channel from new transaction
// notifications, you can close it afterwards. Passing non-subscribed channel is // notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op, but the method can read from this channel (discarding any read data). // a no-op, but the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
unsubloop: bc.unsubCh <- ch
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// UnsubscribeFromNotifications unsubscribes given channel from new // UnsubscribeFromNotifications unsubscribes given channel from new
// execution-generated notifications, you can close it afterwards. Passing // execution-generated notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op, but the method can read from this channel // non-subscribed channel is a no-op, but the method can read from this channel
// (discarding any read data). // (discarding any read data).
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) {
unsubloop: bc.unsubCh <- ch
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// UnsubscribeFromExecutions unsubscribes given channel from new execution // UnsubscribeFromExecutions unsubscribes given channel from new execution
// notifications, you can close it afterwards. Passing non-subscribed channel is // notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op, but the method can read from this channel (discarding any read data). // a no-op, but the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
unsubloop: bc.unsubCh <- ch
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// CalculateClaimable calculates the amount of GAS generated by owning specified // CalculateClaimable calculates the amount of GAS generated by owning specified

View file

@ -74,8 +74,8 @@ type (
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
PoolTxWithData(t *transaction.Transaction, data any, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data any) error) error PoolTxWithData(t *transaction.Transaction, data any, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data any) error) error
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
SubscribeForBlocks(ch chan *block.Block) SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block) UnsubscribeFromBlocks(ch chan<- *block.Block)
} }
// Service is a service abstraction (oracle, state root, consensus, etc). // Service is a service abstraction (oracle, state root, consensus, etc).

View file

@ -33,8 +33,8 @@ type (
BlockHeight() uint32 BlockHeight() uint32
GetMaxVerificationGAS() int64 GetMaxVerificationGAS() int64
GetNotaryContractScriptHash() util.Uint160 GetNotaryContractScriptHash() util.Uint160
SubscribeForBlocks(ch chan *block.Block) SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block) UnsubscribeFromBlocks(ch chan<- *block.Block)
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
} }

View file

@ -99,16 +99,16 @@ type (
HeaderHeight() uint32 HeaderHeight() uint32
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
P2PSigExtensionsEnabled() bool P2PSigExtensionsEnabled() bool
SubscribeForBlocks(ch chan *block.Block) SubscribeForBlocks(ch chan<- *block.Block)
SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header)
SubscribeForExecutions(ch chan *state.AppExecResult) SubscribeForExecutions(ch chan<- *state.AppExecResult)
SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan *transaction.Transaction) SubscribeForTransactions(ch chan<- *transaction.Transaction)
UnsubscribeFromBlocks(ch chan *block.Block) UnsubscribeFromBlocks(ch chan<- *block.Block)
UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) UnsubscribeFromHeadersOfAddedBlocks(ch chan<- *block.Header)
UnsubscribeFromExecutions(ch chan *state.AppExecResult) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)
UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent)
UnsubscribeFromTransactions(ch chan *transaction.Transaction) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction)
VerifyTx(*transaction.Transaction) error VerifyTx(*transaction.Transaction) error
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
mempool.Feer // fee interface mempool.Feer // fee interface

View file

@ -26,8 +26,8 @@ type (
GetConfig() config.Blockchain GetConfig() config.Blockchain
GetDesignatedByRole(role noderoles.Role) (keys.PublicKeys, uint32, error) GetDesignatedByRole(role noderoles.Role) (keys.PublicKeys, uint32, error)
HeaderHeight() uint32 HeaderHeight() uint32
SubscribeForBlocks(ch chan *block.Block) SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block) UnsubscribeFromBlocks(ch chan<- *block.Block)
} }
// Service represents a state root service. // Service represents a state root service.