network: refactor notary service's PostBlock

There was a deadlock while trying to finalize transaction during
PostBlock:
	1) (*Notary).PostBlock is called under the blockchain lock
	2) (*Notary).onTransaction is called inside the PostBlock
	3) (*Notary).onTransaction needs to RLock the blockchain to add
completed transaction to the memory pool (and the blockchain is Lock'ed
by this moment)

The problem is fixed by using notifications subsistem, because it's not
required to call (*Notary).PostBlock under the blockchain lock.
This commit is contained in:
Anna Shaleva 2021-02-09 16:03:06 +03:00
parent 5d6fdda664
commit 8444f3d816
3 changed files with 17 additions and 11 deletions

View file

@ -95,7 +95,7 @@ func TestNotary(t *testing.T) {
bc.SetNotary(ntr1) bc.SetNotary(ntr1)
bc.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { bc.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) {
ntr1.PostPersist(bc, pool, b) ntr1.PostPersist()
}) })
notaryNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()} notaryNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()}

View file

@ -158,9 +158,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
} }
s.notaryModule = n s.notaryModule = n
chain.SetNotary(n) chain.SetNotary(n)
chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) {
s.notaryModule.PostPersist(bc, pool, b)
})
} }
} else if chain.GetConfig().P2PNotary.Enabled { } else if chain.GetConfig().P2PNotary.Enabled {
return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable") return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable")

View file

@ -45,6 +45,7 @@ type (
mp *mempool.Pool mp *mempool.Pool
// requests channel // requests channel
reqCh chan mempool.Event reqCh chan mempool.Event
blocksCh chan *block.Block
stopCh chan struct{} stopCh chan struct{}
} }
@ -109,17 +110,20 @@ func NewNotary(bc blockchainer.Blockchainer, mp *mempool.Pool, log *zap.Logger,
onTransaction: onTransaction, onTransaction: onTransaction,
mp: mp, mp: mp,
reqCh: make(chan mempool.Event), reqCh: make(chan mempool.Event),
blocksCh: make(chan *block.Block),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
}, nil }, nil
} }
// Run runs Notary module and should be called in a separate goroutine. // Run runs Notary module and should be called in a separate goroutine.
func (n *Notary) Run() { func (n *Notary) Run() {
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh) n.mp.SubscribeForTransactions(n.reqCh)
for { for {
select { select {
case <-n.stopCh: case <-n.stopCh:
n.mp.UnsubscribeFromTransactions(n.reqCh) n.mp.UnsubscribeFromTransactions(n.reqCh)
n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh)
return return
case event := <-n.reqCh: case event := <-n.reqCh:
if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { if req, ok := event.Data.(*payload.P2PNotaryRequest); ok {
@ -130,6 +134,9 @@ func (n *Notary) Run() {
n.OnRequestRemoval(req) n.OnRequestRemoval(req)
} }
} }
case <-n.blocksCh:
// new block was added, need to check for valid fallbacks
n.PostPersist()
} }
} }
} }
@ -259,16 +266,18 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) {
} }
} }
// PostPersist is a callback which is called after new block is persisted. // PostPersist is a callback which is called after new block event is received.
func (n *Notary) PostPersist(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { // PostPersist must not be called under the blockchain lock, because it uses finalization function.
func (n *Notary) PostPersist() {
if n.getAccount() == nil { if n.getAccount() == nil {
return return
} }
n.reqMtx.Lock() n.reqMtx.Lock()
defer n.reqMtx.Unlock() defer n.reqMtx.Unlock()
currHeight := n.Config.Chain.BlockHeight()
for h, r := range n.requests { for h, r := range n.requests {
if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > bc.BlockHeight() { if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > currHeight {
if err := n.finalize(r.main); err != nil { if err := n.finalize(r.main); err != nil {
n.Config.Log.Error("failed to finalize main transaction", zap.Error(err)) n.Config.Log.Error("failed to finalize main transaction", zap.Error(err))
} else { } else {
@ -276,10 +285,10 @@ func (n *Notary) PostPersist(bc blockchainer.Blockchainer, pool *mempool.Pool, b
} }
continue continue
} }
if r.minNotValidBefore <= bc.BlockHeight() { // then at least one of the fallbacks can already be sent. if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent.
newFallbacks := r.fallbacks[:0] newFallbacks := r.fallbacks[:0]
for _, fb := range r.fallbacks { for _, fb := range r.fallbacks {
if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= bc.BlockHeight() { if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= currHeight {
if err := n.finalize(fb); err != nil { if err := n.finalize(fb); err != nil {
newFallbacks = append(newFallbacks, fb) // wait for the next block to resend them newFallbacks = append(newFallbacks, fb) // wait for the next block to resend them
} }