From 155089f4e56dc85548dc3b87dd164d192195809f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 19 Aug 2022 20:04:15 +0300 Subject: [PATCH 1/4] network: drop cleanup from TestVerifyNotaryRequest It never runs the server, so 746644a4ebd868e10fce18d9bf3eb6a4cebdedcf was a bit wrong with this. --- pkg/network/server_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 586ca48cd..750878e9f 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -1014,7 +1014,6 @@ func TestVerifyNotaryRequest(t *testing.T) { bc.NotaryContractScriptHash = util.Uint160{1, 2, 3} s, err := newServerFromConstructors(ServerConfig{}, bc, new(fakechain.FakeStateSync), zaptest.NewLogger(t), newFakeTransp, newTestDiscovery) require.NoError(t, err) - t.Cleanup(s.Shutdown) newNotaryRequest := func() *payload.P2PNotaryRequest { return &payload.P2PNotaryRequest{ MainTransaction: &transaction.Transaction{Script: []byte{0, 1, 2}}, From dea75a4211b0f41f54b97dd50d254ab2d1ef3be1 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 19 Aug 2022 20:43:15 +0300 Subject: [PATCH 2/4] network: wait for the relayer thread to finish on shutdown Unsubscribe and drain first, then return from the Shutdown method. It's important wrt to subsequent chain shutdown process (normally it's closed right after the network server). --- pkg/network/server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/network/server.go b/pkg/network/server.go index 1779f7b70..00cfe4764 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -121,6 +121,7 @@ type ( register chan Peer unregister chan peerDrop quit chan struct{} + relayFin chan struct{} transactions chan *transaction.Transaction @@ -170,6 +171,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy id: randomID(), config: chain.GetConfig(), quit: make(chan struct{}), + relayFin: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), txInMap: make(map[util.Uint256]struct{}), @@ -273,6 +275,7 @@ func (s *Server) Shutdown() { s.notaryRequestPool.StopSubscriptions() } close(s.quit) + <-s.relayFin } // AddService allows to add a service to be started/stopped by Server. @@ -1433,6 +1436,7 @@ drainBlocksLoop: } } close(ch) + close(s.relayFin) } // verifyAndPoolTX verifies the TX and adds it to the local mempool. From eeeb0f6f0e7ecae49ff9cbdbedc8b3b4077f00cc Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 19 Aug 2022 20:47:55 +0300 Subject: [PATCH 3/4] core: accept two-side channels for sub/unsub, read on unsub Blockchain's notificationDispatcher sends events to channels and these channels must be read from. Unfortunately, regular service shutdown procedure does unsubscription first (outside of the read loop) and only then drains the channel. While it waits for unsubscription request to be accepted notificationDispatcher can try pushing more data into the same channel which will lead to a deadlock. Reading in the same method solves this, any number of events can be pushed until unsub channel accepts the data. --- internal/fakechain/fakechain.go | 18 +++---- pkg/consensus/consensus.go | 4 +- pkg/core/blockchain.go | 86 +++++++++++++++++++++---------- pkg/network/server.go | 4 +- pkg/services/notary/notary.go | 4 +- pkg/services/rpcsrv/server.go | 16 +++--- pkg/services/stateroot/service.go | 4 +- 7 files changed, 83 insertions(+), 53 deletions(-) diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 0ac04a4c5..3ae130061 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -26,7 +26,7 @@ import ( type FakeChain struct { config.ProtocolConfiguration *mempool.Pool - blocksCh []chan<- *block.Block + blocksCh []chan *block.Block Blockheight uint32 PoolTxF func(*transaction.Transaction) error poolTxWithData func(*transaction.Transaction, interface{}, *mempool.Pool) error @@ -351,22 +351,22 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool) } // SubscribeForBlocks implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) { +func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) { chain.blocksCh = append(chain.blocksCh, ch) } // SubscribeForExecutions implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { +func (chain *FakeChain) SubscribeForExecutions(ch chan *state.AppExecResult) { panic("TODO") } // SubscribeForNotifications implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (chain *FakeChain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { panic("TODO") } // SubscribeForTransactions implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { +func (chain *FakeChain) SubscribeForTransactions(ch chan *transaction.Transaction) { panic("TODO") } @@ -384,7 +384,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction. } // UnsubscribeFromBlocks implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { +func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) { for i, c := range chain.blocksCh { if c == ch { if i < len(chain.blocksCh) { @@ -396,17 +396,17 @@ func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { } // UnsubscribeFromExecutions implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { +func (chain *FakeChain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { panic("TODO") } // UnsubscribeFromNotifications implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (chain *FakeChain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { panic("TODO") } // UnsubscribeFromTransactions implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { +func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { panic("TODO") } diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index dca2b80e6..f4cec74fb 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -51,8 +51,8 @@ type Ledger interface { GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) GetValidators() ([]*keys.PublicKey, error) PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error - SubscribeForBlocks(ch chan<- *coreb.Block) - UnsubscribeFromBlocks(ch chan<- *coreb.Block) + SubscribeForBlocks(ch chan *coreb.Block) + UnsubscribeFromBlocks(ch chan *coreb.Block) GetBaseExecFee() int64 interop.Ledger mempool.Feer diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 73bb05915..17fb7b3db 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -784,10 +784,10 @@ func (bc *Blockchain) notificationDispatcher() { // These are just sets of subscribers, though modelled as maps // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). - blockFeed = make(map[chan<- *block.Block]bool) - txFeed = make(map[chan<- *transaction.Transaction]bool) - notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool) - executionFeed = make(map[chan<- *state.AppExecResult]bool) + blockFeed = make(map[chan *block.Block]bool) + txFeed = make(map[chan *transaction.Transaction]bool) + notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool) + executionFeed = make(map[chan *state.AppExecResult]bool) ) for { select { @@ -795,26 +795,26 @@ func (bc *Blockchain) notificationDispatcher() { return case sub := <-bc.subCh: switch ch := sub.(type) { - case chan<- *block.Block: + case chan *block.Block: blockFeed[ch] = true - case chan<- *transaction.Transaction: + case chan *transaction.Transaction: txFeed[ch] = true - case chan<- *state.ContainedNotificationEvent: + case chan *state.ContainedNotificationEvent: notificationFeed[ch] = true - case chan<- *state.AppExecResult: + case chan *state.AppExecResult: executionFeed[ch] = true default: panic(fmt.Sprintf("bad subscription: %T", sub)) } case unsub := <-bc.unsubCh: switch ch := unsub.(type) { - case chan<- *block.Block: + case chan *block.Block: delete(blockFeed, ch) - case chan<- *transaction.Transaction: + case chan *transaction.Transaction: delete(txFeed, ch) - case chan<- *state.ContainedNotificationEvent: + case chan *state.ContainedNotificationEvent: delete(notificationFeed, ch) - case chan<- *state.AppExecResult: + case chan *state.AppExecResult: delete(executionFeed, ch) default: panic(fmt.Sprintf("bad unsubscription: %T", unsub)) @@ -1799,7 +1799,7 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration { // there is a new block added to the chain you'll receive it via this channel. // Make sure it's read from regularly as not reading these events might affect // other Blockchain functions. -func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { +func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) { bc.subCh <- ch } @@ -1807,7 +1807,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { // broadcasting, so when there is a new transaction added to the chain (in a // block) you'll receive it via this channel. Make sure it's read from regularly // as not reading these events might affect other Blockchain functions. -func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { +func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) { bc.subCh <- ch } @@ -1818,7 +1818,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transactio // transactions use SubscribeForExecutions instead. Make sure this channel is // read from regularly as not reading these events might affect other Blockchain // functions. -func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { bc.subCh <- ch } @@ -1826,35 +1826,65 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotifi // broadcasting, so when an in-block transaction execution happens you'll receive // the result of it via this channel. Make sure it's read from regularly as not // reading these events might affect other Blockchain functions. -func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { +func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) { bc.subCh <- ch } // UnsubscribeFromBlocks unsubscribes given channel from new block notifications, -// you can close it afterwards. Passing non-subscribed channel is a no-op. -func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { - bc.unsubCh <- ch +// you can close it afterwards. Passing non-subscribed channel is a no-op, but +// the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromTransactions unsubscribes given channel from new transaction // notifications, you can close it afterwards. Passing non-subscribed channel is -// a no-op. -func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { - bc.unsubCh <- ch +// a no-op, but the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromNotifications unsubscribes given channel from new // execution-generated notifications, you can close it afterwards. Passing -// non-subscribed channel is a no-op. -func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { - bc.unsubCh <- ch +// non-subscribed channel is a no-op, but the method can read from this channel +// (discarding any read data). +func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromExecutions unsubscribes given channel from new execution // notifications, you can close it afterwards. Passing non-subscribed channel is -// a no-op. -func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { - bc.unsubCh <- ch +// a no-op, but the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // CalculateClaimable calculates the amount of GAS generated by owning specified diff --git a/pkg/network/server.go b/pkg/network/server.go index 00cfe4764..5392071e7 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -69,8 +69,8 @@ type ( PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) } // Service is a service abstraction (oracle, state root, consensus, etc). diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 0fb1a6c90..fa3f3dd3f 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -32,8 +32,8 @@ type ( BlockHeight() uint32 GetMaxVerificationGAS() int64 GetNotaryContractScriptHash() util.Uint160 - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index e67c801a0..36d832809 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -94,14 +94,14 @@ type ( GetValidators() ([]*keys.PublicKey, error) HeaderHeight() uint32 InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error - SubscribeForBlocks(ch chan<- *block.Block) - SubscribeForExecutions(ch chan<- *state.AppExecResult) - SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) - SubscribeForTransactions(ch chan<- *transaction.Transaction) - UnsubscribeFromBlocks(ch chan<- *block.Block) - UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) - UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) - UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) + SubscribeForBlocks(ch chan *block.Block) + SubscribeForExecutions(ch chan *state.AppExecResult) + SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) + SubscribeForTransactions(ch chan *transaction.Transaction) + UnsubscribeFromBlocks(ch chan *block.Block) + UnsubscribeFromExecutions(ch chan *state.AppExecResult) + UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) + UnsubscribeFromTransactions(ch chan *transaction.Transaction) VerifyTx(*transaction.Transaction) error VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) mempool.Feer // fee interface diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 158377ae2..338ad4f3b 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -23,8 +23,8 @@ type ( Ledger interface { GetConfig() config.ProtocolConfiguration HeaderHeight() uint32 - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) } // Service represents a state root service. From 779a5c070fadb51215b3ba95507770f1e06b483d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 19 Aug 2022 22:23:47 +0300 Subject: [PATCH 4/4] network: wait for exit in discoverer And synchronize other threads with channels instead of mutexes. Overall this scheme is more reliable. --- pkg/network/discovery.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index f3fbc557b..e6580139d 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -49,6 +49,7 @@ type DefaultDiscovery struct { isDead bool requestCh chan int pool chan string + runExit chan struct{} } // NewDefaultDiscovery returns a new DefaultDiscovery. @@ -64,6 +65,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa attempted: make(map[string]bool), requestCh: make(chan int), pool: make(chan string, maxPoolSize), + runExit: make(chan struct{}), } go d.run() return d @@ -211,6 +213,7 @@ func (d *DefaultDiscovery) Close() { default: } close(d.requestCh) + <-d.runExit } // run is a goroutine that makes DefaultDiscovery process its queue to connect @@ -259,7 +262,7 @@ func (d *DefaultDiscovery) run() { } } if !ok { - return + break } // Special case, no connections after all attempts. d.lock.RLock() @@ -270,4 +273,5 @@ func (d *DefaultDiscovery) run() { requested = oldRequest } } + close(d.runExit) }