core: accept two-side channels for sub/unsub, read on unsub
Blockchain's notificationDispatcher sends events to channels and these channels must be read from. Unfortunately, regular service shutdown procedure does unsubscription first (outside of the read loop) and only then drains the channel. While it waits for unsubscription request to be accepted notificationDispatcher can try pushing more data into the same channel which will lead to a deadlock. Reading in the same method solves this, any number of events can be pushed until unsub channel accepts the data.
This commit is contained in:
parent
dea75a4211
commit
eeeb0f6f0e
7 changed files with 83 additions and 53 deletions
|
@ -26,7 +26,7 @@ import (
|
||||||
type FakeChain struct {
|
type FakeChain struct {
|
||||||
config.ProtocolConfiguration
|
config.ProtocolConfiguration
|
||||||
*mempool.Pool
|
*mempool.Pool
|
||||||
blocksCh []chan<- *block.Block
|
blocksCh []chan *block.Block
|
||||||
Blockheight uint32
|
Blockheight uint32
|
||||||
PoolTxF func(*transaction.Transaction) error
|
PoolTxF func(*transaction.Transaction) error
|
||||||
poolTxWithData func(*transaction.Transaction, interface{}, *mempool.Pool) error
|
poolTxWithData func(*transaction.Transaction, interface{}, *mempool.Pool) error
|
||||||
|
@ -351,22 +351,22 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeForBlocks implements the Blockchainer interface.
|
// SubscribeForBlocks implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) {
|
func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) {
|
||||||
chain.blocksCh = append(chain.blocksCh, ch)
|
chain.blocksCh = append(chain.blocksCh, ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeForExecutions implements the Blockchainer interface.
|
// SubscribeForExecutions implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
|
func (chain *FakeChain) SubscribeForExecutions(ch chan *state.AppExecResult) {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeForNotifications implements the Blockchainer interface.
|
// SubscribeForNotifications implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) {
|
func (chain *FakeChain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeForTransactions implements the Blockchainer interface.
|
// SubscribeForTransactions implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
|
func (chain *FakeChain) SubscribeForTransactions(ch chan *transaction.Transaction) {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -384,7 +384,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction.
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromBlocks implements the Blockchainer interface.
|
// UnsubscribeFromBlocks implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
|
func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) {
|
||||||
for i, c := range chain.blocksCh {
|
for i, c := range chain.blocksCh {
|
||||||
if c == ch {
|
if c == ch {
|
||||||
if i < len(chain.blocksCh) {
|
if i < len(chain.blocksCh) {
|
||||||
|
@ -396,17 +396,17 @@ func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromExecutions implements the Blockchainer interface.
|
// UnsubscribeFromExecutions implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
|
func (chain *FakeChain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromNotifications implements the Blockchainer interface.
|
// UnsubscribeFromNotifications implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) {
|
func (chain *FakeChain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromTransactions implements the Blockchainer interface.
|
// UnsubscribeFromTransactions implements the Blockchainer interface.
|
||||||
func (chain *FakeChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
|
func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,8 +51,8 @@ type Ledger interface {
|
||||||
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
|
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
|
||||||
GetValidators() ([]*keys.PublicKey, error)
|
GetValidators() ([]*keys.PublicKey, error)
|
||||||
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
|
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
|
||||||
SubscribeForBlocks(ch chan<- *coreb.Block)
|
SubscribeForBlocks(ch chan *coreb.Block)
|
||||||
UnsubscribeFromBlocks(ch chan<- *coreb.Block)
|
UnsubscribeFromBlocks(ch chan *coreb.Block)
|
||||||
GetBaseExecFee() int64
|
GetBaseExecFee() int64
|
||||||
interop.Ledger
|
interop.Ledger
|
||||||
mempool.Feer
|
mempool.Feer
|
||||||
|
|
|
@ -784,10 +784,10 @@ func (bc *Blockchain) notificationDispatcher() {
|
||||||
// These are just sets of subscribers, though modelled as maps
|
// These are just sets of subscribers, though modelled as maps
|
||||||
// for ease of management (not a lot of subscriptions is really
|
// for ease of management (not a lot of subscriptions is really
|
||||||
// expected, but maps are convenient for adding/deleting elements).
|
// expected, but maps are convenient for adding/deleting elements).
|
||||||
blockFeed = make(map[chan<- *block.Block]bool)
|
blockFeed = make(map[chan *block.Block]bool)
|
||||||
txFeed = make(map[chan<- *transaction.Transaction]bool)
|
txFeed = make(map[chan *transaction.Transaction]bool)
|
||||||
notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool)
|
notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool)
|
||||||
executionFeed = make(map[chan<- *state.AppExecResult]bool)
|
executionFeed = make(map[chan *state.AppExecResult]bool)
|
||||||
)
|
)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -795,26 +795,26 @@ func (bc *Blockchain) notificationDispatcher() {
|
||||||
return
|
return
|
||||||
case sub := <-bc.subCh:
|
case sub := <-bc.subCh:
|
||||||
switch ch := sub.(type) {
|
switch ch := sub.(type) {
|
||||||
case chan<- *block.Block:
|
case chan *block.Block:
|
||||||
blockFeed[ch] = true
|
blockFeed[ch] = true
|
||||||
case chan<- *transaction.Transaction:
|
case chan *transaction.Transaction:
|
||||||
txFeed[ch] = true
|
txFeed[ch] = true
|
||||||
case chan<- *state.ContainedNotificationEvent:
|
case chan *state.ContainedNotificationEvent:
|
||||||
notificationFeed[ch] = true
|
notificationFeed[ch] = true
|
||||||
case chan<- *state.AppExecResult:
|
case chan *state.AppExecResult:
|
||||||
executionFeed[ch] = true
|
executionFeed[ch] = true
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("bad subscription: %T", sub))
|
panic(fmt.Sprintf("bad subscription: %T", sub))
|
||||||
}
|
}
|
||||||
case unsub := <-bc.unsubCh:
|
case unsub := <-bc.unsubCh:
|
||||||
switch ch := unsub.(type) {
|
switch ch := unsub.(type) {
|
||||||
case chan<- *block.Block:
|
case chan *block.Block:
|
||||||
delete(blockFeed, ch)
|
delete(blockFeed, ch)
|
||||||
case chan<- *transaction.Transaction:
|
case chan *transaction.Transaction:
|
||||||
delete(txFeed, ch)
|
delete(txFeed, ch)
|
||||||
case chan<- *state.ContainedNotificationEvent:
|
case chan *state.ContainedNotificationEvent:
|
||||||
delete(notificationFeed, ch)
|
delete(notificationFeed, ch)
|
||||||
case chan<- *state.AppExecResult:
|
case chan *state.AppExecResult:
|
||||||
delete(executionFeed, ch)
|
delete(executionFeed, ch)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("bad unsubscription: %T", unsub))
|
panic(fmt.Sprintf("bad unsubscription: %T", unsub))
|
||||||
|
@ -1799,7 +1799,7 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration {
|
||||||
// there is a new block added to the chain you'll receive it via this channel.
|
// there is a new block added to the chain you'll receive it via this channel.
|
||||||
// Make sure it's read from regularly as not reading these events might affect
|
// Make sure it's read from regularly as not reading these events might affect
|
||||||
// other Blockchain functions.
|
// other Blockchain functions.
|
||||||
func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
|
func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
|
||||||
bc.subCh <- ch
|
bc.subCh <- ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1807,7 +1807,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
|
||||||
// broadcasting, so when there is a new transaction added to the chain (in a
|
// broadcasting, so when there is a new transaction added to the chain (in a
|
||||||
// block) you'll receive it via this channel. Make sure it's read from regularly
|
// block) you'll receive it via this channel. Make sure it's read from regularly
|
||||||
// as not reading these events might affect other Blockchain functions.
|
// as not reading these events might affect other Blockchain functions.
|
||||||
func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
|
func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) {
|
||||||
bc.subCh <- ch
|
bc.subCh <- ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1818,7 +1818,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transactio
|
||||||
// transactions use SubscribeForExecutions instead. Make sure this channel is
|
// transactions use SubscribeForExecutions instead. Make sure this channel is
|
||||||
// read from regularly as not reading these events might affect other Blockchain
|
// read from regularly as not reading these events might affect other Blockchain
|
||||||
// functions.
|
// functions.
|
||||||
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) {
|
func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) {
|
||||||
bc.subCh <- ch
|
bc.subCh <- ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1826,35 +1826,65 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotifi
|
||||||
// broadcasting, so when an in-block transaction execution happens you'll receive
|
// broadcasting, so when an in-block transaction execution happens you'll receive
|
||||||
// the result of it via this channel. Make sure it's read from regularly as not
|
// the result of it via this channel. Make sure it's read from regularly as not
|
||||||
// reading these events might affect other Blockchain functions.
|
// reading these events might affect other Blockchain functions.
|
||||||
func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
|
func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) {
|
||||||
bc.subCh <- ch
|
bc.subCh <- ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
|
// UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
|
||||||
// you can close it afterwards. Passing non-subscribed channel is a no-op.
|
// you can close it afterwards. Passing non-subscribed channel is a no-op, but
|
||||||
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
|
// the method can read from this channel (discarding any read data).
|
||||||
bc.unsubCh <- ch
|
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) {
|
||||||
|
unsubloop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case bc.unsubCh <- ch:
|
||||||
|
break unsubloop
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromTransactions unsubscribes given channel from new transaction
|
// UnsubscribeFromTransactions unsubscribes given channel from new transaction
|
||||||
// notifications, you can close it afterwards. Passing non-subscribed channel is
|
// notifications, you can close it afterwards. Passing non-subscribed channel is
|
||||||
// a no-op.
|
// a no-op, but the method can read from this channel (discarding any read data).
|
||||||
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
|
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) {
|
||||||
bc.unsubCh <- ch
|
unsubloop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case bc.unsubCh <- ch:
|
||||||
|
break unsubloop
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromNotifications unsubscribes given channel from new
|
// UnsubscribeFromNotifications unsubscribes given channel from new
|
||||||
// execution-generated notifications, you can close it afterwards. Passing
|
// execution-generated notifications, you can close it afterwards. Passing
|
||||||
// non-subscribed channel is a no-op.
|
// non-subscribed channel is a no-op, but the method can read from this channel
|
||||||
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) {
|
// (discarding any read data).
|
||||||
bc.unsubCh <- ch
|
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) {
|
||||||
|
unsubloop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case bc.unsubCh <- ch:
|
||||||
|
break unsubloop
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeFromExecutions unsubscribes given channel from new execution
|
// UnsubscribeFromExecutions unsubscribes given channel from new execution
|
||||||
// notifications, you can close it afterwards. Passing non-subscribed channel is
|
// notifications, you can close it afterwards. Passing non-subscribed channel is
|
||||||
// a no-op.
|
// a no-op, but the method can read from this channel (discarding any read data).
|
||||||
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
|
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) {
|
||||||
bc.unsubCh <- ch
|
unsubloop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case bc.unsubCh <- ch:
|
||||||
|
break unsubloop
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CalculateClaimable calculates the amount of GAS generated by owning specified
|
// CalculateClaimable calculates the amount of GAS generated by owning specified
|
||||||
|
|
|
@ -69,8 +69,8 @@ type (
|
||||||
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
|
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
|
||||||
PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error
|
PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error
|
||||||
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
|
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
|
||||||
SubscribeForBlocks(ch chan<- *block.Block)
|
SubscribeForBlocks(ch chan *block.Block)
|
||||||
UnsubscribeFromBlocks(ch chan<- *block.Block)
|
UnsubscribeFromBlocks(ch chan *block.Block)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service is a service abstraction (oracle, state root, consensus, etc).
|
// Service is a service abstraction (oracle, state root, consensus, etc).
|
||||||
|
|
|
@ -32,8 +32,8 @@ type (
|
||||||
BlockHeight() uint32
|
BlockHeight() uint32
|
||||||
GetMaxVerificationGAS() int64
|
GetMaxVerificationGAS() int64
|
||||||
GetNotaryContractScriptHash() util.Uint160
|
GetNotaryContractScriptHash() util.Uint160
|
||||||
SubscribeForBlocks(ch chan<- *block.Block)
|
SubscribeForBlocks(ch chan *block.Block)
|
||||||
UnsubscribeFromBlocks(ch chan<- *block.Block)
|
UnsubscribeFromBlocks(ch chan *block.Block)
|
||||||
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
|
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -94,14 +94,14 @@ type (
|
||||||
GetValidators() ([]*keys.PublicKey, error)
|
GetValidators() ([]*keys.PublicKey, error)
|
||||||
HeaderHeight() uint32
|
HeaderHeight() uint32
|
||||||
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
|
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
|
||||||
SubscribeForBlocks(ch chan<- *block.Block)
|
SubscribeForBlocks(ch chan *block.Block)
|
||||||
SubscribeForExecutions(ch chan<- *state.AppExecResult)
|
SubscribeForExecutions(ch chan *state.AppExecResult)
|
||||||
SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent)
|
SubscribeForNotifications(ch chan *state.ContainedNotificationEvent)
|
||||||
SubscribeForTransactions(ch chan<- *transaction.Transaction)
|
SubscribeForTransactions(ch chan *transaction.Transaction)
|
||||||
UnsubscribeFromBlocks(ch chan<- *block.Block)
|
UnsubscribeFromBlocks(ch chan *block.Block)
|
||||||
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)
|
UnsubscribeFromExecutions(ch chan *state.AppExecResult)
|
||||||
UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent)
|
UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent)
|
||||||
UnsubscribeFromTransactions(ch chan<- *transaction.Transaction)
|
UnsubscribeFromTransactions(ch chan *transaction.Transaction)
|
||||||
VerifyTx(*transaction.Transaction) error
|
VerifyTx(*transaction.Transaction) error
|
||||||
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
|
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
|
||||||
mempool.Feer // fee interface
|
mempool.Feer // fee interface
|
||||||
|
|
|
@ -23,8 +23,8 @@ type (
|
||||||
Ledger interface {
|
Ledger interface {
|
||||||
GetConfig() config.ProtocolConfiguration
|
GetConfig() config.ProtocolConfiguration
|
||||||
HeaderHeight() uint32
|
HeaderHeight() uint32
|
||||||
SubscribeForBlocks(ch chan<- *block.Block)
|
SubscribeForBlocks(ch chan *block.Block)
|
||||||
UnsubscribeFromBlocks(ch chan<- *block.Block)
|
UnsubscribeFromBlocks(ch chan *block.Block)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service represents a state root service.
|
// Service represents a state root service.
|
||||||
|
|
Loading…
Reference in a new issue