From 3507f52c320805c3e87234dca3c9bd65ec9bc672 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Tue, 13 Jul 2021 14:22:21 +0300 Subject: [PATCH] notary: process new transactions in a separate goroutine Related #2063. Signed-off-by: Evgeniy Stratonikov --- pkg/core/notary_test.go | 124 ++++++++++++++++++++++------------ pkg/services/notary/notary.go | 101 +++++++++++++++++++++------ 2 files changed, 164 insertions(+), 61 deletions(-) diff --git a/pkg/core/notary_test.go b/pkg/core/notary_test.go index c0089d809..d2355b46e 100644 --- a/pkg/core/notary_test.go +++ b/pkg/core/notary_test.go @@ -70,6 +70,16 @@ func TestNotary(t *testing.T) { finalizeWithError bool choosy bool ) + setFinalizeWithError := func(v bool) { + mtx.Lock() + finalizeWithError = v + mtx.Unlock() + } + setChoosy := func(v bool) { + mtx.Lock() + choosy = v + mtx.Unlock() + } onTransaction := func(tx *transaction.Transaction) error { mtx.Lock() defer mtx.Unlock() @@ -91,6 +101,22 @@ func TestNotary(t *testing.T) { completedTxes[tx.Hash()] = tx return nil } + getCompletedTx := func(t *testing.T, waitForNonNil bool, h util.Uint256) *transaction.Transaction { + if !waitForNonNil { + mtx.RLock() + defer mtx.RUnlock() + return completedTxes[h] + } + + var completedTx *transaction.Transaction + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + completedTx = completedTxes[h] + return completedTx != nil + }, time.Second*3, time.Millisecond*50, errors.New("main transaction expected to be completed")) + return completedTx + } acc1, ntr1, mp1 := getTestNotary(t, bc, "./testdata/notary1.json", "one", onTransaction) acc2, _, _ := getTestNotary(t, bc, "./testdata/notary2.json", "two", onTransaction) @@ -102,6 +128,13 @@ func TestNotary(t *testing.T) { ntr1.PostPersist() }) + mp1.RunSubscriptions() + go ntr1.Run() + t.Cleanup(func() { + ntr1.Stop() + mp1.StopSubscriptions() + }) + notaryNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()} bc.setNodesByRole(t, true, noderoles.P2PNotary, notaryNodes) @@ -254,9 +287,8 @@ func TestNotary(t *testing.T) { } checkSigTx := func(t *testing.T, requests []*payload.P2PNotaryRequest, sentCount int, shouldComplete bool) { nKeys := len(requests) - completedTx := completedTxes[requests[0].MainTransaction.Hash()] if sentCount == nKeys && shouldComplete { - require.NotNil(t, completedTx, errors.New("main transaction expected to be completed")) + completedTx := getCompletedTx(t, true, requests[0].MainTransaction.Hash()) require.Equal(t, nKeys+1, len(completedTx.Signers)) require.Equal(t, nKeys+1, len(completedTx.Scripts)) @@ -274,13 +306,13 @@ func TestNotary(t *testing.T) { VerificationScript: []byte{}, }, completedTx.Scripts[nKeys]) } else { + completedTx := getCompletedTx(t, false, requests[0].MainTransaction.Hash()) require.Nil(t, completedTx, fmt.Errorf("main transaction shouldn't be completed: sent %d out of %d requests", sentCount, nKeys)) } } checkMultisigTx := func(t *testing.T, nSigs int, requests []*payload.P2PNotaryRequest, sentCount int, shouldComplete bool) { - completedTx := completedTxes[requests[0].MainTransaction.Hash()] if sentCount >= nSigs && shouldComplete { - require.NotNil(t, completedTx, errors.New("main transaction expected to be completed")) + completedTx := getCompletedTx(t, true, requests[0].MainTransaction.Hash()) require.Equal(t, 2, len(completedTx.Signers)) require.Equal(t, 2, len(completedTx.Scripts)) interopCtx := bc.newInteropContext(trigger.Verification, bc.dao, nil, completedTx) @@ -299,14 +331,14 @@ func TestNotary(t *testing.T) { require.False(t, bytes.Contains(completedTx.Scripts[0].InvocationScript, req.MainTransaction.Scripts[0].InvocationScript), fmt.Errorf("signature from extra request #%d shouldn't be presented in the main tx", i)) } } else { + completedTx := getCompletedTx(t, false, requests[0].MainTransaction.Hash()) require.Nil(t, completedTx, fmt.Errorf("main transaction shouldn't be completed: sent %d out of %d requests", sentCount, nSigs)) } } checkFallbackTxs := func(t *testing.T, requests []*payload.P2PNotaryRequest, shouldComplete bool) { for i, req := range requests { - completedTx := completedTxes[req.FallbackTransaction.Hash()] if shouldComplete { - require.NotNil(t, completedTx, fmt.Errorf("fallback transaction for request #%d expected to be completed", i)) + completedTx := getCompletedTx(t, true, req.FallbackTransaction.Hash()) require.Equal(t, 2, len(completedTx.Signers)) require.Equal(t, 2, len(completedTx.Scripts)) require.Equal(t, transaction.Witness{ @@ -321,6 +353,7 @@ func TestNotary(t *testing.T) { _, err := bc.verifyHashAgainstScript(completedTx.Signers[1].Account, &completedTx.Scripts[1], interopCtx, -1) require.NoError(t, err) } else { + completedTx := getCompletedTx(t, false, req.FallbackTransaction.Hash()) require.Nil(t, completedTx, fmt.Errorf("fallback transaction for request #%d shouldn't be completed", i)) } } @@ -419,11 +452,11 @@ func TestNotary(t *testing.T) { checkFallbackTxs(t, r, false) // PostPersist: missing account - finalizeWithError = true + setFinalizeWithError(true) r = checkCompleteStandardRequest(t, 1, false) checkFallbackTxs(t, r, false) - finalizeWithError = false ntr1.UpdateNotaryNodes(keys.PublicKeys{randomAcc.PublicKey()}) + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, r, 1, false) checkFallbackTxs(t, r, false) @@ -431,34 +464,34 @@ func TestNotary(t *testing.T) { ntr1.UpdateNotaryNodes(keys.PublicKeys{acc1.PrivateKey().PublicKey()}) // PostPersist: complete main transaction, signature request - finalizeWithError = true + setFinalizeWithError(true) requests := checkCompleteStandardRequest(t, 3, false) // check PostPersist with finalisation error - finalizeWithError = true + setFinalizeWithError(true) require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), false) // check PostPersist without finalisation error - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), true) // PostPersist: complete main transaction, multisignature account - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteMultisigRequest(t, 3, 4, false) checkFallbackTxs(t, requests, false) // check PostPersist with finalisation error - finalizeWithError = true + setFinalizeWithError(true) require.NoError(t, bc.AddBlock(bc.newBlock())) checkMultisigTx(t, 3, requests, len(requests), false) checkFallbackTxs(t, requests, false) // check PostPersist without finalisation error - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkMultisigTx(t, 3, requests, len(requests), true) checkFallbackTxs(t, requests, false) // PostPersist: complete fallback, signature request - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteStandardRequest(t, 3, false) checkFallbackTxs(t, requests, false) // make fallbacks valid @@ -469,7 +502,7 @@ func TestNotary(t *testing.T) { checkSigTx(t, requests, len(requests), false) checkFallbackTxs(t, requests, false) // check PostPersist for valid fallbacks without finalisation error - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), false) checkFallbackTxs(t, requests, true) @@ -477,7 +510,7 @@ func TestNotary(t *testing.T) { // PostPersist: complete fallback, multisignature request nSigs, nKeys := 3, 5 // check OnNewRequest with finalization error - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false) checkFallbackTxs(t, requests, false) // make fallbacks valid @@ -488,7 +521,7 @@ func TestNotary(t *testing.T) { checkMultisigTx(t, nSigs, requests, len(requests), false) checkFallbackTxs(t, requests, false) // check PostPersist for valid fallbacks without finalisation error - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkMultisigTx(t, nSigs, requests, len(requests), false) checkFallbackTxs(t, requests[:nSigs], true) @@ -496,7 +529,7 @@ func TestNotary(t *testing.T) { checkFallbackTxs(t, requests[nSigs:], true) // PostPersist: partial fallbacks completion due to finalisation errors - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteStandardRequest(t, 5, false) checkFallbackTxs(t, requests, false) // make fallbacks valid @@ -505,15 +538,15 @@ func TestNotary(t *testing.T) { // some of fallbacks should fail finalisation unluckies = []*payload.P2PNotaryRequest{requests[0], requests[4]} lucky := requests[1:4] - choosy = true + setChoosy(true) // check PostPersist for lucky fallbacks require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), false) checkFallbackTxs(t, lucky, true) checkFallbackTxs(t, unluckies, false) // reset finalisation function for unlucky fallbacks to finalise without an error - choosy = false - finalizeWithError = false + setChoosy(false) + setFinalizeWithError(false) // check PostPersist for unlucky fallbacks require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), false) @@ -522,16 +555,29 @@ func TestNotary(t *testing.T) { // PostPersist: different NVBs // check OnNewRequest with finalization error and different NVBs - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteStandardRequest(t, 5, false, 1, 2, 3, 4, 5) checkFallbackTxs(t, requests, false) // generate blocks to reach the most earlier fallback's NVB _, err = bc.genBlocks(int(nvbDiffFallback)) require.NoError(t, err) // check PostPersist for valid fallbacks without finalisation error - finalizeWithError = false + // Add block before allowing tx to finalize to exclude race condition when + // main transaction is finalized between `finalizeWithError` restore and adding new block. + require.NoError(t, bc.AddBlock(bc.newBlock())) + mtx.RLock() + start := len(completedTxes) + mtx.RUnlock() + setFinalizeWithError(false) for i := range requests { - require.NoError(t, bc.AddBlock(bc.newBlock())) + if i != 0 { + require.NoError(t, bc.AddBlock(bc.newBlock())) + } + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return len(completedTxes)-start >= i+1 + }, time.Second*3, time.Millisecond) checkSigTx(t, requests, len(requests), false) checkFallbackTxs(t, requests[:i+1], true) checkFallbackTxs(t, requests[i+1:], false) @@ -539,7 +585,7 @@ func TestNotary(t *testing.T) { // OnRequestRemoval: missing account // check OnNewRequest with finalization error - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteStandardRequest(t, 4, false) checkFallbackTxs(t, requests, false) // make fallbacks valid and remove one fallback @@ -548,7 +594,7 @@ func TestNotary(t *testing.T) { ntr1.UpdateNotaryNodes(keys.PublicKeys{randomAcc.PublicKey()}) ntr1.OnRequestRemoval(requests[3]) // non of the fallbacks should be completed - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), false) checkFallbackTxs(t, requests, false) @@ -557,7 +603,7 @@ func TestNotary(t *testing.T) { // OnRequestRemoval: signature request, remove one fallback // check OnNewRequest with finalization error - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteStandardRequest(t, 4, false) checkFallbackTxs(t, requests, false) // make fallbacks valid and remove one fallback @@ -566,14 +612,14 @@ func TestNotary(t *testing.T) { unlucky := requests[3] ntr1.OnRequestRemoval(unlucky) // rest of the fallbacks should be completed - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), false) checkFallbackTxs(t, requests[:3], true) require.Nil(t, completedTxes[unlucky.FallbackTransaction.Hash()]) // OnRequestRemoval: signature request, remove all fallbacks - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteStandardRequest(t, 4, false) // remove all fallbacks _, err = bc.genBlocks(int(nvbDiffFallback)) @@ -582,7 +628,7 @@ func TestNotary(t *testing.T) { ntr1.OnRequestRemoval(requests[i]) } // then the whole request should be removed, i.e. there are no completed transactions - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkSigTx(t, requests, len(requests), false) checkFallbackTxs(t, requests, false) @@ -596,7 +642,7 @@ func TestNotary(t *testing.T) { // OnRequestRemoval: multisignature request, remove one fallback nSigs, nKeys = 3, 5 // check OnNewRequest with finalization error - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false) checkMultisigTx(t, nSigs, requests, len(requests), false) checkFallbackTxs(t, requests, false) @@ -606,7 +652,7 @@ func TestNotary(t *testing.T) { unlucky = requests[nSigs-1] ntr1.OnRequestRemoval(unlucky) // then (m-1) out of n fallbacks should be completed - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkMultisigTx(t, nSigs, requests, len(requests), false) checkFallbackTxs(t, requests[:nSigs-1], true) @@ -615,7 +661,7 @@ func TestNotary(t *testing.T) { checkFallbackTxs(t, requests[nSigs:], true) // OnRequestRemoval: multisignature request, remove all fallbacks - finalizeWithError = true + setFinalizeWithError(true) requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false) // make fallbacks valid and then remove all of them _, err = bc.genBlocks(int(nvbDiffFallback)) @@ -624,7 +670,7 @@ func TestNotary(t *testing.T) { ntr1.OnRequestRemoval(requests[i]) } // then the whole request should be removed, i.e. there are no completed transactions - finalizeWithError = false + setFinalizeWithError(false) require.NoError(t, bc.AddBlock(bc.newBlock())) checkMultisigTx(t, nSigs, requests, len(requests), false) checkFallbackTxs(t, requests, false) @@ -636,13 +682,7 @@ func TestNotary(t *testing.T) { checkFallbackTxs(t, requests, false) // Subscriptions test - mp1.RunSubscriptions() - go ntr1.Run() - t.Cleanup(func() { - ntr1.Stop() - mp1.StopSubscriptions() - }) - finalizeWithError = false + setFinalizeWithError(false) requester1, _ := wallet.NewAccount() requester2, _ := wallet.NewAccount() amount := int64(100_0000_0000) diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 2e5c35be6..702c178fb 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -35,6 +35,9 @@ type ( // onTransaction is a callback for completed transactions (mains or fallbacks) sending. onTransaction func(tx *transaction.Transaction) error + // newTxs is a channel where new transactions are sent + // to be processed in a `onTransaction` callback. + newTxs chan txHashPair // reqMtx protects requests list. reqMtx sync.RWMutex @@ -62,6 +65,8 @@ type ( } ) +const defaultTxChannelCapacity = 100 + // request represents Notary service request. type request struct { typ RequestType @@ -109,6 +114,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu Network: net, wallet: wallet, onTransaction: onTransaction, + newTxs: make(chan txHashPair, defaultTxChannelCapacity), mp: mp, reqCh: make(chan mempoolevent.Event), blocksCh: make(chan *block.Block), @@ -121,6 +127,7 @@ func (n *Notary) Run() { n.Config.Log.Info("starting notary service") n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) + go n.newTxCallbackLoop() for { select { case <-n.stopCh: @@ -236,10 +243,8 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) { } } if r.typ != Unknown && r.nSigsCollected == nSigs && r.minNotValidBefore > n.Config.Chain.BlockHeight() { - if err := n.finalize(r.main); err != nil { + if err := n.finalize(r.main, payload.MainTransaction.Hash()); err != nil { n.Config.Log.Error("failed to finalize main transaction", zap.Error(err)) - } else { - r.isSent = true } } } @@ -280,35 +285,24 @@ func (n *Notary) PostPersist() { currHeight := n.Config.Chain.BlockHeight() for h, r := range n.requests { if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > currHeight { - if err := n.finalize(r.main); err != nil { + if err := n.finalize(r.main, h); err != nil { n.Config.Log.Error("failed to finalize main transaction", zap.Error(err)) - } else { - r.isSent = true } continue } if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent. - newFallbacks := r.fallbacks[:0] for _, fb := range r.fallbacks { if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= currHeight { - if err := n.finalize(fb); err != nil { - newFallbacks = append(newFallbacks, fb) // wait for the next block to resend them - } - } else { - newFallbacks = append(newFallbacks, fb) + // Ignore the error, wait for the next block to resend them + _ = n.finalize(fb, h) } } - if len(newFallbacks) == 0 { - delete(n.requests, h) - } else { - r.fallbacks = newFallbacks - } } } } // finalize adds missing Notary witnesses to the transaction (main or fallback) and pushes it to the network. -func (n *Notary) finalize(tx *transaction.Transaction) error { +func (n *Notary) finalize(tx *transaction.Transaction, h util.Uint256) error { acc := n.getAccount() if acc == nil { panic(errors.New("no available Notary account")) // unreachable code, because all callers of `finalize` check that acc != nil @@ -328,7 +322,76 @@ func (n *Notary) finalize(tx *transaction.Transaction) error { return fmt.Errorf("failed to update completed transaction's size: %w", err) } - return n.onTransaction(newTx) + n.pushNewTx(newTx, h) + + return nil +} + +type txHashPair struct { + tx *transaction.Transaction + mainHash util.Uint256 +} + +func (n *Notary) pushNewTx(tx *transaction.Transaction, h util.Uint256) { + select { + case n.newTxs <- txHashPair{tx, h}: + default: + } +} + +func (n *Notary) newTxCallbackLoop() { + for { + select { + case tx := <-n.newTxs: + isMain := tx.tx.Hash() == tx.mainHash + + n.reqMtx.Lock() + r, ok := n.requests[tx.mainHash] + if !ok || isMain && (r.isSent || r.minNotValidBefore <= n.Config.Chain.BlockHeight()) { + n.reqMtx.Unlock() + continue + } + if !isMain { + // Ensure that fallback was not already completed. + var isPending bool + for _, fb := range r.fallbacks { + if fb.Hash() == tx.tx.Hash() { + isPending = true + break + } + } + if !isPending { + n.reqMtx.Unlock() + continue + } + } + + n.reqMtx.Unlock() + err := n.onTransaction(tx.tx) + if err != nil { + n.Config.Log.Error("new transaction callback finished with error", zap.Error(err)) + continue + } + + n.reqMtx.Lock() + if isMain { + r.isSent = true + } else { + for i := range r.fallbacks { + if r.fallbacks[i].Hash() == tx.tx.Hash() { + r.fallbacks = append(r.fallbacks[:i], r.fallbacks[i+1:]...) + break + } + } + if len(r.fallbacks) == 0 { + delete(n.requests, tx.mainHash) + } + } + n.reqMtx.Unlock() + case <-n.stopCh: + return + } + } } // updateTxSize returns transaction with re-calculated size and an error.