network: allow to drop services and solve concurrency issues

Now that services can come and go we need to protect all of the associated
fields and allow to deregister them.
This commit is contained in:
Roman Khimov 2022-07-27 11:25:58 +03:00
parent 5a7fa2d3df
commit 94a8784dcb
2 changed files with 81 additions and 10 deletions

View file

@ -549,9 +549,10 @@ Main:
configureAddresses(&cfgnew.ApplicationConfiguration) configureAddresses(&cfgnew.ApplicationConfiguration)
switch sig { switch sig {
case syscall.SIGHUP: case syscall.SIGHUP:
serv.DelService(&rpcServer)
rpcServer.Shutdown() rpcServer.Shutdown()
rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
serv.AddService(&rpcServer) // Replaces old one by service name. serv.AddService(&rpcServer)
if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() {
rpcServer.Start() rpcServer.Start()
} }
@ -563,6 +564,7 @@ Main:
go prometheus.Start() go prometheus.Start()
case syscall.SIGUSR1: case syscall.SIGUSR1:
if oracleSrv != nil { if oracleSrv != nil {
serv.DelService(oracleSrv)
chain.SetOracle(nil) chain.SetOracle(nil)
rpcServer.SetOracleHandler(nil) rpcServer.SetOracleHandler(nil)
oracleSrv.Shutdown() oracleSrv.Shutdown()
@ -579,6 +581,7 @@ Main:
} }
} }
if p2pNotary != nil { if p2pNotary != nil {
serv.DelService(p2pNotary)
chain.SetNotary(nil) chain.SetNotary(nil)
p2pNotary.Shutdown() p2pNotary.Shutdown()
} }
@ -590,6 +593,7 @@ Main:
if p2pNotary != nil && serv.IsInSync() { if p2pNotary != nil && serv.IsInSync() {
p2pNotary.Start() p2pNotary.Start()
} }
serv.DelExtensibleService(sr, stateroot.Category)
srMod.SetUpdateValidatorsCallback(nil) srMod.SetUpdateValidatorsCallback(nil)
sr.Shutdown() sr.Shutdown()
sr, err = stateroot.New(cfgnew.ApplicationConfiguration.StateRoot, srMod, log, chain, serv.BroadcastExtensible) sr, err = stateroot.New(cfgnew.ApplicationConfiguration.StateRoot, srMod, log, chain, serv.BroadcastExtensible)
@ -603,6 +607,7 @@ Main:
} }
case syscall.SIGUSR2: case syscall.SIGUSR2:
if dbftSrv != nil { if dbftSrv != nil {
serv.DelExtensibleHPService(dbftSrv, consensus.Category)
dbftSrv.Shutdown() dbftSrv.Shutdown()
} }
dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log) dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log)

View file

@ -101,10 +101,12 @@ type (
notaryRequestPool *mempool.Pool notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool extensiblePool *extpool.Pool
notaryFeer NotaryFeer notaryFeer NotaryFeer
services map[string]Service
extensHandlers map[string]func(*payload.Extensible) error serviceLock sync.RWMutex
extensHighPrio string services map[string]Service
txCallback func(*transaction.Transaction) extensHandlers map[string]func(*payload.Extensible) error
extensHighPrio string
txCallback func(*transaction.Transaction)
txInLock sync.Mutex txInLock sync.Mutex
txInMap map[util.Uint256]struct{} txInMap map[util.Uint256]struct{}
@ -263,9 +265,11 @@ func (s *Server) Shutdown() {
} }
s.bQueue.discard() s.bQueue.discard()
s.bSyncQueue.discard() s.bSyncQueue.discard()
s.serviceLock.RLock()
for _, svc := range s.services { for _, svc := range s.services {
svc.Shutdown() svc.Shutdown()
} }
s.serviceLock.RUnlock()
if s.chain.P2PSigExtensionsEnabled() { if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.StopSubscriptions() s.notaryRequestPool.StopSubscriptions()
} }
@ -274,20 +278,72 @@ func (s *Server) Shutdown() {
// AddService allows to add a service to be started/stopped by Server. // AddService allows to add a service to be started/stopped by Server.
func (s *Server) AddService(svc Service) { func (s *Server) AddService(svc Service) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.addService(svc)
}
// addService is an unlocked version of AddService.
func (s *Server) addService(svc Service) {
s.services[svc.Name()] = svc s.services[svc.Name()] = svc
} }
// AddExtensibleService register a service that handles an extensible payload of some kind. // AddExtensibleService register a service that handles an extensible payload of some kind.
func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.addExtensibleService(svc, category, handler)
}
// addExtensibleService is an unlocked version of AddExtensibleService.
func (s *Server) addExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) {
s.extensHandlers[category] = handler s.extensHandlers[category] = handler
s.AddService(svc) s.addService(svc)
} }
// AddExtensibleHPService registers a high-priority service that handles an extensible payload of some kind. // AddExtensibleHPService registers a high-priority service that handles an extensible payload of some kind.
func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.txCallback = txCallback s.txCallback = txCallback
s.extensHighPrio = category s.extensHighPrio = category
s.AddExtensibleService(svc, category, handler) s.addExtensibleService(svc, category, handler)
}
// DelService drops a service from the list, use it when the service is stopped
// outside of the Server.
func (s *Server) DelService(svc Service) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.delService(svc)
}
// delService is an unlocked version of DelService.
func (s *Server) delService(svc Service) {
delete(s.services, svc.Name())
}
// DelExtensibleService drops a service that handler extensible payloads from the
// list, use it when the service is stopped outside of the Server.
func (s *Server) DelExtensibleService(svc Service, category string) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.delExtensibleService(svc, category)
}
// delExtensibleService is an unlocked version of DelExtensibleService.
func (s *Server) delExtensibleService(svc Service, category string) {
delete(s.extensHandlers, category)
s.delService(svc)
}
// DelExtensibleHPService unregisters a high-priority service that handles an extensible payload of some kind.
func (s *Server) DelExtensibleHPService(svc Service, category string) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.txCallback = nil
s.extensHighPrio = ""
s.delExtensibleService(svc, category)
} }
// GetNotaryPool allows to retrieve notary pool, if it's configured. // GetNotaryPool allows to retrieve notary pool, if it's configured.
@ -428,9 +484,11 @@ func (s *Server) tryStartServices() {
if s.chain.P2PSigExtensionsEnabled() { if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber.
} }
s.serviceLock.RLock()
for _, svc := range s.services { for _, svc := range s.services {
svc.Start() svc.Start()
} }
s.serviceLock.RUnlock()
} }
} }
@ -931,7 +989,9 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
if !ok { // payload is already in cache if !ok { // payload is already in cache
return nil return nil
} }
s.serviceLock.RLock()
handler := s.extensHandlers[e.Category] handler := s.extensHandlers[e.Category]
s.serviceLock.RUnlock()
if handler != nil { if handler != nil {
err = handler(e) err = handler(e)
if err != nil { if err != nil {
@ -944,7 +1004,10 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
func (s *Server) advertiseExtensible(e *payload.Extensible) { func (s *Server) advertiseExtensible(e *payload.Extensible) {
msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()}))
if e.Category == s.extensHighPrio { s.serviceLock.RLock()
hp := s.extensHighPrio
s.serviceLock.RUnlock()
if e.Category == hp {
// It's high priority because it directly affects consensus process, // It's high priority because it directly affects consensus process,
// even though it's just an inv. // even though it's just an inv.
s.broadcastHPMessage(msg) s.broadcastHPMessage(msg)
@ -966,8 +1029,11 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
} }
s.txInMap[tx.Hash()] = struct{}{} s.txInMap[tx.Hash()] = struct{}{}
s.txInLock.Unlock() s.txInLock.Unlock()
if s.txCallback != nil { s.serviceLock.RLock()
s.txCallback(tx) txCallback := s.txCallback
s.serviceLock.RUnlock()
if txCallback != nil {
txCallback(tx)
} }
if s.verifyAndPoolTX(tx) == nil { if s.verifyAndPoolTX(tx) == nil {
s.broadcastTX(tx, nil) s.broadcastTX(tx, nil)