From 4715e523e09c4ca49c6c8c538165ad74914b7693 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 22 Feb 2024 16:50:58 +0300 Subject: [PATCH] services: move blockchain/mempool subscriptions to separate routine Start of some services is bound to blockchain subscriptions, and thus, can't be run before the blockchain notifications dispatcher. Signed-off-by: Ekaterina Pavlova --- pkg/consensus/consensus.go | 9 ++++++++- pkg/services/notary/notary.go | 4 ++-- pkg/services/stateroot/validators.go | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index c40e72b16..47aab9c5a 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -281,7 +281,6 @@ func (s *service) Start() { b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block! s.lastTimestamp = b.Timestamp s.dbft.Start(s.lastTimestamp * nsInMs) - s.Chain.SubscribeForBlocks(s.blockEvents) go s.eventLoop() } } @@ -299,6 +298,14 @@ func (s *service) Shutdown() { } func (s *service) eventLoop() { + s.Chain.SubscribeForBlocks(s.blockEvents) + + // Manually sync up with potentially missed fresh blocks that may be added by blockchain + // before the subscription. + b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block! + if b.Timestamp >= s.lastTimestamp { + s.handleChainBlock(b) + } events: for { select { diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 7593c5a2a..f824e2ff1 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -175,13 +175,13 @@ func (n *Notary) Start() { return } n.Config.Log.Info("starting notary service") - n.Config.Chain.SubscribeForBlocks(n.blocksCh) - n.mp.SubscribeForTransactions(n.reqCh) go n.newTxCallbackLoop() go n.mainLoop() } func (n *Notary) mainLoop() { + n.Config.Chain.SubscribeForBlocks(n.blocksCh) + n.mp.SubscribeForTransactions(n.reqCh) mainloop: for { select { diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index a62e75892..afaad8617 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -29,11 +29,11 @@ func (s *service) Start() { return } s.log.Info("starting state validation service") - s.chain.SubscribeForBlocks(s.blockCh) go s.run() } func (s *service) run() { + s.chain.SubscribeForBlocks(s.blockCh) runloop: for { select {