Merge pull request #2580 from nspcc-dev/service-review

Service review
This commit is contained in:
Roman Khimov 2022-07-05 12:23:25 +03:00 committed by GitHub
commit 9f05009d1a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 145 additions and 21 deletions

View file

@ -67,8 +67,11 @@ type Service interface {
Name() string Name() string
// Start initializes dBFT and starts event loop for consensus service. // Start initializes dBFT and starts event loop for consensus service.
// It must be called only when the sufficient amount of peers are connected. // It must be called only when the sufficient amount of peers are connected.
// The service only starts once, subsequent calls to Start are no-op.
Start() Start()
// Shutdown stops dBFT event loop. // Shutdown stops dBFT event loop. It can only be called once, subsequent calls
// to Shutdown on the same instance are no-op. The instance that was stopped can
// not be started again by calling Start (use a new instance if needed).
Shutdown() Shutdown()
// OnPayload is a callback to notify the Service about a newly received payload. // OnPayload is a callback to notify the Service about a newly received payload.
@ -272,7 +275,7 @@ func (s *service) Start() {
// Shutdown implements the Service interface. // Shutdown implements the Service interface.
func (s *service) Shutdown() { func (s *service) Shutdown() {
if s.started.Load() { if s.started.CAS(true, false) {
close(s.quit) close(s.quit)
<-s.finished <-s.finished
} }
@ -330,14 +333,18 @@ events:
default: default:
} }
} }
drainBlocksLoop: drainLoop:
for { for {
select { select {
case <-s.messages:
case <-s.transactions:
case <-s.blockEvents: case <-s.blockEvents:
default: default:
break drainBlocksLoop break drainLoop
} }
} }
close(s.messages)
close(s.transactions)
close(s.blockEvents) close(s.blockEvents)
close(s.finished) close(s.finished)
} }

View file

@ -233,7 +233,8 @@ func (s *Server) ID() uint32 {
return s.id return s.id
} }
// Start will start the server and its underlying transport. // Start will start the server and its underlying transport. Calling it twice
// is an error.
func (s *Server) Start(errChan chan error) { func (s *Server) Start(errChan chan error) {
s.log.Info("node started", s.log.Info("node started",
zap.Uint32("blockHeight", s.chain.BlockHeight()), zap.Uint32("blockHeight", s.chain.BlockHeight()),
@ -251,7 +252,8 @@ func (s *Server) Start(errChan chan error) {
s.run() s.run()
} }
// Shutdown disconnects all peers and stops listening. // Shutdown disconnects all peers and stops listening. Calling it twice is an error,
// once stopped the same intance of the Server can't be started again by calling Start.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
s.transport.Close() s.transport.Close()

View file

@ -167,6 +167,11 @@ func (c *Client) Init() error {
return nil return nil
} }
// Close closes unused underlying networks connections.
func (c *Client) Close() {
c.cli.CloseIdleConnections()
}
func (c *Client) performRequest(method string, p request.RawParams, v interface{}) error { func (c *Client) performRequest(method string, p request.RawParams, v interface{}) error {
var r = request.Raw{ var r = request.Raw{
JSONRPC: request.JSONRPCVersion, JSONRPC: request.JSONRPCVersion,

View file

@ -216,6 +216,7 @@ func (s *Server) Name() string {
// Start creates a new JSON-RPC server listening on the configured port. It creates // Start creates a new JSON-RPC server listening on the configured port. It creates
// goroutines needed internally and it returns its errors via errChan passed to New(). // goroutines needed internally and it returns its errors via errChan passed to New().
// The Server only starts once, subsequent calls to Start are no-op.
func (s *Server) Start() { func (s *Server) Start() {
if !s.config.Enabled { if !s.config.Enabled {
s.log.Info("RPC server is not enabled") s.log.Info("RPC server is not enabled")
@ -261,9 +262,12 @@ func (s *Server) Start() {
}() }()
} }
// Shutdown stops the RPC server. It can only be called once. // Shutdown stops the RPC server if it's running. It can only be called once,
// subsequent calls to Shutdown on the same instance are no-op. The instance
// that was stopped can not be started again by calling Start (use a new
// instance if needed).
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
if !s.started.Load() { if !s.started.CAS(true, false) {
return return
} }
// Signal to websocket writer routines and handleSubEvents. // Signal to websocket writer routines and handleSubEvents.

View file

@ -14,6 +14,7 @@ type RPCBroadcaster struct {
Responses chan request.RawParams Responses chan request.RawParams
close chan struct{} close chan struct{}
finished chan struct{}
sendTimeout time.Duration sendTimeout time.Duration
} }
@ -23,6 +24,7 @@ func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcast
Clients: make(map[string]*RPCClient), Clients: make(map[string]*RPCClient),
Log: log, Log: log,
close: make(chan struct{}), close: make(chan struct{}),
finished: make(chan struct{}),
Responses: make(chan request.RawParams), Responses: make(chan request.RawParams),
sendTimeout: sendTimeout, sendTimeout: sendTimeout,
} }
@ -33,10 +35,11 @@ func (r *RPCBroadcaster) Run() {
for _, c := range r.Clients { for _, c := range r.Clients {
go c.run() go c.run()
} }
run:
for { for {
select { select {
case <-r.close: case <-r.close:
return break run
case ps := <-r.Responses: case ps := <-r.Responses:
for _, c := range r.Clients { for _, c := range r.Clients {
select { select {
@ -47,9 +50,32 @@ func (r *RPCBroadcaster) Run() {
} }
} }
} }
for _, c := range r.Clients {
<-c.finished
}
drain:
for {
select {
case <-r.Responses:
default:
break drain
}
}
close(r.Responses)
close(r.finished)
} }
// Shutdown implements oracle.Broadcaster. // SendParams sends a request using all clients if the broadcaster is active.
func (r *RPCBroadcaster) SendParams(params request.RawParams) {
select {
case <-r.close:
case r.Responses <- params:
}
}
// Shutdown implements oracle.Broadcaster. The same instance can't be Run again
// after the shutdown.
func (r *RPCBroadcaster) Shutdown() { func (r *RPCBroadcaster) Shutdown() {
close(r.close) close(r.close)
<-r.finished
} }

View file

@ -14,6 +14,7 @@ type RPCClient struct {
client *client.Client client *client.Client
addr string addr string
close chan struct{} close chan struct{}
finished chan struct{}
responses chan request.RawParams responses chan request.RawParams
log *zap.Logger log *zap.Logger
sendTimeout time.Duration sendTimeout time.Duration
@ -28,6 +29,7 @@ func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout ti
return &RPCClient{ return &RPCClient{
addr: addr, addr: addr,
close: r.close, close: r.close,
finished: make(chan struct{}),
responses: ch, responses: ch,
log: r.Log.With(zap.String("address", addr)), log: r.Log.With(zap.String("address", addr)),
sendTimeout: timeout, sendTimeout: timeout,
@ -41,10 +43,11 @@ func (c *RPCClient) run() {
DialTimeout: c.sendTimeout, DialTimeout: c.sendTimeout,
RequestTimeout: c.sendTimeout, RequestTimeout: c.sendTimeout,
}) })
run:
for { for {
select { select {
case <-c.close: case <-c.close:
return break run
case ps := <-c.responses: case ps := <-c.responses:
if c.client == nil { if c.client == nil {
var err error var err error
@ -63,4 +66,14 @@ func (c *RPCClient) run() {
} }
} }
} }
c.client.Close()
drain:
for {
select {
case <-c.responses:
default:
break drain
}
}
close(c.finished)
} }

View file

@ -22,6 +22,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -47,6 +48,8 @@ type (
// newTxs is a channel where new transactions are sent // newTxs is a channel where new transactions are sent
// to be processed in an `onTransaction` callback. // to be processed in an `onTransaction` callback.
newTxs chan txHashPair newTxs chan txHashPair
// started is a status bool to protect from double start/shutdown.
started *atomic.Bool
// reqMtx protects requests list. // reqMtx protects requests list.
reqMtx sync.RWMutex reqMtx sync.RWMutex
@ -64,6 +67,7 @@ type (
reqCh chan mempoolevent.Event reqCh chan mempoolevent.Event
blocksCh chan *block.Block blocksCh chan *block.Block
stopCh chan struct{} stopCh chan struct{}
done chan struct{}
} }
// Config represents external configuration for Notary module. // Config represents external configuration for Notary module.
@ -142,6 +146,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
requests: make(map[util.Uint256]*request), requests: make(map[util.Uint256]*request),
Config: cfg, Config: cfg,
Network: net, Network: net,
started: atomic.NewBool(false),
wallet: wallet, wallet: wallet,
onTransaction: onTransaction, onTransaction: onTransaction,
newTxs: make(chan txHashPair, defaultTxChannelCapacity), newTxs: make(chan txHashPair, defaultTxChannelCapacity),
@ -149,6 +154,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
reqCh: make(chan mempoolevent.Event), reqCh: make(chan mempoolevent.Event),
blocksCh: make(chan *block.Block), blocksCh: make(chan *block.Block),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
done: make(chan struct{}),
}, nil }, nil
} }
@ -158,7 +164,11 @@ func (n *Notary) Name() string {
} }
// Start runs a Notary module in a separate goroutine. // Start runs a Notary module in a separate goroutine.
// The Notary only starts once, subsequent calls to Start are no-op.
func (n *Notary) Start() { func (n *Notary) Start() {
if !n.started.CAS(false, true) {
return
}
n.Config.Log.Info("starting notary service") n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh) n.mp.SubscribeForTransactions(n.reqCh)
@ -199,15 +209,25 @@ drainLoop:
} }
close(n.blocksCh) close(n.blocksCh)
close(n.reqCh) close(n.reqCh)
close(n.done)
} }
// Shutdown stops the Notary module. // Shutdown stops the Notary module. It can only be called once, subsequent calls
// to Shutdown on the same instance are no-op. The instance that was stopped can
// not be started again by calling Start (use a new instance if needed).
func (n *Notary) Shutdown() { func (n *Notary) Shutdown() {
if !n.started.CAS(true, false) {
return
}
close(n.stopCh) close(n.stopCh)
<-n.done
} }
// OnNewRequest is a callback method which is called after a new notary request is added to the notary request pool. // OnNewRequest is a callback method which is called after a new notary request is added to the notary request pool.
func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) { func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
if !n.started.Load() {
return
}
acc := n.getAccount() acc := n.getAccount()
if acc == nil { if acc == nil {
return return
@ -314,7 +334,7 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
// OnRequestRemoval is a callback which is called after fallback transaction is removed // OnRequestRemoval is a callback which is called after fallback transaction is removed
// from the notary payload pool due to expiration, main tx appliance or any other reason. // from the notary payload pool due to expiration, main tx appliance or any other reason.
func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) { func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) {
if n.getAccount() == nil { if !n.started.Load() || n.getAccount() == nil {
return return
} }
@ -338,6 +358,9 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) {
// PostPersist is a callback which is called after a new block event is received. // PostPersist is a callback which is called after a new block event is received.
// PostPersist must not be called under the blockchain lock, because it uses finalization function. // PostPersist must not be called under the blockchain lock, because it uses finalization function.
func (n *Notary) PostPersist() { func (n *Notary) PostPersist() {
if !n.started.Load() {
return
}
acc := n.getAccount() acc := n.getAccount()
if acc == nil { if acc == nil {
return return

View file

@ -51,7 +51,7 @@ func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transactio
base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(txSig),
base64.StdEncoding.EncodeToString(msgSig), base64.StdEncoding.EncodeToString(msgSig),
) )
r.Responses <- params r.SendParams(params)
} }
// GetMessage returns data which is signed upon sending response by RPC. // GetMessage returns data which is signed upon sending response by RPC.

View file

@ -53,6 +53,7 @@ type (
oracleSignContract []byte oracleSignContract []byte
close chan struct{} close chan struct{}
done chan struct{}
requestCh chan request requestCh chan request
requestMap chan map[uint64]*state.OracleRequest requestMap chan map[uint64]*state.OracleRequest
@ -123,6 +124,7 @@ func NewOracle(cfg Config) (*Oracle, error) {
Config: cfg, Config: cfg,
close: make(chan struct{}), close: make(chan struct{}),
done: make(chan struct{}),
requestMap: make(chan map[uint64]*state.OracleRequest, 1), requestMap: make(chan map[uint64]*state.OracleRequest, 1),
pending: make(map[uint64]*state.OracleRequest), pending: make(map[uint64]*state.OracleRequest),
responses: make(map[uint64]*incompleteTx), responses: make(map[uint64]*incompleteTx),
@ -179,13 +181,23 @@ func (o *Oracle) Name() string {
return "oracle" return "oracle"
} }
// Shutdown shutdowns Oracle. // Shutdown shutdowns Oracle. It can only be called once, subsequent calls
// to Shutdown on the same instance are no-op. The instance that was stopped can
// not be started again by calling Start (use a new instance if needed).
func (o *Oracle) Shutdown() { func (o *Oracle) Shutdown() {
o.respMtx.Lock()
defer o.respMtx.Unlock()
if !o.running {
return
}
o.running = false
close(o.close) close(o.close)
o.getBroadcaster().Shutdown() o.getBroadcaster().Shutdown()
<-o.done
} }
// Start runs the oracle service in a separate goroutine. // Start runs the oracle service in a separate goroutine.
// The Oracle only starts once, subsequent calls to Start are no-op.
func (o *Oracle) Start() { func (o *Oracle) Start() {
o.respMtx.Lock() o.respMtx.Lock()
if o.running { if o.running {
@ -207,11 +219,11 @@ func (o *Oracle) start() {
} }
tick := time.NewTicker(o.MainCfg.RefreshInterval) tick := time.NewTicker(o.MainCfg.RefreshInterval)
main:
for { for {
select { select {
case <-o.close: case <-o.close:
tick.Stop() break main
return
case <-tick.C: case <-tick.C:
var reprocess []uint64 var reprocess []uint64
o.respMtx.Lock() o.respMtx.Lock()
@ -243,6 +255,17 @@ func (o *Oracle) start() {
} }
} }
} }
tick.Stop()
drain:
for {
select {
case <-o.requestMap:
default:
break drain
}
}
close(o.requestMap)
close(o.done)
} }
// UpdateNativeContract updates native oracle contract info for tx verification. // UpdateNativeContract updates native oracle contract info for tx verification.

View file

@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -32,7 +33,12 @@ type (
OnPayload(p *payload.Extensible) error OnPayload(p *payload.Extensible) error
AddSignature(height uint32, validatorIndex int32, sig []byte) error AddSignature(height uint32, validatorIndex int32, sig []byte) error
GetConfig() config.StateRoot GetConfig() config.StateRoot
// Start runs service instance in a separate goroutine.
// The service only starts once, subsequent calls to Start are no-op.
Start() Start()
// Shutdown stops the service. It can only be called once, subsequent calls
// to Shutdown on the same instance are no-op. The instance that was stopped can
// not be started again by calling Start (use a new instance if needed).
Shutdown() Shutdown()
} }
@ -44,6 +50,7 @@ type (
Network netmode.Magic Network netmode.Magic
log *zap.Logger log *zap.Logger
started *atomic.Bool
accMtx sync.RWMutex accMtx sync.RWMutex
accHeight uint32 accHeight uint32
myIndex byte myIndex byte
@ -57,6 +64,7 @@ type (
maxRetries int maxRetries int
relayExtensible RelayCallback relayExtensible RelayCallback
blockCh chan *block.Block blockCh chan *block.Block
stopCh chan struct{}
done chan struct{} done chan struct{}
} }
) )
@ -72,10 +80,12 @@ func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger,
s := &service{ s := &service{
Module: sm, Module: sm,
Network: bcConf.Magic, Network: bcConf.Magic,
started: atomic.NewBool(false),
chain: bc, chain: bc,
log: log, log: log,
incompleteRoots: make(map[uint32]*incompleteRoot), incompleteRoots: make(map[uint32]*incompleteRoot),
blockCh: make(chan *block.Block), blockCh: make(chan *block.Block),
stopCh: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second, timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second,
maxRetries: voteValidEndInc, maxRetries: voteValidEndInc,

View file

@ -23,7 +23,11 @@ func (s *service) Name() string {
} }
// Start runs service instance in a separate goroutine. // Start runs service instance in a separate goroutine.
// The service only starts once, subsequent calls to Start are no-op.
func (s *service) Start() { func (s *service) Start() {
if !s.started.CAS(false, true) {
return
}
s.log.Info("starting state validation service") s.log.Info("starting state validation service")
s.chain.SubscribeForBlocks(s.blockCh) s.chain.SubscribeForBlocks(s.blockCh)
go s.run() go s.run()
@ -43,10 +47,11 @@ runloop:
s.srMtx.Lock() s.srMtx.Lock()
delete(s.incompleteRoots, b.Index-voteValidEndInc) delete(s.incompleteRoots, b.Index-voteValidEndInc)
s.srMtx.Unlock() s.srMtx.Unlock()
case <-s.done: case <-s.stopCh:
break runloop break runloop
} }
} }
s.chain.UnsubscribeFromBlocks(s.blockCh)
drainloop: drainloop:
for { for {
select { select {
@ -56,12 +61,18 @@ drainloop:
} }
} }
close(s.blockCh) close(s.blockCh)
close(s.done)
} }
// Shutdown stops the service. // Shutdown stops the service. It can only be called once, subsequent calls
// to Shutdown on the same instance are no-op. The instance that was stopped can
// not be started again by calling Start (use a new instance if needed).
func (s *service) Shutdown() { func (s *service) Shutdown() {
s.chain.UnsubscribeFromBlocks(s.blockCh) if !s.started.CAS(true, false) {
close(s.done) return
}
close(s.stopCh)
<-s.done
} }
func (s *service) signAndSend(r *state.MPTRoot) error { func (s *service) signAndSend(r *state.MPTRoot) error {