Merge pull request #2652 from nspcc-dev/shutdown-fixes

Shutdown sequence fixes
This commit is contained in:
Roman Khimov 2022-08-22 10:22:54 +03:00 committed by GitHub
commit 606597b9a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 92 additions and 55 deletions

View file

@ -26,7 +26,7 @@ import (
type FakeChain struct { type FakeChain struct {
config.ProtocolConfiguration config.ProtocolConfiguration
*mempool.Pool *mempool.Pool
blocksCh []chan<- *block.Block blocksCh []chan *block.Block
Blockheight uint32 Blockheight uint32
PoolTxF func(*transaction.Transaction) error PoolTxF func(*transaction.Transaction) error
poolTxWithData func(*transaction.Transaction, interface{}, *mempool.Pool) error poolTxWithData func(*transaction.Transaction, interface{}, *mempool.Pool) error
@ -351,22 +351,22 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool)
} }
// SubscribeForBlocks implements the Blockchainer interface. // SubscribeForBlocks implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) { func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) {
chain.blocksCh = append(chain.blocksCh, ch) chain.blocksCh = append(chain.blocksCh, ch)
} }
// SubscribeForExecutions implements the Blockchainer interface. // SubscribeForExecutions implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { func (chain *FakeChain) SubscribeForExecutions(ch chan *state.AppExecResult) {
panic("TODO") panic("TODO")
} }
// SubscribeForNotifications implements the Blockchainer interface. // SubscribeForNotifications implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { func (chain *FakeChain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) {
panic("TODO") panic("TODO")
} }
// SubscribeForTransactions implements the Blockchainer interface. // SubscribeForTransactions implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { func (chain *FakeChain) SubscribeForTransactions(ch chan *transaction.Transaction) {
panic("TODO") panic("TODO")
} }
@ -384,7 +384,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction.
} }
// UnsubscribeFromBlocks implements the Blockchainer interface. // UnsubscribeFromBlocks implements the Blockchainer interface.
func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) {
for i, c := range chain.blocksCh { for i, c := range chain.blocksCh {
if c == ch { if c == ch {
if i < len(chain.blocksCh) { if i < len(chain.blocksCh) {
@ -396,17 +396,17 @@ func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
} }
// UnsubscribeFromExecutions implements the Blockchainer interface. // UnsubscribeFromExecutions implements the Blockchainer interface.
func (chain *FakeChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { func (chain *FakeChain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) {
panic("TODO") panic("TODO")
} }
// UnsubscribeFromNotifications implements the Blockchainer interface. // UnsubscribeFromNotifications implements the Blockchainer interface.
func (chain *FakeChain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { func (chain *FakeChain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) {
panic("TODO") panic("TODO")
} }
// UnsubscribeFromTransactions implements the Blockchainer interface. // UnsubscribeFromTransactions implements the Blockchainer interface.
func (chain *FakeChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) {
panic("TODO") panic("TODO")
} }

View file

@ -51,8 +51,8 @@ type Ledger interface {
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
GetValidators() ([]*keys.PublicKey, error) GetValidators() ([]*keys.PublicKey, error)
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan<- *coreb.Block) SubscribeForBlocks(ch chan *coreb.Block)
UnsubscribeFromBlocks(ch chan<- *coreb.Block) UnsubscribeFromBlocks(ch chan *coreb.Block)
GetBaseExecFee() int64 GetBaseExecFee() int64
interop.Ledger interop.Ledger
mempool.Feer mempool.Feer

View file

@ -784,10 +784,10 @@ func (bc *Blockchain) notificationDispatcher() {
// These are just sets of subscribers, though modelled as maps // These are just sets of subscribers, though modelled as maps
// for ease of management (not a lot of subscriptions is really // for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements). // expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan<- *block.Block]bool) blockFeed = make(map[chan *block.Block]bool)
txFeed = make(map[chan<- *transaction.Transaction]bool) txFeed = make(map[chan *transaction.Transaction]bool)
notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool) notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan<- *state.AppExecResult]bool) executionFeed = make(map[chan *state.AppExecResult]bool)
) )
for { for {
select { select {
@ -795,26 +795,26 @@ func (bc *Blockchain) notificationDispatcher() {
return return
case sub := <-bc.subCh: case sub := <-bc.subCh:
switch ch := sub.(type) { switch ch := sub.(type) {
case chan<- *block.Block: case chan *block.Block:
blockFeed[ch] = true blockFeed[ch] = true
case chan<- *transaction.Transaction: case chan *transaction.Transaction:
txFeed[ch] = true txFeed[ch] = true
case chan<- *state.ContainedNotificationEvent: case chan *state.ContainedNotificationEvent:
notificationFeed[ch] = true notificationFeed[ch] = true
case chan<- *state.AppExecResult: case chan *state.AppExecResult:
executionFeed[ch] = true executionFeed[ch] = true
default: default:
panic(fmt.Sprintf("bad subscription: %T", sub)) panic(fmt.Sprintf("bad subscription: %T", sub))
} }
case unsub := <-bc.unsubCh: case unsub := <-bc.unsubCh:
switch ch := unsub.(type) { switch ch := unsub.(type) {
case chan<- *block.Block: case chan *block.Block:
delete(blockFeed, ch) delete(blockFeed, ch)
case chan<- *transaction.Transaction: case chan *transaction.Transaction:
delete(txFeed, ch) delete(txFeed, ch)
case chan<- *state.ContainedNotificationEvent: case chan *state.ContainedNotificationEvent:
delete(notificationFeed, ch) delete(notificationFeed, ch)
case chan<- *state.AppExecResult: case chan *state.AppExecResult:
delete(executionFeed, ch) delete(executionFeed, ch)
default: default:
panic(fmt.Sprintf("bad unsubscription: %T", unsub)) panic(fmt.Sprintf("bad unsubscription: %T", unsub))
@ -1799,7 +1799,7 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration {
// there is a new block added to the chain you'll receive it via this channel. // there is a new block added to the chain you'll receive it via this channel.
// Make sure it's read from regularly as not reading these events might affect // Make sure it's read from regularly as not reading these events might affect
// other Blockchain functions. // other Blockchain functions.
func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
bc.subCh <- ch bc.subCh <- ch
} }
@ -1807,7 +1807,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
// broadcasting, so when there is a new transaction added to the chain (in a // broadcasting, so when there is a new transaction added to the chain (in a
// block) you'll receive it via this channel. Make sure it's read from regularly // block) you'll receive it via this channel. Make sure it's read from regularly
// as not reading these events might affect other Blockchain functions. // as not reading these events might affect other Blockchain functions.
func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) {
bc.subCh <- ch bc.subCh <- ch
} }
@ -1818,7 +1818,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transactio
// transactions use SubscribeForExecutions instead. Make sure this channel is // transactions use SubscribeForExecutions instead. Make sure this channel is
// read from regularly as not reading these events might affect other Blockchain // read from regularly as not reading these events might affect other Blockchain
// functions. // functions.
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) {
bc.subCh <- ch bc.subCh <- ch
} }
@ -1826,35 +1826,65 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotifi
// broadcasting, so when an in-block transaction execution happens you'll receive // broadcasting, so when an in-block transaction execution happens you'll receive
// the result of it via this channel. Make sure it's read from regularly as not // the result of it via this channel. Make sure it's read from regularly as not
// reading these events might affect other Blockchain functions. // reading these events might affect other Blockchain functions.
func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) {
bc.subCh <- ch bc.subCh <- ch
} }
// UnsubscribeFromBlocks unsubscribes given channel from new block notifications, // UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
// you can close it afterwards. Passing non-subscribed channel is a no-op. // you can close it afterwards. Passing non-subscribed channel is a no-op, but
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { // the method can read from this channel (discarding any read data).
bc.unsubCh <- ch func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// UnsubscribeFromTransactions unsubscribes given channel from new transaction // UnsubscribeFromTransactions unsubscribes given channel from new transaction
// notifications, you can close it afterwards. Passing non-subscribed channel is // notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op. // a no-op, but the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) {
bc.unsubCh <- ch unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// UnsubscribeFromNotifications unsubscribes given channel from new // UnsubscribeFromNotifications unsubscribes given channel from new
// execution-generated notifications, you can close it afterwards. Passing // execution-generated notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op. // non-subscribed channel is a no-op, but the method can read from this channel
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { // (discarding any read data).
bc.unsubCh <- ch func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// UnsubscribeFromExecutions unsubscribes given channel from new execution // UnsubscribeFromExecutions unsubscribes given channel from new execution
// notifications, you can close it afterwards. Passing non-subscribed channel is // notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op. // a no-op, but the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) {
bc.unsubCh <- ch unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
} }
// CalculateClaimable calculates the amount of GAS generated by owning specified // CalculateClaimable calculates the amount of GAS generated by owning specified

View file

@ -49,6 +49,7 @@ type DefaultDiscovery struct {
isDead bool isDead bool
requestCh chan int requestCh chan int
pool chan string pool chan string
runExit chan struct{}
} }
// NewDefaultDiscovery returns a new DefaultDiscovery. // NewDefaultDiscovery returns a new DefaultDiscovery.
@ -64,6 +65,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
attempted: make(map[string]bool), attempted: make(map[string]bool),
requestCh: make(chan int), requestCh: make(chan int),
pool: make(chan string, maxPoolSize), pool: make(chan string, maxPoolSize),
runExit: make(chan struct{}),
} }
go d.run() go d.run()
return d return d
@ -211,6 +213,7 @@ func (d *DefaultDiscovery) Close() {
default: default:
} }
close(d.requestCh) close(d.requestCh)
<-d.runExit
} }
// run is a goroutine that makes DefaultDiscovery process its queue to connect // run is a goroutine that makes DefaultDiscovery process its queue to connect
@ -259,7 +262,7 @@ func (d *DefaultDiscovery) run() {
} }
} }
if !ok { if !ok {
return break
} }
// Special case, no connections after all attempts. // Special case, no connections after all attempts.
d.lock.RLock() d.lock.RLock()
@ -270,4 +273,5 @@ func (d *DefaultDiscovery) run() {
requested = oldRequest requested = oldRequest
} }
} }
close(d.runExit)
} }

View file

@ -69,8 +69,8 @@ type (
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
SubscribeForBlocks(ch chan<- *block.Block) SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan<- *block.Block) UnsubscribeFromBlocks(ch chan *block.Block)
} }
// Service is a service abstraction (oracle, state root, consensus, etc). // Service is a service abstraction (oracle, state root, consensus, etc).
@ -121,6 +121,7 @@ type (
register chan Peer register chan Peer
unregister chan peerDrop unregister chan peerDrop
quit chan struct{} quit chan struct{}
relayFin chan struct{}
transactions chan *transaction.Transaction transactions chan *transaction.Transaction
@ -170,6 +171,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
id: randomID(), id: randomID(),
config: chain.GetConfig(), config: chain.GetConfig(),
quit: make(chan struct{}), quit: make(chan struct{}),
relayFin: make(chan struct{}),
register: make(chan Peer), register: make(chan Peer),
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
txInMap: make(map[util.Uint256]struct{}), txInMap: make(map[util.Uint256]struct{}),
@ -273,6 +275,7 @@ func (s *Server) Shutdown() {
s.notaryRequestPool.StopSubscriptions() s.notaryRequestPool.StopSubscriptions()
} }
close(s.quit) close(s.quit)
<-s.relayFin
} }
// AddService allows to add a service to be started/stopped by Server. // AddService allows to add a service to be started/stopped by Server.
@ -1433,6 +1436,7 @@ drainBlocksLoop:
} }
} }
close(ch) close(ch)
close(s.relayFin)
} }
// verifyAndPoolTX verifies the TX and adds it to the local mempool. // verifyAndPoolTX verifies the TX and adds it to the local mempool.

View file

@ -1014,7 +1014,6 @@ func TestVerifyNotaryRequest(t *testing.T) {
bc.NotaryContractScriptHash = util.Uint160{1, 2, 3} bc.NotaryContractScriptHash = util.Uint160{1, 2, 3}
s, err := newServerFromConstructors(ServerConfig{}, bc, new(fakechain.FakeStateSync), zaptest.NewLogger(t), newFakeTransp, newTestDiscovery) s, err := newServerFromConstructors(ServerConfig{}, bc, new(fakechain.FakeStateSync), zaptest.NewLogger(t), newFakeTransp, newTestDiscovery)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(s.Shutdown)
newNotaryRequest := func() *payload.P2PNotaryRequest { newNotaryRequest := func() *payload.P2PNotaryRequest {
return &payload.P2PNotaryRequest{ return &payload.P2PNotaryRequest{
MainTransaction: &transaction.Transaction{Script: []byte{0, 1, 2}}, MainTransaction: &transaction.Transaction{Script: []byte{0, 1, 2}},

View file

@ -32,8 +32,8 @@ type (
BlockHeight() uint32 BlockHeight() uint32
GetMaxVerificationGAS() int64 GetMaxVerificationGAS() int64
GetNotaryContractScriptHash() util.Uint160 GetNotaryContractScriptHash() util.Uint160
SubscribeForBlocks(ch chan<- *block.Block) SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan<- *block.Block) UnsubscribeFromBlocks(ch chan *block.Block)
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
} }

View file

@ -94,14 +94,14 @@ type (
GetValidators() ([]*keys.PublicKey, error) GetValidators() ([]*keys.PublicKey, error)
HeaderHeight() uint32 HeaderHeight() uint32
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
SubscribeForBlocks(ch chan<- *block.Block) SubscribeForBlocks(ch chan *block.Block)
SubscribeForExecutions(ch chan<- *state.AppExecResult) SubscribeForExecutions(ch chan *state.AppExecResult)
SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan<- *transaction.Transaction) SubscribeForTransactions(ch chan *transaction.Transaction)
UnsubscribeFromBlocks(ch chan<- *block.Block) UnsubscribeFromBlocks(ch chan *block.Block)
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) UnsubscribeFromExecutions(ch chan *state.AppExecResult)
UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent)
UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) UnsubscribeFromTransactions(ch chan *transaction.Transaction)
VerifyTx(*transaction.Transaction) error VerifyTx(*transaction.Transaction) error
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
mempool.Feer // fee interface mempool.Feer // fee interface

View file

@ -23,8 +23,8 @@ type (
Ledger interface { Ledger interface {
GetConfig() config.ProtocolConfiguration GetConfig() config.ProtocolConfiguration
HeaderHeight() uint32 HeaderHeight() uint32
SubscribeForBlocks(ch chan<- *block.Block) SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan<- *block.Block) UnsubscribeFromBlocks(ch chan *block.Block)
} }
// Service represents a state root service. // Service represents a state root service.