diff --git a/pkg/core/notary_test.go b/pkg/core/notary_test.go index 0a5fdc277..e0443f239 100644 --- a/pkg/core/notary_test.go +++ b/pkg/core/notary_test.go @@ -95,7 +95,7 @@ func TestNotary(t *testing.T) { bc.SetNotary(ntr1) bc.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { - ntr1.PostPersist(bc, pool, b) + ntr1.PostPersist() }) notaryNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()} diff --git a/pkg/network/server.go b/pkg/network/server.go index 5de319ce1..79f48a256 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -158,9 +158,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } s.notaryModule = n chain.SetNotary(n) - chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { - s.notaryModule.PostPersist(bc, pool, b) - }) } } else if chain.GetConfig().P2PNotary.Enabled { return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable") diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 5b4c51f3d..8c30f9ed0 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -44,8 +44,9 @@ type ( mp *mempool.Pool // requests channel - reqCh chan mempool.Event - stopCh chan struct{} + reqCh chan mempool.Event + blocksCh chan *block.Block + stopCh chan struct{} } // Config represents external configuration for Notary module. @@ -109,17 +110,20 @@ func NewNotary(bc blockchainer.Blockchainer, mp *mempool.Pool, log *zap.Logger, onTransaction: onTransaction, mp: mp, reqCh: make(chan mempool.Event), + blocksCh: make(chan *block.Block), stopCh: make(chan struct{}), }, nil } // Run runs Notary module and should be called in a separate goroutine. func (n *Notary) Run() { + n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) for { select { case <-n.stopCh: n.mp.UnsubscribeFromTransactions(n.reqCh) + n.Config.Chain.UnsubscribeFromBlocks(n.blocksCh) return case event := <-n.reqCh: if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { @@ -130,6 +134,9 @@ func (n *Notary) Run() { n.OnRequestRemoval(req) } } + case <-n.blocksCh: + // new block was added, need to check for valid fallbacks + n.PostPersist() } } } @@ -259,16 +266,18 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) { } } -// PostPersist is a callback which is called after new block is persisted. -func (n *Notary) PostPersist(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { +// PostPersist is a callback which is called after new block event is received. +// PostPersist must not be called under the blockchain lock, because it uses finalization function. +func (n *Notary) PostPersist() { if n.getAccount() == nil { return } n.reqMtx.Lock() defer n.reqMtx.Unlock() + currHeight := n.Config.Chain.BlockHeight() for h, r := range n.requests { - if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > bc.BlockHeight() { + if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > currHeight { if err := n.finalize(r.main); err != nil { n.Config.Log.Error("failed to finalize main transaction", zap.Error(err)) } else { @@ -276,10 +285,10 @@ func (n *Notary) PostPersist(bc blockchainer.Blockchainer, pool *mempool.Pool, b } continue } - if r.minNotValidBefore <= bc.BlockHeight() { // then at least one of the fallbacks can already be sent. + if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent. newFallbacks := r.fallbacks[:0] for _, fb := range r.fallbacks { - if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= bc.BlockHeight() { + if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= currHeight { if err := n.finalize(fb); err != nil { newFallbacks = append(newFallbacks, fb) // wait for the next block to resend them }