From 94a8784dcbb123664e45cbdf6a0b4bdd67c745e3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 27 Jul 2022 11:25:58 +0300 Subject: [PATCH] network: allow to drop services and solve concurrency issues Now that services can come and go we need to protect all of the associated fields and allow to deregister them. --- cli/server/server.go | 7 +++- pkg/network/server.go | 84 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 4c60e3ce4..505338b49 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -549,9 +549,10 @@ Main: configureAddresses(&cfgnew.ApplicationConfiguration) switch sig { case syscall.SIGHUP: + serv.DelService(&rpcServer) rpcServer.Shutdown() rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) - serv.AddService(&rpcServer) // Replaces old one by service name. + serv.AddService(&rpcServer) if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { rpcServer.Start() } @@ -563,6 +564,7 @@ Main: go prometheus.Start() case syscall.SIGUSR1: if oracleSrv != nil { + serv.DelService(oracleSrv) chain.SetOracle(nil) rpcServer.SetOracleHandler(nil) oracleSrv.Shutdown() @@ -579,6 +581,7 @@ Main: } } if p2pNotary != nil { + serv.DelService(p2pNotary) chain.SetNotary(nil) p2pNotary.Shutdown() } @@ -590,6 +593,7 @@ Main: if p2pNotary != nil && serv.IsInSync() { p2pNotary.Start() } + serv.DelExtensibleService(sr, stateroot.Category) srMod.SetUpdateValidatorsCallback(nil) sr.Shutdown() sr, err = stateroot.New(cfgnew.ApplicationConfiguration.StateRoot, srMod, log, chain, serv.BroadcastExtensible) @@ -603,6 +607,7 @@ Main: } case syscall.SIGUSR2: if dbftSrv != nil { + serv.DelExtensibleHPService(dbftSrv, consensus.Category) dbftSrv.Shutdown() } dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log) diff --git a/pkg/network/server.go b/pkg/network/server.go index 929271e99..b69be9ebc 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -101,10 +101,12 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer - services map[string]Service - extensHandlers map[string]func(*payload.Extensible) error - extensHighPrio string - txCallback func(*transaction.Transaction) + + serviceLock sync.RWMutex + services map[string]Service + extensHandlers map[string]func(*payload.Extensible) error + extensHighPrio string + txCallback func(*transaction.Transaction) txInLock sync.Mutex txInMap map[util.Uint256]struct{} @@ -263,9 +265,11 @@ func (s *Server) Shutdown() { } s.bQueue.discard() s.bSyncQueue.discard() + s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() } + s.serviceLock.RUnlock() if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.StopSubscriptions() } @@ -274,20 +278,72 @@ func (s *Server) Shutdown() { // AddService allows to add a service to be started/stopped by Server. func (s *Server) AddService(svc Service) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.addService(svc) +} + +// addService is an unlocked version of AddService. +func (s *Server) addService(svc Service) { s.services[svc.Name()] = svc } // AddExtensibleService register a service that handles an extensible payload of some kind. func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.addExtensibleService(svc, category, handler) +} + +// addExtensibleService is an unlocked version of AddExtensibleService. +func (s *Server) addExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { s.extensHandlers[category] = handler - s.AddService(svc) + s.addService(svc) } // AddExtensibleHPService registers a high-priority service that handles an extensible payload of some kind. func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() s.txCallback = txCallback s.extensHighPrio = category - s.AddExtensibleService(svc, category, handler) + s.addExtensibleService(svc, category, handler) +} + +// DelService drops a service from the list, use it when the service is stopped +// outside of the Server. +func (s *Server) DelService(svc Service) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.delService(svc) +} + +// delService is an unlocked version of DelService. +func (s *Server) delService(svc Service) { + delete(s.services, svc.Name()) +} + +// DelExtensibleService drops a service that handler extensible payloads from the +// list, use it when the service is stopped outside of the Server. +func (s *Server) DelExtensibleService(svc Service, category string) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.delExtensibleService(svc, category) +} + +// delExtensibleService is an unlocked version of DelExtensibleService. +func (s *Server) delExtensibleService(svc Service, category string) { + delete(s.extensHandlers, category) + s.delService(svc) +} + +// DelExtensibleHPService unregisters a high-priority service that handles an extensible payload of some kind. +func (s *Server) DelExtensibleHPService(svc Service, category string) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.txCallback = nil + s.extensHighPrio = "" + s.delExtensibleService(svc, category) } // GetNotaryPool allows to retrieve notary pool, if it's configured. @@ -428,9 +484,11 @@ func (s *Server) tryStartServices() { if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. } + s.serviceLock.RLock() for _, svc := range s.services { svc.Start() } + s.serviceLock.RUnlock() } } @@ -931,7 +989,9 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { if !ok { // payload is already in cache return nil } + s.serviceLock.RLock() handler := s.extensHandlers[e.Category] + s.serviceLock.RUnlock() if handler != nil { err = handler(e) if err != nil { @@ -944,7 +1004,10 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { func (s *Server) advertiseExtensible(e *payload.Extensible) { msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) - if e.Category == s.extensHighPrio { + s.serviceLock.RLock() + hp := s.extensHighPrio + s.serviceLock.RUnlock() + if e.Category == hp { // It's high priority because it directly affects consensus process, // even though it's just an inv. s.broadcastHPMessage(msg) @@ -966,8 +1029,11 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { } s.txInMap[tx.Hash()] = struct{}{} s.txInLock.Unlock() - if s.txCallback != nil { - s.txCallback(tx) + s.serviceLock.RLock() + txCallback := s.txCallback + s.serviceLock.RUnlock() + if txCallback != nil { + txCallback(tx) } if s.verifyAndPoolTX(tx) == nil { s.broadcastTX(tx, nil)