diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index b475c977d..ec1273ef8 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -67,8 +67,11 @@ type Service interface { Name() string // Start initializes dBFT and starts event loop for consensus service. // It must be called only when the sufficient amount of peers are connected. + // The service only starts once, subsequent calls to Start are no-op. Start() - // Shutdown stops dBFT event loop. + // Shutdown stops dBFT event loop. It can only be called once, subsequent calls + // to Shutdown on the same instance are no-op. The instance that was stopped can + // not be started again by calling Start (use a new instance if needed). Shutdown() // OnPayload is a callback to notify the Service about a newly received payload. @@ -272,7 +275,7 @@ func (s *service) Start() { // Shutdown implements the Service interface. func (s *service) Shutdown() { - if s.started.Load() { + if s.started.CAS(true, false) { close(s.quit) <-s.finished } @@ -330,14 +333,18 @@ events: default: } } -drainBlocksLoop: +drainLoop: for { select { + case <-s.messages: + case <-s.transactions: case <-s.blockEvents: default: - break drainBlocksLoop + break drainLoop } } + close(s.messages) + close(s.transactions) close(s.blockEvents) close(s.finished) } diff --git a/pkg/network/server.go b/pkg/network/server.go index 91e591f8d..f8292a24b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -233,7 +233,8 @@ func (s *Server) ID() uint32 { return s.id } -// Start will start the server and its underlying transport. +// Start will start the server and its underlying transport. Calling it twice +// is an error. func (s *Server) Start(errChan chan error) { s.log.Info("node started", zap.Uint32("blockHeight", s.chain.BlockHeight()), @@ -251,7 +252,8 @@ func (s *Server) Start(errChan chan error) { s.run() } -// Shutdown disconnects all peers and stops listening. +// Shutdown disconnects all peers and stops listening. Calling it twice is an error, +// once stopped the same intance of the Server can't be started again by calling Start. func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() diff --git a/pkg/rpc/client/client.go b/pkg/rpc/client/client.go index fb03f4bd8..a837e5238 100644 --- a/pkg/rpc/client/client.go +++ b/pkg/rpc/client/client.go @@ -167,6 +167,11 @@ func (c *Client) Init() error { return nil } +// Close closes unused underlying networks connections. +func (c *Client) Close() { + c.cli.CloseIdleConnections() +} + func (c *Client) performRequest(method string, p request.RawParams, v interface{}) error { var r = request.Raw{ JSONRPC: request.JSONRPCVersion, diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 8436369d8..6baa42ea9 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -216,6 +216,7 @@ func (s *Server) Name() string { // Start creates a new JSON-RPC server listening on the configured port. It creates // goroutines needed internally and it returns its errors via errChan passed to New(). +// The Server only starts once, subsequent calls to Start are no-op. func (s *Server) Start() { if !s.config.Enabled { s.log.Info("RPC server is not enabled") @@ -261,9 +262,12 @@ func (s *Server) Start() { }() } -// Shutdown stops the RPC server. It can only be called once. +// Shutdown stops the RPC server if it's running. It can only be called once, +// subsequent calls to Shutdown on the same instance are no-op. The instance +// that was stopped can not be started again by calling Start (use a new +// instance if needed). func (s *Server) Shutdown() { - if !s.started.Load() { + if !s.started.CAS(true, false) { return } // Signal to websocket writer routines and handleSubEvents. diff --git a/pkg/services/helpers/rpcbroadcaster/broadcaster.go b/pkg/services/helpers/rpcbroadcaster/broadcaster.go index b43225325..7b6620b2a 100644 --- a/pkg/services/helpers/rpcbroadcaster/broadcaster.go +++ b/pkg/services/helpers/rpcbroadcaster/broadcaster.go @@ -14,6 +14,7 @@ type RPCBroadcaster struct { Responses chan request.RawParams close chan struct{} + finished chan struct{} sendTimeout time.Duration } @@ -23,6 +24,7 @@ func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcast Clients: make(map[string]*RPCClient), Log: log, close: make(chan struct{}), + finished: make(chan struct{}), Responses: make(chan request.RawParams), sendTimeout: sendTimeout, } @@ -33,10 +35,11 @@ func (r *RPCBroadcaster) Run() { for _, c := range r.Clients { go c.run() } +run: for { select { case <-r.close: - return + break run case ps := <-r.Responses: for _, c := range r.Clients { select { @@ -47,9 +50,32 @@ func (r *RPCBroadcaster) Run() { } } } + for _, c := range r.Clients { + <-c.finished + } +drain: + for { + select { + case <-r.Responses: + default: + break drain + } + } + close(r.Responses) + close(r.finished) } -// Shutdown implements oracle.Broadcaster. +// SendParams sends a request using all clients if the broadcaster is active. +func (r *RPCBroadcaster) SendParams(params request.RawParams) { + select { + case <-r.close: + case r.Responses <- params: + } +} + +// Shutdown implements oracle.Broadcaster. The same instance can't be Run again +// after the shutdown. func (r *RPCBroadcaster) Shutdown() { close(r.close) + <-r.finished } diff --git a/pkg/services/helpers/rpcbroadcaster/client.go b/pkg/services/helpers/rpcbroadcaster/client.go index 403c726ea..33696084f 100644 --- a/pkg/services/helpers/rpcbroadcaster/client.go +++ b/pkg/services/helpers/rpcbroadcaster/client.go @@ -14,6 +14,7 @@ type RPCClient struct { client *client.Client addr string close chan struct{} + finished chan struct{} responses chan request.RawParams log *zap.Logger sendTimeout time.Duration @@ -28,6 +29,7 @@ func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout ti return &RPCClient{ addr: addr, close: r.close, + finished: make(chan struct{}), responses: ch, log: r.Log.With(zap.String("address", addr)), sendTimeout: timeout, @@ -41,10 +43,11 @@ func (c *RPCClient) run() { DialTimeout: c.sendTimeout, RequestTimeout: c.sendTimeout, }) +run: for { select { case <-c.close: - return + break run case ps := <-c.responses: if c.client == nil { var err error @@ -63,4 +66,14 @@ func (c *RPCClient) run() { } } } + c.client.Close() +drain: + for { + select { + case <-c.responses: + default: + break drain + } + } + close(c.finished) } diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index bc567055c..b3141a2c8 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -22,6 +22,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -47,6 +48,8 @@ type ( // newTxs is a channel where new transactions are sent // to be processed in an `onTransaction` callback. newTxs chan txHashPair + // started is a status bool to protect from double start/shutdown. + started *atomic.Bool // reqMtx protects requests list. reqMtx sync.RWMutex @@ -64,6 +67,7 @@ type ( reqCh chan mempoolevent.Event blocksCh chan *block.Block stopCh chan struct{} + done chan struct{} } // Config represents external configuration for Notary module. @@ -142,6 +146,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu requests: make(map[util.Uint256]*request), Config: cfg, Network: net, + started: atomic.NewBool(false), wallet: wallet, onTransaction: onTransaction, newTxs: make(chan txHashPair, defaultTxChannelCapacity), @@ -149,6 +154,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu reqCh: make(chan mempoolevent.Event), blocksCh: make(chan *block.Block), stopCh: make(chan struct{}), + done: make(chan struct{}), }, nil } @@ -158,7 +164,11 @@ func (n *Notary) Name() string { } // Start runs a Notary module in a separate goroutine. +// The Notary only starts once, subsequent calls to Start are no-op. func (n *Notary) Start() { + if !n.started.CAS(false, true) { + return + } n.Config.Log.Info("starting notary service") n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) @@ -199,15 +209,25 @@ drainLoop: } close(n.blocksCh) close(n.reqCh) + close(n.done) } -// Shutdown stops the Notary module. +// Shutdown stops the Notary module. It can only be called once, subsequent calls +// to Shutdown on the same instance are no-op. The instance that was stopped can +// not be started again by calling Start (use a new instance if needed). func (n *Notary) Shutdown() { + if !n.started.CAS(true, false) { + return + } close(n.stopCh) + <-n.done } // OnNewRequest is a callback method which is called after a new notary request is added to the notary request pool. func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) { + if !n.started.Load() { + return + } acc := n.getAccount() if acc == nil { return @@ -314,7 +334,7 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) { // OnRequestRemoval is a callback which is called after fallback transaction is removed // from the notary payload pool due to expiration, main tx appliance or any other reason. func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) { - if n.getAccount() == nil { + if !n.started.Load() || n.getAccount() == nil { return } @@ -338,6 +358,9 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) { // PostPersist is a callback which is called after a new block event is received. // PostPersist must not be called under the blockchain lock, because it uses finalization function. func (n *Notary) PostPersist() { + if !n.started.Load() { + return + } acc := n.getAccount() if acc == nil { return diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go index 29dd4a160..8a8d4c328 100644 --- a/pkg/services/oracle/broadcaster/oracle.go +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -51,7 +51,7 @@ func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transactio base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(msgSig), ) - r.Responses <- params + r.SendParams(params) } // GetMessage returns data which is signed upon sending response by RPC. diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 1882d49d0..a88077dee 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -53,6 +53,7 @@ type ( oracleSignContract []byte close chan struct{} + done chan struct{} requestCh chan request requestMap chan map[uint64]*state.OracleRequest @@ -123,6 +124,7 @@ func NewOracle(cfg Config) (*Oracle, error) { Config: cfg, close: make(chan struct{}), + done: make(chan struct{}), requestMap: make(chan map[uint64]*state.OracleRequest, 1), pending: make(map[uint64]*state.OracleRequest), responses: make(map[uint64]*incompleteTx), @@ -179,13 +181,23 @@ func (o *Oracle) Name() string { return "oracle" } -// Shutdown shutdowns Oracle. +// Shutdown shutdowns Oracle. It can only be called once, subsequent calls +// to Shutdown on the same instance are no-op. The instance that was stopped can +// not be started again by calling Start (use a new instance if needed). func (o *Oracle) Shutdown() { + o.respMtx.Lock() + defer o.respMtx.Unlock() + if !o.running { + return + } + o.running = false close(o.close) o.getBroadcaster().Shutdown() + <-o.done } // Start runs the oracle service in a separate goroutine. +// The Oracle only starts once, subsequent calls to Start are no-op. func (o *Oracle) Start() { o.respMtx.Lock() if o.running { @@ -207,11 +219,11 @@ func (o *Oracle) start() { } tick := time.NewTicker(o.MainCfg.RefreshInterval) +main: for { select { case <-o.close: - tick.Stop() - return + break main case <-tick.C: var reprocess []uint64 o.respMtx.Lock() @@ -243,6 +255,17 @@ func (o *Oracle) start() { } } } + tick.Stop() +drain: + for { + select { + case <-o.requestMap: + default: + break drain + } + } + close(o.requestMap) + close(o.done) } // UpdateNativeContract updates native oracle contract info for tx verification. diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 5766066df..158377ae2 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -14,6 +14,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -32,7 +33,12 @@ type ( OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot + // Start runs service instance in a separate goroutine. + // The service only starts once, subsequent calls to Start are no-op. Start() + // Shutdown stops the service. It can only be called once, subsequent calls + // to Shutdown on the same instance are no-op. The instance that was stopped can + // not be started again by calling Start (use a new instance if needed). Shutdown() } @@ -44,6 +50,7 @@ type ( Network netmode.Magic log *zap.Logger + started *atomic.Bool accMtx sync.RWMutex accHeight uint32 myIndex byte @@ -57,6 +64,7 @@ type ( maxRetries int relayExtensible RelayCallback blockCh chan *block.Block + stopCh chan struct{} done chan struct{} } ) @@ -72,10 +80,12 @@ func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger, s := &service{ Module: sm, Network: bcConf.Magic, + started: atomic.NewBool(false), chain: bc, log: log, incompleteRoots: make(map[uint32]*incompleteRoot), blockCh: make(chan *block.Block), + stopCh: make(chan struct{}), done: make(chan struct{}), timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second, maxRetries: voteValidEndInc, diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 03ec9cf63..536440aaf 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -23,7 +23,11 @@ func (s *service) Name() string { } // Start runs service instance in a separate goroutine. +// The service only starts once, subsequent calls to Start are no-op. func (s *service) Start() { + if !s.started.CAS(false, true) { + return + } s.log.Info("starting state validation service") s.chain.SubscribeForBlocks(s.blockCh) go s.run() @@ -43,10 +47,11 @@ runloop: s.srMtx.Lock() delete(s.incompleteRoots, b.Index-voteValidEndInc) s.srMtx.Unlock() - case <-s.done: + case <-s.stopCh: break runloop } } + s.chain.UnsubscribeFromBlocks(s.blockCh) drainloop: for { select { @@ -56,12 +61,18 @@ drainloop: } } close(s.blockCh) + close(s.done) } -// Shutdown stops the service. +// Shutdown stops the service. It can only be called once, subsequent calls +// to Shutdown on the same instance are no-op. The instance that was stopped can +// not be started again by calling Start (use a new instance if needed). func (s *service) Shutdown() { - s.chain.UnsubscribeFromBlocks(s.blockCh) - close(s.done) + if !s.started.CAS(true, false) { + return + } + close(s.stopCh) + <-s.done } func (s *service) signAndSend(r *state.MPTRoot) error {