From 8444f3d816e860675f53d3a3a9d9dd33b846fbe4 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 9 Feb 2021 16:03:06 +0300 Subject: [PATCH] network: refactor notary service's PostBlock There was a deadlock while trying to finalize transaction during PostBlock: 1) (*Notary).PostBlock is called under the blockchain lock 2) (*Notary).onTransaction is called inside the PostBlock 3) (*Notary).onTransaction needs to RLock the blockchain to add completed transaction to the memory pool (and the blockchain is Lock'ed by this moment) The problem is fixed by using notifications subsistem, because it's not required to call (*Notary).PostBlock under the blockchain lock. --- pkg/core/notary_test.go | 2 +- pkg/network/server.go | 3 --- pkg/services/notary/notary.go | 23 ++++++++++++++++------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/core/notary_test.go b/pkg/core/notary_test.go index 0a5fdc277..e0443f239 100644 --- a/pkg/core/notary_test.go +++ b/pkg/core/notary_test.go @@ -95,7 +95,7 @@ func TestNotary(t *testing.T) { bc.SetNotary(ntr1) bc.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { - ntr1.PostPersist(bc, pool, b) + ntr1.PostPersist() }) notaryNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()} diff --git a/pkg/network/server.go b/pkg/network/server.go index 5de319ce1..79f48a256 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -158,9 +158,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } s.notaryModule = n chain.SetNotary(n) - chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { - s.notaryModule.PostPersist(bc, pool, b) - }) } } else if chain.GetConfig().P2PNotary.Enabled { return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable") diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 5b4c51f3d..8c30f9ed0 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -44,8 +44,9 @@ type ( mp *mempool.Pool // requests channel - reqCh chan mempool.Event - stopCh chan struct{} + reqCh chan mempool.Event + blocksCh chan *block.Block + stopCh chan struct{} } // Config represents external configuration for Notary module. @@ -109,17 +110,20 @@ func NewNotary(bc blockchainer.Blockchainer, mp *mempool.Pool, log *zap.Logger, onTransaction: onTransaction, mp: mp, reqCh: make(chan mempool.Event), + blocksCh: make(chan *block.Block), stopCh: make(chan struct{}), }, nil } // Run runs Notary module and should be called in a separate goroutine. func (n *Notary) Run() { + n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) for { select { case <-n.stopCh: n.mp.UnsubscribeFromTransactions(n.reqCh) + n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh) return case event := <-n.reqCh: if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { @@ -130,6 +134,9 @@ func (n *Notary) Run() { n.OnRequestRemoval(req) } } + case <-n.blocksCh: + // new block was added, need to check for valid fallbacks + n.PostPersist() } } } @@ -259,16 +266,18 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) { } } -// PostPersist is a callback which is called after new block is persisted. -func (n *Notary) PostPersist(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { +// PostPersist is a callback which is called after new block event is received. +// PostPersist must not be called under the blockchain lock, because it uses finalization function. +func (n *Notary) PostPersist() { if n.getAccount() == nil { return } n.reqMtx.Lock() defer n.reqMtx.Unlock() + currHeight := n.Config.Chain.BlockHeight() for h, r := range n.requests { - if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > bc.BlockHeight() { + if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > currHeight { if err := n.finalize(r.main); err != nil { n.Config.Log.Error("failed to finalize main transaction", zap.Error(err)) } else { @@ -276,10 +285,10 @@ func (n *Notary) PostPersist(bc blockchainer.Blockchainer, pool *mempool.Pool, b } continue } - if r.minNotValidBefore <= bc.BlockHeight() { // then at least one of the fallbacks can already be sent. + if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent. newFallbacks := r.fallbacks[:0] for _, fb := range r.fallbacks { - if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= bc.BlockHeight() { + if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= currHeight { if err := n.finalize(fb); err != nil { newFallbacks = append(newFallbacks, fb) // wait for the next block to resend them }