Merge pull request #1877 from nspcc-dev/fix-services

Fix services
This commit is contained in:
Roman Khimov 2021-04-02 13:26:18 +03:00 committed by GitHub
commit dbcd628071
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 59 additions and 67 deletions

View file

@ -244,6 +244,7 @@ func (s *service) newPrepareRequest() payload.PrepareRequest {
func (s *service) Start() {
if s.started.CAS(false, true) {
s.log.Info("starting consensus service")
s.dbft.Start()
s.Chain.SubscribeForBlocks(s.blockEvents)
go s.eventLoop()
@ -252,8 +253,10 @@ func (s *service) Start() {
// Shutdown implements Service interface.
func (s *service) Shutdown() {
close(s.quit)
<-s.finished
if s.started.Load() {
close(s.quit)
<-s.finished
}
}
func (s *service) eventLoop() {

View file

@ -83,7 +83,7 @@ func TestStateRoot(t *testing.T) {
defer os.RemoveAll(tmpDir)
w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass")
cfg := createStateRootConfig(w.Path(), "pass")
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc)
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, nil)
require.NoError(t, err)
require.EqualValues(t, 0, srv.CurrentValidatedHeight())
r, err := srv.GetStateRoot(bc.BlockHeight())
@ -153,7 +153,7 @@ func TestStateRootInitNonZeroHeight(t *testing.T) {
defer os.RemoveAll(tmpDir)
w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass")
cfg := createStateRootConfig(w.Path(), "pass")
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc)
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, nil)
require.NoError(t, err)
r, err := srv.GetStateRoot(2)
require.NoError(t, err)
@ -199,17 +199,16 @@ func TestStateRootFull(t *testing.T) {
h, pubs, accs := newMajorityMultisigWithGAS(t, 2)
w := createAndWriteWallet(t, accs[1], path.Join(tmpDir, "wallet2"), "two")
cfg := createStateRootConfig(w.Path(), "two")
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc)
require.NoError(t, err)
srv.Run()
t.Cleanup(srv.Shutdown)
var lastValidated atomic.Value
var lastHeight atomic.Uint32
srv.SetRelayCallback(func(ep *payload.Extensible) {
srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, func(ep *payload.Extensible) {
lastHeight.Store(ep.ValidBlockStart)
lastValidated.Store(ep)
})
require.NoError(t, err)
srv.Run()
t.Cleanup(srv.Shutdown)
bc.setNodesByRole(t, true, noderoles.StateValidator, pubs)
transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000)

View file

@ -85,8 +85,7 @@ type (
transactions chan *transaction.Transaction
consensusStarted *atomic.Bool
canHandleExtens *atomic.Bool
syncReached *atomic.Bool
oracle *oracle.Oracle
stateRoot stateroot.Service
@ -132,8 +131,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
register: make(chan Peer),
unregister: make(chan peerDrop),
peers: make(map[Peer]bool),
consensusStarted: atomic.NewBool(false),
canHandleExtens: atomic.NewBool(false),
syncReached: atomic.NewBool(false),
extensiblePool: extpool.New(chain),
log: log,
transactions: make(chan *transaction.Transaction, 64),
@ -168,8 +166,8 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable")
}
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
if !s.consensusStarted.Load() {
s.tryStartConsensus()
if !s.syncReached.Load() {
s.tryStartServices()
}
})
@ -177,7 +175,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled")
}
sr, err := stateroot.New(config.StateRootCfg, s.log, chain)
sr, err := stateroot.New(config.StateRootCfg, s.log, chain, s.handleNewPayload)
if err != nil {
return nil, fmt.Errorf("can't initialize StateRoot service: %w", err)
}
@ -221,10 +219,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
s.consensus = srv
if config.StateRootCfg.Enabled {
s.stateRoot.SetRelayCallback(s.handleNewPayload)
}
if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
zap.Int("configured", s.MinPeers),
@ -267,20 +261,10 @@ func (s *Server) Start(errChan chan error) {
zap.Uint32("blockHeight", s.chain.BlockHeight()),
zap.Uint32("headerHeight", s.chain.HeaderHeight()))
s.tryStartConsensus()
s.tryStartServices()
s.initStaleMemPools()
go s.broadcastTxLoop()
if s.oracle != nil {
go s.oracle.Run()
}
if s.notaryModule != nil {
s.notaryRequestPool.RunSubscriptions()
go s.notaryModule.Run()
}
if s.StateRootCfg.Enabled {
s.stateRoot.Run()
}
go s.relayBlocksLoop()
go s.bQueue.run()
go s.transport.Accept()
@ -293,9 +277,7 @@ func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
s.transport.Close()
s.discovery.Close()
if s.consensusStarted.Load() {
s.consensus.Shutdown()
}
s.consensus.Shutdown()
for p := range s.Peers() {
p.Disconnect(errServerShutdown)
}
@ -447,16 +429,26 @@ func (s *Server) runProto() {
}
}
func (s *Server) tryStartConsensus() {
if s.Wallet == nil || s.consensusStarted.Load() {
func (s *Server) tryStartServices() {
if s.syncReached.Load() {
return
}
if s.IsInSync() {
s.log.Info("node reached synchronized state, starting consensus")
if s.consensusStarted.CAS(false, true) {
if s.IsInSync() && s.syncReached.CAS(false, true) {
s.log.Info("node reached synchronized state, starting services")
if s.Wallet != nil {
s.consensus.Start()
}
if s.StateRootCfg.Enabled {
s.stateRoot.Run()
}
if s.oracle != nil {
go s.oracle.Run()
}
if s.notaryModule != nil {
s.notaryRequestPool.RunSubscriptions()
go s.notaryModule.Run()
}
}
}
@ -815,11 +807,8 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error
// handleExtensibleCmd processes received extensible payload.
func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
if !s.canHandleExtens.Load() {
if !s.IsInSync() {
return nil
}
s.canHandleExtens.Store(true)
if !s.syncReached.Load() {
return nil
}
ok, err := s.extensiblePool.Add(e)
if err != nil {
@ -1053,7 +1042,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}
go peer.StartProtocol()
s.tryStartConsensus()
s.tryStartServices()
default:
return fmt.Errorf("received '%s' during handshake", msg.Command.String())
}

View file

@ -105,7 +105,7 @@ func TestServerStartAndShutdown(t *testing.T) {
<-ch
require.True(t, s.transport.(*fakeTransp).closed.Load())
require.False(t, s.consensus.(*fakeConsensus).stopped.Load())
require.True(t, s.consensus.(*fakeConsensus).stopped.Load())
err, ok := p.droppedWith.Load().(error)
require.True(t, ok)
require.True(t, errors.Is(err, errServerShutdown))
@ -403,6 +403,8 @@ func TestConsensus(t *testing.T) {
atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4)
p := newLocalPeer(t, s)
p.handshaked = true
s.register <- p
require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
newConsensusMessage := func(start, end uint32) *Message {
pl := payload.NewExtensible()

View file

@ -37,7 +37,7 @@ func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout ti
func (c *RPCClient) run() {
// We ignore error as not every node can be available on startup.
c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{
c.client, _ = client.New(context.Background(), c.addr, client.Options{
DialTimeout: c.sendTimeout,
RequestTimeout: c.sendTimeout,
})
@ -48,7 +48,7 @@ func (c *RPCClient) run() {
case ps := <-c.responses:
if c.client == nil {
var err error
c.client, err = client.New(context.Background(), "http://"+c.addr, client.Options{
c.client, err = client.New(context.Background(), c.addr, client.Options{
DialTimeout: c.sendTimeout,
RequestTimeout: c.sendTimeout,
})

View file

@ -117,6 +117,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
// Run runs Notary module and should be called in a separate goroutine.
func (n *Notary) Run() {
n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
for {

View file

@ -162,6 +162,7 @@ func (o *Oracle) Shutdown() {
// Run runs must be executed in a separate goroutine.
func (o *Oracle) Run() {
o.Log.Info("starting oracle service")
for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ {
go o.runRequestWorker()
}

View file

@ -94,18 +94,5 @@ func (s *service) sendValidatedRoot(r *state.MPTRoot, priv *keys.PrivateKey) {
buf := io.NewBufBinWriter()
emit.Bytes(buf.BinWriter, sig)
ep.Witness.InvocationScript = buf.Bytes()
s.getRelayCallback()(ep)
}
func (s *service) getRelayCallback() RelayCallback {
s.cbMtx.RLock()
defer s.cbMtx.RUnlock()
return s.onValidatedRoot
}
// SetRelayCallback sets callback to pool and broadcast tx.
func (s *service) SetRelayCallback(cb RelayCallback) {
s.cbMtx.Lock()
defer s.cbMtx.Unlock()
s.onValidatedRoot = cb
s.onValidatedRoot(ep)
}

View file

@ -24,7 +24,6 @@ type (
OnPayload(p *payload.Extensible) error
AddSignature(height uint32, validatorIndex int32, sig []byte) error
GetConfig() config.StateRoot
SetRelayCallback(RelayCallback)
Run()
Shutdown()
}
@ -46,7 +45,6 @@ type (
srMtx sync.Mutex
incompleteRoots map[uint32]*incompleteRoot
cbMtx sync.RWMutex
onValidatedRoot RelayCallback
blockCh chan *block.Block
done chan struct{}
@ -59,7 +57,7 @@ const (
)
// New returns new state root service instance using underlying module.
func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer) (Service, error) {
func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) {
s := &service{
StateRoot: bc.GetStateModule(),
Network: bc.GetConfig().Magic,
@ -68,6 +66,7 @@ func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer) (S
incompleteRoots: make(map[uint32]*incompleteRoot),
blockCh: make(chan *block.Block),
done: make(chan struct{}),
onValidatedRoot: cb,
}
s.MainCfg = cfg

View file

@ -12,11 +12,13 @@ import (
// Run runs service instance in a separate goroutine.
func (s *service) Run() {
s.log.Info("starting state validation service")
s.chain.SubscribeForBlocks(s.blockCh)
go s.run()
}
func (s *service) run() {
runloop:
for {
select {
case b := <-s.blockCh:
@ -27,13 +29,22 @@ func (s *service) run() {
s.log.Error("can't sign or send state root", zap.Error(err))
}
case <-s.done:
return
break runloop
}
}
drainloop:
for {
select {
case <-s.blockCh:
default:
break drainloop
}
}
}
// Shutdown stops the service.
func (s *service) Shutdown() {
s.chain.UnsubscribeFromBlocks(s.blockCh)
close(s.done)
}
@ -81,7 +92,7 @@ func (s *service) signAndSend(r *state.MPTRoot) error {
buf := io.NewBufBinWriter()
emit.Bytes(buf.BinWriter, sig)
e.Witness.InvocationScript = buf.Bytes()
s.getRelayCallback()(e)
s.onValidatedRoot(e)
return nil
}