diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 0ac04a4c5..3ae130061 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -26,7 +26,7 @@ import ( type FakeChain struct { config.ProtocolConfiguration *mempool.Pool - blocksCh []chan<- *block.Block + blocksCh []chan *block.Block Blockheight uint32 PoolTxF func(*transaction.Transaction) error poolTxWithData func(*transaction.Transaction, interface{}, *mempool.Pool) error @@ -351,22 +351,22 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool) } // SubscribeForBlocks implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) { +func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) { chain.blocksCh = append(chain.blocksCh, ch) } // SubscribeForExecutions implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { +func (chain *FakeChain) SubscribeForExecutions(ch chan *state.AppExecResult) { panic("TODO") } // SubscribeForNotifications implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (chain *FakeChain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { panic("TODO") } // SubscribeForTransactions implements the Blockchainer interface. -func (chain *FakeChain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { +func (chain *FakeChain) SubscribeForTransactions(ch chan *transaction.Transaction) { panic("TODO") } @@ -384,7 +384,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction. } // UnsubscribeFromBlocks implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { +func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) { for i, c := range chain.blocksCh { if c == ch { if i < len(chain.blocksCh) { @@ -396,17 +396,17 @@ func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) { } // UnsubscribeFromExecutions implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { +func (chain *FakeChain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { panic("TODO") } // UnsubscribeFromNotifications implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (chain *FakeChain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { panic("TODO") } // UnsubscribeFromTransactions implements the Blockchainer interface. -func (chain *FakeChain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { +func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { panic("TODO") } diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index dca2b80e6..f4cec74fb 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -51,8 +51,8 @@ type Ledger interface { GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) GetValidators() ([]*keys.PublicKey, error) PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error - SubscribeForBlocks(ch chan<- *coreb.Block) - UnsubscribeFromBlocks(ch chan<- *coreb.Block) + SubscribeForBlocks(ch chan *coreb.Block) + UnsubscribeFromBlocks(ch chan *coreb.Block) GetBaseExecFee() int64 interop.Ledger mempool.Feer diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 73bb05915..17fb7b3db 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -784,10 +784,10 @@ func (bc *Blockchain) notificationDispatcher() { // These are just sets of subscribers, though modelled as maps // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). - blockFeed = make(map[chan<- *block.Block]bool) - txFeed = make(map[chan<- *transaction.Transaction]bool) - notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool) - executionFeed = make(map[chan<- *state.AppExecResult]bool) + blockFeed = make(map[chan *block.Block]bool) + txFeed = make(map[chan *transaction.Transaction]bool) + notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool) + executionFeed = make(map[chan *state.AppExecResult]bool) ) for { select { @@ -795,26 +795,26 @@ func (bc *Blockchain) notificationDispatcher() { return case sub := <-bc.subCh: switch ch := sub.(type) { - case chan<- *block.Block: + case chan *block.Block: blockFeed[ch] = true - case chan<- *transaction.Transaction: + case chan *transaction.Transaction: txFeed[ch] = true - case chan<- *state.ContainedNotificationEvent: + case chan *state.ContainedNotificationEvent: notificationFeed[ch] = true - case chan<- *state.AppExecResult: + case chan *state.AppExecResult: executionFeed[ch] = true default: panic(fmt.Sprintf("bad subscription: %T", sub)) } case unsub := <-bc.unsubCh: switch ch := unsub.(type) { - case chan<- *block.Block: + case chan *block.Block: delete(blockFeed, ch) - case chan<- *transaction.Transaction: + case chan *transaction.Transaction: delete(txFeed, ch) - case chan<- *state.ContainedNotificationEvent: + case chan *state.ContainedNotificationEvent: delete(notificationFeed, ch) - case chan<- *state.AppExecResult: + case chan *state.AppExecResult: delete(executionFeed, ch) default: panic(fmt.Sprintf("bad unsubscription: %T", unsub)) @@ -1799,7 +1799,7 @@ func (bc *Blockchain) GetConfig() config.ProtocolConfiguration { // there is a new block added to the chain you'll receive it via this channel. // Make sure it's read from regularly as not reading these events might affect // other Blockchain functions. -func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { +func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) { bc.subCh <- ch } @@ -1807,7 +1807,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { // broadcasting, so when there is a new transaction added to the chain (in a // block) you'll receive it via this channel. Make sure it's read from regularly // as not reading these events might affect other Blockchain functions. -func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { +func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) { bc.subCh <- ch } @@ -1818,7 +1818,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transactio // transactions use SubscribeForExecutions instead. Make sure this channel is // read from regularly as not reading these events might affect other Blockchain // functions. -func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) { +func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) { bc.subCh <- ch } @@ -1826,35 +1826,65 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotifi // broadcasting, so when an in-block transaction execution happens you'll receive // the result of it via this channel. Make sure it's read from regularly as not // reading these events might affect other Blockchain functions. -func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { +func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) { bc.subCh <- ch } // UnsubscribeFromBlocks unsubscribes given channel from new block notifications, -// you can close it afterwards. Passing non-subscribed channel is a no-op. -func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { - bc.unsubCh <- ch +// you can close it afterwards. Passing non-subscribed channel is a no-op, but +// the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromTransactions unsubscribes given channel from new transaction // notifications, you can close it afterwards. Passing non-subscribed channel is -// a no-op. -func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { - bc.unsubCh <- ch +// a no-op, but the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromNotifications unsubscribes given channel from new // execution-generated notifications, you can close it afterwards. Passing -// non-subscribed channel is a no-op. -func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) { - bc.unsubCh <- ch +// non-subscribed channel is a no-op, but the method can read from this channel +// (discarding any read data). +func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // UnsubscribeFromExecutions unsubscribes given channel from new execution // notifications, you can close it afterwards. Passing non-subscribed channel is -// a no-op. -func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { - bc.unsubCh <- ch +// a no-op, but the method can read from this channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } } // CalculateClaimable calculates the amount of GAS generated by owning specified diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index f3fbc557b..e6580139d 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -49,6 +49,7 @@ type DefaultDiscovery struct { isDead bool requestCh chan int pool chan string + runExit chan struct{} } // NewDefaultDiscovery returns a new DefaultDiscovery. @@ -64,6 +65,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa attempted: make(map[string]bool), requestCh: make(chan int), pool: make(chan string, maxPoolSize), + runExit: make(chan struct{}), } go d.run() return d @@ -211,6 +213,7 @@ func (d *DefaultDiscovery) Close() { default: } close(d.requestCh) + <-d.runExit } // run is a goroutine that makes DefaultDiscovery process its queue to connect @@ -259,7 +262,7 @@ func (d *DefaultDiscovery) run() { } } if !ok { - return + break } // Special case, no connections after all attempts. d.lock.RLock() @@ -270,4 +273,5 @@ func (d *DefaultDiscovery) run() { requested = oldRequest } } + close(d.runExit) } diff --git a/pkg/network/server.go b/pkg/network/server.go index 1779f7b70..5392071e7 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -69,8 +69,8 @@ type ( PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) } // Service is a service abstraction (oracle, state root, consensus, etc). @@ -121,6 +121,7 @@ type ( register chan Peer unregister chan peerDrop quit chan struct{} + relayFin chan struct{} transactions chan *transaction.Transaction @@ -170,6 +171,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy id: randomID(), config: chain.GetConfig(), quit: make(chan struct{}), + relayFin: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), txInMap: make(map[util.Uint256]struct{}), @@ -273,6 +275,7 @@ func (s *Server) Shutdown() { s.notaryRequestPool.StopSubscriptions() } close(s.quit) + <-s.relayFin } // AddService allows to add a service to be started/stopped by Server. @@ -1433,6 +1436,7 @@ drainBlocksLoop: } } close(ch) + close(s.relayFin) } // verifyAndPoolTX verifies the TX and adds it to the local mempool. diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 586ca48cd..750878e9f 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -1014,7 +1014,6 @@ func TestVerifyNotaryRequest(t *testing.T) { bc.NotaryContractScriptHash = util.Uint160{1, 2, 3} s, err := newServerFromConstructors(ServerConfig{}, bc, new(fakechain.FakeStateSync), zaptest.NewLogger(t), newFakeTransp, newTestDiscovery) require.NoError(t, err) - t.Cleanup(s.Shutdown) newNotaryRequest := func() *payload.P2PNotaryRequest { return &payload.P2PNotaryRequest{ MainTransaction: &transaction.Transaction{Script: []byte{0, 1, 2}}, diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 0fb1a6c90..fa3f3dd3f 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -32,8 +32,8 @@ type ( BlockHeight() uint32 GetMaxVerificationGAS() int64 GetNotaryContractScriptHash() util.Uint160 - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index e67c801a0..36d832809 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -94,14 +94,14 @@ type ( GetValidators() ([]*keys.PublicKey, error) HeaderHeight() uint32 InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error - SubscribeForBlocks(ch chan<- *block.Block) - SubscribeForExecutions(ch chan<- *state.AppExecResult) - SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) - SubscribeForTransactions(ch chan<- *transaction.Transaction) - UnsubscribeFromBlocks(ch chan<- *block.Block) - UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) - UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) - UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) + SubscribeForBlocks(ch chan *block.Block) + SubscribeForExecutions(ch chan *state.AppExecResult) + SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) + SubscribeForTransactions(ch chan *transaction.Transaction) + UnsubscribeFromBlocks(ch chan *block.Block) + UnsubscribeFromExecutions(ch chan *state.AppExecResult) + UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) + UnsubscribeFromTransactions(ch chan *transaction.Transaction) VerifyTx(*transaction.Transaction) error VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) mempool.Feer // fee interface diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 158377ae2..338ad4f3b 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -23,8 +23,8 @@ type ( Ledger interface { GetConfig() config.ProtocolConfiguration HeaderHeight() uint32 - SubscribeForBlocks(ch chan<- *block.Block) - UnsubscribeFromBlocks(ch chan<- *block.Block) + SubscribeForBlocks(ch chan *block.Block) + UnsubscribeFromBlocks(ch chan *block.Block) } // Service represents a state root service.