From cd4f46247dcd6843e416a1f4dff38473c50f8a91 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Jul 2022 21:49:40 +0300 Subject: [PATCH 01/12] consensus: make double-call to Shutdown a no-op --- pkg/consensus/consensus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index dfdb8539b..8e1b5ad32 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -274,7 +274,7 @@ func (s *service) Start() { // Shutdown implements the Service interface. func (s *service) Shutdown() { - if s.started.Load() { + if s.started.CAS(true, false) { close(s.quit) <-s.finished } From 527505ea5eb9a70aa668b60210624b5a6c0c923c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Jul 2022 21:55:41 +0300 Subject: [PATCH 02/12] consensus: drain messages and transactions on exit as well There might be some threads blocked on these channels. --- pkg/consensus/consensus.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 8e1b5ad32..d9963d2c4 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -332,14 +332,18 @@ events: default: } } -drainBlocksLoop: +drainLoop: for { select { + case <-s.messages: + case <-s.transactions: case <-s.blockEvents: default: - break drainBlocksLoop + break drainLoop } } + close(s.messages) + close(s.transactions) close(s.blockEvents) close(s.finished) } From c096f32a32dfccfbad7d85f2ccf3427112d24d20 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Jul 2022 22:01:04 +0300 Subject: [PATCH 03/12] rpc/server: make double Shutdown a no-op --- pkg/rpc/server/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 491a4596b..b647660b3 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -260,9 +260,9 @@ func (s *Server) Start() { }() } -// Shutdown stops the RPC server. It can only be called once. +// Shutdown stops the RPC server if it's running. func (s *Server) Shutdown() { - if !s.started.Load() { + if !s.started.CAS(true, false) { return } // Signal to websocket writer routines and handleSubEvents. From 649fe58550cd29791eed86ae265a3cb4c641aa94 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Jul 2022 23:04:54 +0300 Subject: [PATCH 04/12] rpcbroadcaster: properly stop broadcaster Drain channels, wait for everything to stop. --- .../helpers/rpcbroadcaster/broadcaster.go | 27 ++++++++++++++++++- pkg/services/helpers/rpcbroadcaster/client.go | 14 +++++++++- pkg/services/oracle/broadcaster/oracle.go | 2 +- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/pkg/services/helpers/rpcbroadcaster/broadcaster.go b/pkg/services/helpers/rpcbroadcaster/broadcaster.go index b43225325..e030dcb22 100644 --- a/pkg/services/helpers/rpcbroadcaster/broadcaster.go +++ b/pkg/services/helpers/rpcbroadcaster/broadcaster.go @@ -14,6 +14,7 @@ type RPCBroadcaster struct { Responses chan request.RawParams close chan struct{} + finished chan struct{} sendTimeout time.Duration } @@ -23,6 +24,7 @@ func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcast Clients: make(map[string]*RPCClient), Log: log, close: make(chan struct{}), + finished: make(chan struct{}), Responses: make(chan request.RawParams), sendTimeout: sendTimeout, } @@ -33,10 +35,11 @@ func (r *RPCBroadcaster) Run() { for _, c := range r.Clients { go c.run() } +run: for { select { case <-r.close: - return + break run case ps := <-r.Responses: for _, c := range r.Clients { select { @@ -47,9 +50,31 @@ func (r *RPCBroadcaster) Run() { } } } + for _, c := range r.Clients { + <-c.finished + } +drain: + for { + select { + case <-r.Responses: + default: + break drain + } + } + close(r.Responses) + close(r.finished) +} + +// SendParams sends a request using all clients if the broadcaster is active. +func (r *RPCBroadcaster) SendParams(params request.RawParams) { + select { + case <-r.close: + case r.Responses <- params: + } } // Shutdown implements oracle.Broadcaster. func (r *RPCBroadcaster) Shutdown() { close(r.close) + <-r.finished } diff --git a/pkg/services/helpers/rpcbroadcaster/client.go b/pkg/services/helpers/rpcbroadcaster/client.go index 403c726ea..1045312b0 100644 --- a/pkg/services/helpers/rpcbroadcaster/client.go +++ b/pkg/services/helpers/rpcbroadcaster/client.go @@ -14,6 +14,7 @@ type RPCClient struct { client *client.Client addr string close chan struct{} + finished chan struct{} responses chan request.RawParams log *zap.Logger sendTimeout time.Duration @@ -28,6 +29,7 @@ func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout ti return &RPCClient{ addr: addr, close: r.close, + finished: make(chan struct{}), responses: ch, log: r.Log.With(zap.String("address", addr)), sendTimeout: timeout, @@ -41,10 +43,11 @@ func (c *RPCClient) run() { DialTimeout: c.sendTimeout, RequestTimeout: c.sendTimeout, }) +run: for { select { case <-c.close: - return + break run case ps := <-c.responses: if c.client == nil { var err error @@ -63,4 +66,13 @@ func (c *RPCClient) run() { } } } +drain: + for { + select { + case <-c.responses: + default: + break drain + } + } + close(c.finished) } diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go index 29dd4a160..8a8d4c328 100644 --- a/pkg/services/oracle/broadcaster/oracle.go +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -51,7 +51,7 @@ func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transactio base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(msgSig), ) - r.Responses <- params + r.SendParams(params) } // GetMessage returns data which is signed upon sending response by RPC. From 6b2fc5e056974bec870db386c8e1e6be2b89cebd Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Jul 2022 23:07:37 +0300 Subject: [PATCH 05/12] rpc/client: add Close method Allow to close unused network connections and use it during RPC broadcaster shutdown, because otherwise we leak them. --- pkg/rpc/client/client.go | 5 +++++ pkg/services/helpers/rpcbroadcaster/client.go | 1 + 2 files changed, 6 insertions(+) diff --git a/pkg/rpc/client/client.go b/pkg/rpc/client/client.go index fb03f4bd8..a837e5238 100644 --- a/pkg/rpc/client/client.go +++ b/pkg/rpc/client/client.go @@ -167,6 +167,11 @@ func (c *Client) Init() error { return nil } +// Close closes unused underlying networks connections. +func (c *Client) Close() { + c.cli.CloseIdleConnections() +} + func (c *Client) performRequest(method string, p request.RawParams, v interface{}) error { var r = request.Raw{ JSONRPC: request.JSONRPCVersion, diff --git a/pkg/services/helpers/rpcbroadcaster/client.go b/pkg/services/helpers/rpcbroadcaster/client.go index 1045312b0..33696084f 100644 --- a/pkg/services/helpers/rpcbroadcaster/client.go +++ b/pkg/services/helpers/rpcbroadcaster/client.go @@ -66,6 +66,7 @@ run: } } } + c.client.Close() drain: for { select { From cab633ffeda6327b419fc5200827931950b4617e Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Jul 2022 23:31:25 +0300 Subject: [PATCH 06/12] oracle: make double-Shutdown a no-op --- pkg/services/oracle/oracle.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 1882d49d0..168ff2630 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -181,6 +181,12 @@ func (o *Oracle) Name() string { // Shutdown shutdowns Oracle. func (o *Oracle) Shutdown() { + o.respMtx.Lock() + defer o.respMtx.Unlock() + if !o.running { + return + } + o.running = false close(o.close) o.getBroadcaster().Shutdown() } From 73e34514a5e1215094fc142f3f39cab520637d8a Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Jul 2022 23:31:38 +0300 Subject: [PATCH 07/12] oracle: wait for the service to stop during Shutdown --- pkg/services/oracle/oracle.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 168ff2630..fc63af081 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -53,6 +53,7 @@ type ( oracleSignContract []byte close chan struct{} + done chan struct{} requestCh chan request requestMap chan map[uint64]*state.OracleRequest @@ -123,6 +124,7 @@ func NewOracle(cfg Config) (*Oracle, error) { Config: cfg, close: make(chan struct{}), + done: make(chan struct{}), requestMap: make(chan map[uint64]*state.OracleRequest, 1), pending: make(map[uint64]*state.OracleRequest), responses: make(map[uint64]*incompleteTx), @@ -189,6 +191,7 @@ func (o *Oracle) Shutdown() { o.running = false close(o.close) o.getBroadcaster().Shutdown() + <-o.done } // Start runs the oracle service in a separate goroutine. @@ -213,11 +216,11 @@ func (o *Oracle) start() { } tick := time.NewTicker(o.MainCfg.RefreshInterval) +main: for { select { case <-o.close: - tick.Stop() - return + break main case <-tick.C: var reprocess []uint64 o.respMtx.Lock() @@ -249,6 +252,17 @@ func (o *Oracle) start() { } } } + tick.Stop() +drain: + for { + select { + case <-o.requestMap: + default: + break drain + } + } + close(o.requestMap) + close(o.done) } // UpdateNativeContract updates native oracle contract info for tx verification. From 0d627c947f5b137170f23827b8f251dab8ed29c3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 2 Jul 2022 15:48:35 +0300 Subject: [PATCH 08/12] notary: control start/stop state Don't start/stop twice, don't do anything in callbacks if not started. --- pkg/services/notary/notary.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index bc567055c..f270d2a71 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -22,6 +22,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -47,6 +48,8 @@ type ( // newTxs is a channel where new transactions are sent // to be processed in an `onTransaction` callback. newTxs chan txHashPair + // started is a status bool to protect from double start/shutdown. + started *atomic.Bool // reqMtx protects requests list. reqMtx sync.RWMutex @@ -142,6 +145,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu requests: make(map[util.Uint256]*request), Config: cfg, Network: net, + started: atomic.NewBool(false), wallet: wallet, onTransaction: onTransaction, newTxs: make(chan txHashPair, defaultTxChannelCapacity), @@ -159,6 +163,9 @@ func (n *Notary) Name() string { // Start runs a Notary module in a separate goroutine. func (n *Notary) Start() { + if !n.started.CAS(false, true) { + return + } n.Config.Log.Info("starting notary service") n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) @@ -203,11 +210,17 @@ drainLoop: // Shutdown stops the Notary module. func (n *Notary) Shutdown() { + if !n.started.CAS(true, false) { + return + } close(n.stopCh) } // OnNewRequest is a callback method which is called after a new notary request is added to the notary request pool. func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) { + if !n.started.Load() { + return + } acc := n.getAccount() if acc == nil { return @@ -314,7 +327,7 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) { // OnRequestRemoval is a callback which is called after fallback transaction is removed // from the notary payload pool due to expiration, main tx appliance or any other reason. func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) { - if n.getAccount() == nil { + if !n.started.Load() || n.getAccount() == nil { return } @@ -338,6 +351,9 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) { // PostPersist is a callback which is called after a new block event is received. // PostPersist must not be called under the blockchain lock, because it uses finalization function. func (n *Notary) PostPersist() { + if !n.started.Load() { + return + } acc := n.getAccount() if acc == nil { return From bf462a81fe4c3b4dec2aa0ed4348b7ea112239dd Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 2 Jul 2022 15:50:27 +0300 Subject: [PATCH 09/12] notary: wait for the service to finish on Shutdown --- pkg/services/notary/notary.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index f270d2a71..44325f48c 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -67,6 +67,7 @@ type ( reqCh chan mempoolevent.Event blocksCh chan *block.Block stopCh chan struct{} + done chan struct{} } // Config represents external configuration for Notary module. @@ -153,6 +154,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu reqCh: make(chan mempoolevent.Event), blocksCh: make(chan *block.Block), stopCh: make(chan struct{}), + done: make(chan struct{}), }, nil } @@ -206,6 +208,7 @@ drainLoop: } close(n.blocksCh) close(n.reqCh) + close(n.done) } // Shutdown stops the Notary module. @@ -214,6 +217,7 @@ func (n *Notary) Shutdown() { return } close(n.stopCh) + <-n.done } // OnNewRequest is a callback method which is called after a new notary request is added to the notary request pool. From 58b9ac41e2cd3a5cbbf62aedffefe7ed1f0a08d3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 4 Jul 2022 10:55:53 +0300 Subject: [PATCH 10/12] stateroot: handle double start/shutdown --- pkg/services/stateroot/service.go | 3 +++ pkg/services/stateroot/validators.go | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 5766066df..256f25910 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -14,6 +14,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -44,6 +45,7 @@ type ( Network netmode.Magic log *zap.Logger + started *atomic.Bool accMtx sync.RWMutex accHeight uint32 myIndex byte @@ -72,6 +74,7 @@ func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger, s := &service{ Module: sm, Network: bcConf.Magic, + started: atomic.NewBool(false), chain: bc, log: log, incompleteRoots: make(map[uint32]*incompleteRoot), diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 03ec9cf63..9e0e234dc 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -24,6 +24,9 @@ func (s *service) Name() string { // Start runs service instance in a separate goroutine. func (s *service) Start() { + if !s.started.CAS(false, true) { + return + } s.log.Info("starting state validation service") s.chain.SubscribeForBlocks(s.blockCh) go s.run() @@ -60,6 +63,9 @@ drainloop: // Shutdown stops the service. func (s *service) Shutdown() { + if !s.started.CAS(true, false) { + return + } s.chain.UnsubscribeFromBlocks(s.blockCh) close(s.done) } From 36d4c17a153fad3ab0ff890786ba2c635059d94e Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 4 Jul 2022 10:59:34 +0300 Subject: [PATCH 11/12] stateroot: wait for the service to stop un Shutdown --- pkg/services/stateroot/service.go | 2 ++ pkg/services/stateroot/validators.go | 8 +++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 256f25910..789452daa 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -59,6 +59,7 @@ type ( maxRetries int relayExtensible RelayCallback blockCh chan *block.Block + stopCh chan struct{} done chan struct{} } ) @@ -79,6 +80,7 @@ func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger, log: log, incompleteRoots: make(map[uint32]*incompleteRoot), blockCh: make(chan *block.Block), + stopCh: make(chan struct{}), done: make(chan struct{}), timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second, maxRetries: voteValidEndInc, diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 9e0e234dc..36f7faaf4 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -46,10 +46,11 @@ runloop: s.srMtx.Lock() delete(s.incompleteRoots, b.Index-voteValidEndInc) s.srMtx.Unlock() - case <-s.done: + case <-s.stopCh: break runloop } } + s.chain.UnsubscribeFromBlocks(s.blockCh) drainloop: for { select { @@ -59,6 +60,7 @@ drainloop: } } close(s.blockCh) + close(s.done) } // Shutdown stops the service. @@ -66,8 +68,8 @@ func (s *service) Shutdown() { if !s.started.CAS(true, false) { return } - s.chain.UnsubscribeFromBlocks(s.blockCh) - close(s.done) + close(s.stopCh) + <-s.done } func (s *service) signAndSend(r *state.MPTRoot) error { From 3e2eda6752d512f284ef7718bd3f50ed36f20dd5 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 4 Jul 2022 23:03:50 +0300 Subject: [PATCH 12/12] *: add some comments to service Start/Shutdown methods --- pkg/consensus/consensus.go | 5 ++++- pkg/network/server.go | 6 ++++-- pkg/rpc/server/server.go | 6 +++++- pkg/services/helpers/rpcbroadcaster/broadcaster.go | 3 ++- pkg/services/notary/notary.go | 5 ++++- pkg/services/oracle/oracle.go | 5 ++++- pkg/services/stateroot/service.go | 5 +++++ pkg/services/stateroot/validators.go | 5 ++++- 8 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index d9963d2c4..882f8568f 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -67,8 +67,11 @@ type Service interface { Name() string // Start initializes dBFT and starts event loop for consensus service. // It must be called only when the sufficient amount of peers are connected. + // The service only starts once, subsequent calls to Start are no-op. Start() - // Shutdown stops dBFT event loop. + // Shutdown stops dBFT event loop. It can only be called once, subsequent calls + // to Shutdown on the same instance are no-op. The instance that was stopped can + // not be started again by calling Start (use a new instance if needed). Shutdown() // OnPayload is a callback to notify the Service about a newly received payload. diff --git a/pkg/network/server.go b/pkg/network/server.go index 91e591f8d..f8292a24b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -233,7 +233,8 @@ func (s *Server) ID() uint32 { return s.id } -// Start will start the server and its underlying transport. +// Start will start the server and its underlying transport. Calling it twice +// is an error. func (s *Server) Start(errChan chan error) { s.log.Info("node started", zap.Uint32("blockHeight", s.chain.BlockHeight()), @@ -251,7 +252,8 @@ func (s *Server) Start(errChan chan error) { s.run() } -// Shutdown disconnects all peers and stops listening. +// Shutdown disconnects all peers and stops listening. Calling it twice is an error, +// once stopped the same intance of the Server can't be started again by calling Start. func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index b647660b3..8643945b4 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -215,6 +215,7 @@ func (s *Server) Name() string { // Start creates a new JSON-RPC server listening on the configured port. It creates // goroutines needed internally and it returns its errors via errChan passed to New(). +// The Server only starts once, subsequent calls to Start are no-op. func (s *Server) Start() { if !s.config.Enabled { s.log.Info("RPC server is not enabled") @@ -260,7 +261,10 @@ func (s *Server) Start() { }() } -// Shutdown stops the RPC server if it's running. +// Shutdown stops the RPC server if it's running. It can only be called once, +// subsequent calls to Shutdown on the same instance are no-op. The instance +// that was stopped can not be started again by calling Start (use a new +// instance if needed). func (s *Server) Shutdown() { if !s.started.CAS(true, false) { return diff --git a/pkg/services/helpers/rpcbroadcaster/broadcaster.go b/pkg/services/helpers/rpcbroadcaster/broadcaster.go index e030dcb22..7b6620b2a 100644 --- a/pkg/services/helpers/rpcbroadcaster/broadcaster.go +++ b/pkg/services/helpers/rpcbroadcaster/broadcaster.go @@ -73,7 +73,8 @@ func (r *RPCBroadcaster) SendParams(params request.RawParams) { } } -// Shutdown implements oracle.Broadcaster. +// Shutdown implements oracle.Broadcaster. The same instance can't be Run again +// after the shutdown. func (r *RPCBroadcaster) Shutdown() { close(r.close) <-r.finished diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 44325f48c..b3141a2c8 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -164,6 +164,7 @@ func (n *Notary) Name() string { } // Start runs a Notary module in a separate goroutine. +// The Notary only starts once, subsequent calls to Start are no-op. func (n *Notary) Start() { if !n.started.CAS(false, true) { return @@ -211,7 +212,9 @@ drainLoop: close(n.done) } -// Shutdown stops the Notary module. +// Shutdown stops the Notary module. It can only be called once, subsequent calls +// to Shutdown on the same instance are no-op. The instance that was stopped can +// not be started again by calling Start (use a new instance if needed). func (n *Notary) Shutdown() { if !n.started.CAS(true, false) { return diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index fc63af081..a88077dee 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -181,7 +181,9 @@ func (o *Oracle) Name() string { return "oracle" } -// Shutdown shutdowns Oracle. +// Shutdown shutdowns Oracle. It can only be called once, subsequent calls +// to Shutdown on the same instance are no-op. The instance that was stopped can +// not be started again by calling Start (use a new instance if needed). func (o *Oracle) Shutdown() { o.respMtx.Lock() defer o.respMtx.Unlock() @@ -195,6 +197,7 @@ func (o *Oracle) Shutdown() { } // Start runs the oracle service in a separate goroutine. +// The Oracle only starts once, subsequent calls to Start are no-op. func (o *Oracle) Start() { o.respMtx.Lock() if o.running { diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 789452daa..158377ae2 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -33,7 +33,12 @@ type ( OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot + // Start runs service instance in a separate goroutine. + // The service only starts once, subsequent calls to Start are no-op. Start() + // Shutdown stops the service. It can only be called once, subsequent calls + // to Shutdown on the same instance are no-op. The instance that was stopped can + // not be started again by calling Start (use a new instance if needed). Shutdown() } diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 36f7faaf4..536440aaf 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -23,6 +23,7 @@ func (s *service) Name() string { } // Start runs service instance in a separate goroutine. +// The service only starts once, subsequent calls to Start are no-op. func (s *service) Start() { if !s.started.CAS(false, true) { return @@ -63,7 +64,9 @@ drainloop: close(s.done) } -// Shutdown stops the service. +// Shutdown stops the service. It can only be called once, subsequent calls +// to Shutdown on the same instance are no-op. The instance that was stopped can +// not be started again by calling Start (use a new instance if needed). func (s *service) Shutdown() { if !s.started.CAS(true, false) { return