Merge pull request #2064 from nspcc-dev/fix-remove-stale-hang

mempool: send events in a separate goroutine
This commit is contained in:
Roman Khimov 2021-07-23 18:16:14 +03:00 committed by GitHub
commit 50d99464e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 168 additions and 67 deletions

View file

@ -70,6 +70,16 @@ func TestNotary(t *testing.T) {
finalizeWithError bool
choosy bool
)
setFinalizeWithError := func(v bool) {
mtx.Lock()
finalizeWithError = v
mtx.Unlock()
}
setChoosy := func(v bool) {
mtx.Lock()
choosy = v
mtx.Unlock()
}
onTransaction := func(tx *transaction.Transaction) error {
mtx.Lock()
defer mtx.Unlock()
@ -91,6 +101,22 @@ func TestNotary(t *testing.T) {
completedTxes[tx.Hash()] = tx
return nil
}
getCompletedTx := func(t *testing.T, waitForNonNil bool, h util.Uint256) *transaction.Transaction {
if !waitForNonNil {
mtx.RLock()
defer mtx.RUnlock()
return completedTxes[h]
}
var completedTx *transaction.Transaction
require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
completedTx = completedTxes[h]
return completedTx != nil
}, time.Second*3, time.Millisecond*50, errors.New("main transaction expected to be completed"))
return completedTx
}
acc1, ntr1, mp1 := getTestNotary(t, bc, "./testdata/notary1.json", "one", onTransaction)
acc2, _, _ := getTestNotary(t, bc, "./testdata/notary2.json", "two", onTransaction)
@ -102,6 +128,13 @@ func TestNotary(t *testing.T) {
ntr1.PostPersist()
})
mp1.RunSubscriptions()
go ntr1.Run()
t.Cleanup(func() {
ntr1.Stop()
mp1.StopSubscriptions()
})
notaryNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()}
bc.setNodesByRole(t, true, noderoles.P2PNotary, notaryNodes)
@ -254,9 +287,8 @@ func TestNotary(t *testing.T) {
}
checkSigTx := func(t *testing.T, requests []*payload.P2PNotaryRequest, sentCount int, shouldComplete bool) {
nKeys := len(requests)
completedTx := completedTxes[requests[0].MainTransaction.Hash()]
if sentCount == nKeys && shouldComplete {
require.NotNil(t, completedTx, errors.New("main transaction expected to be completed"))
completedTx := getCompletedTx(t, true, requests[0].MainTransaction.Hash())
require.Equal(t, nKeys+1, len(completedTx.Signers))
require.Equal(t, nKeys+1, len(completedTx.Scripts))
@ -274,13 +306,13 @@ func TestNotary(t *testing.T) {
VerificationScript: []byte{},
}, completedTx.Scripts[nKeys])
} else {
completedTx := getCompletedTx(t, false, requests[0].MainTransaction.Hash())
require.Nil(t, completedTx, fmt.Errorf("main transaction shouldn't be completed: sent %d out of %d requests", sentCount, nKeys))
}
}
checkMultisigTx := func(t *testing.T, nSigs int, requests []*payload.P2PNotaryRequest, sentCount int, shouldComplete bool) {
completedTx := completedTxes[requests[0].MainTransaction.Hash()]
if sentCount >= nSigs && shouldComplete {
require.NotNil(t, completedTx, errors.New("main transaction expected to be completed"))
completedTx := getCompletedTx(t, true, requests[0].MainTransaction.Hash())
require.Equal(t, 2, len(completedTx.Signers))
require.Equal(t, 2, len(completedTx.Scripts))
interopCtx := bc.newInteropContext(trigger.Verification, bc.dao, nil, completedTx)
@ -299,14 +331,14 @@ func TestNotary(t *testing.T) {
require.False(t, bytes.Contains(completedTx.Scripts[0].InvocationScript, req.MainTransaction.Scripts[0].InvocationScript), fmt.Errorf("signature from extra request #%d shouldn't be presented in the main tx", i))
}
} else {
completedTx := getCompletedTx(t, false, requests[0].MainTransaction.Hash())
require.Nil(t, completedTx, fmt.Errorf("main transaction shouldn't be completed: sent %d out of %d requests", sentCount, nSigs))
}
}
checkFallbackTxs := func(t *testing.T, requests []*payload.P2PNotaryRequest, shouldComplete bool) {
for i, req := range requests {
completedTx := completedTxes[req.FallbackTransaction.Hash()]
if shouldComplete {
require.NotNil(t, completedTx, fmt.Errorf("fallback transaction for request #%d expected to be completed", i))
completedTx := getCompletedTx(t, true, req.FallbackTransaction.Hash())
require.Equal(t, 2, len(completedTx.Signers))
require.Equal(t, 2, len(completedTx.Scripts))
require.Equal(t, transaction.Witness{
@ -321,6 +353,7 @@ func TestNotary(t *testing.T) {
_, err := bc.verifyHashAgainstScript(completedTx.Signers[1].Account, &completedTx.Scripts[1], interopCtx, -1)
require.NoError(t, err)
} else {
completedTx := getCompletedTx(t, false, req.FallbackTransaction.Hash())
require.Nil(t, completedTx, fmt.Errorf("fallback transaction for request #%d shouldn't be completed", i))
}
}
@ -419,11 +452,11 @@ func TestNotary(t *testing.T) {
checkFallbackTxs(t, r, false)
// PostPersist: missing account
finalizeWithError = true
setFinalizeWithError(true)
r = checkCompleteStandardRequest(t, 1, false)
checkFallbackTxs(t, r, false)
finalizeWithError = false
ntr1.UpdateNotaryNodes(keys.PublicKeys{randomAcc.PublicKey()})
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, r, 1, false)
checkFallbackTxs(t, r, false)
@ -431,34 +464,34 @@ func TestNotary(t *testing.T) {
ntr1.UpdateNotaryNodes(keys.PublicKeys{acc1.PrivateKey().PublicKey()})
// PostPersist: complete main transaction, signature request
finalizeWithError = true
setFinalizeWithError(true)
requests := checkCompleteStandardRequest(t, 3, false)
// check PostPersist with finalisation error
finalizeWithError = true
setFinalizeWithError(true)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), false)
// check PostPersist without finalisation error
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), true)
// PostPersist: complete main transaction, multisignature account
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteMultisigRequest(t, 3, 4, false)
checkFallbackTxs(t, requests, false)
// check PostPersist with finalisation error
finalizeWithError = true
setFinalizeWithError(true)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkMultisigTx(t, 3, requests, len(requests), false)
checkFallbackTxs(t, requests, false)
// check PostPersist without finalisation error
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkMultisigTx(t, 3, requests, len(requests), true)
checkFallbackTxs(t, requests, false)
// PostPersist: complete fallback, signature request
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteStandardRequest(t, 3, false)
checkFallbackTxs(t, requests, false)
// make fallbacks valid
@ -469,7 +502,7 @@ func TestNotary(t *testing.T) {
checkSigTx(t, requests, len(requests), false)
checkFallbackTxs(t, requests, false)
// check PostPersist for valid fallbacks without finalisation error
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), false)
checkFallbackTxs(t, requests, true)
@ -477,7 +510,7 @@ func TestNotary(t *testing.T) {
// PostPersist: complete fallback, multisignature request
nSigs, nKeys := 3, 5
// check OnNewRequest with finalization error
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false)
checkFallbackTxs(t, requests, false)
// make fallbacks valid
@ -488,7 +521,7 @@ func TestNotary(t *testing.T) {
checkMultisigTx(t, nSigs, requests, len(requests), false)
checkFallbackTxs(t, requests, false)
// check PostPersist for valid fallbacks without finalisation error
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkMultisigTx(t, nSigs, requests, len(requests), false)
checkFallbackTxs(t, requests[:nSigs], true)
@ -496,7 +529,7 @@ func TestNotary(t *testing.T) {
checkFallbackTxs(t, requests[nSigs:], true)
// PostPersist: partial fallbacks completion due to finalisation errors
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteStandardRequest(t, 5, false)
checkFallbackTxs(t, requests, false)
// make fallbacks valid
@ -505,15 +538,15 @@ func TestNotary(t *testing.T) {
// some of fallbacks should fail finalisation
unluckies = []*payload.P2PNotaryRequest{requests[0], requests[4]}
lucky := requests[1:4]
choosy = true
setChoosy(true)
// check PostPersist for lucky fallbacks
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), false)
checkFallbackTxs(t, lucky, true)
checkFallbackTxs(t, unluckies, false)
// reset finalisation function for unlucky fallbacks to finalise without an error
choosy = false
finalizeWithError = false
setChoosy(false)
setFinalizeWithError(false)
// check PostPersist for unlucky fallbacks
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), false)
@ -522,16 +555,29 @@ func TestNotary(t *testing.T) {
// PostPersist: different NVBs
// check OnNewRequest with finalization error and different NVBs
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteStandardRequest(t, 5, false, 1, 2, 3, 4, 5)
checkFallbackTxs(t, requests, false)
// generate blocks to reach the most earlier fallback's NVB
_, err = bc.genBlocks(int(nvbDiffFallback))
require.NoError(t, err)
// check PostPersist for valid fallbacks without finalisation error
finalizeWithError = false
for i := range requests {
// Add block before allowing tx to finalize to exclude race condition when
// main transaction is finalized between `finalizeWithError` restore and adding new block.
require.NoError(t, bc.AddBlock(bc.newBlock()))
mtx.RLock()
start := len(completedTxes)
mtx.RUnlock()
setFinalizeWithError(false)
for i := range requests {
if i != 0 {
require.NoError(t, bc.AddBlock(bc.newBlock()))
}
require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return len(completedTxes)-start >= i+1
}, time.Second*3, time.Millisecond)
checkSigTx(t, requests, len(requests), false)
checkFallbackTxs(t, requests[:i+1], true)
checkFallbackTxs(t, requests[i+1:], false)
@ -539,7 +585,7 @@ func TestNotary(t *testing.T) {
// OnRequestRemoval: missing account
// check OnNewRequest with finalization error
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteStandardRequest(t, 4, false)
checkFallbackTxs(t, requests, false)
// make fallbacks valid and remove one fallback
@ -548,7 +594,7 @@ func TestNotary(t *testing.T) {
ntr1.UpdateNotaryNodes(keys.PublicKeys{randomAcc.PublicKey()})
ntr1.OnRequestRemoval(requests[3])
// non of the fallbacks should be completed
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), false)
checkFallbackTxs(t, requests, false)
@ -557,7 +603,7 @@ func TestNotary(t *testing.T) {
// OnRequestRemoval: signature request, remove one fallback
// check OnNewRequest with finalization error
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteStandardRequest(t, 4, false)
checkFallbackTxs(t, requests, false)
// make fallbacks valid and remove one fallback
@ -566,14 +612,14 @@ func TestNotary(t *testing.T) {
unlucky := requests[3]
ntr1.OnRequestRemoval(unlucky)
// rest of the fallbacks should be completed
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), false)
checkFallbackTxs(t, requests[:3], true)
require.Nil(t, completedTxes[unlucky.FallbackTransaction.Hash()])
// OnRequestRemoval: signature request, remove all fallbacks
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteStandardRequest(t, 4, false)
// remove all fallbacks
_, err = bc.genBlocks(int(nvbDiffFallback))
@ -582,7 +628,7 @@ func TestNotary(t *testing.T) {
ntr1.OnRequestRemoval(requests[i])
}
// then the whole request should be removed, i.e. there are no completed transactions
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkSigTx(t, requests, len(requests), false)
checkFallbackTxs(t, requests, false)
@ -596,7 +642,7 @@ func TestNotary(t *testing.T) {
// OnRequestRemoval: multisignature request, remove one fallback
nSigs, nKeys = 3, 5
// check OnNewRequest with finalization error
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false)
checkMultisigTx(t, nSigs, requests, len(requests), false)
checkFallbackTxs(t, requests, false)
@ -606,7 +652,7 @@ func TestNotary(t *testing.T) {
unlucky = requests[nSigs-1]
ntr1.OnRequestRemoval(unlucky)
// then (m-1) out of n fallbacks should be completed
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkMultisigTx(t, nSigs, requests, len(requests), false)
checkFallbackTxs(t, requests[:nSigs-1], true)
@ -615,7 +661,7 @@ func TestNotary(t *testing.T) {
checkFallbackTxs(t, requests[nSigs:], true)
// OnRequestRemoval: multisignature request, remove all fallbacks
finalizeWithError = true
setFinalizeWithError(true)
requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false)
// make fallbacks valid and then remove all of them
_, err = bc.genBlocks(int(nvbDiffFallback))
@ -624,7 +670,7 @@ func TestNotary(t *testing.T) {
ntr1.OnRequestRemoval(requests[i])
}
// then the whole request should be removed, i.e. there are no completed transactions
finalizeWithError = false
setFinalizeWithError(false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
checkMultisigTx(t, nSigs, requests, len(requests), false)
checkFallbackTxs(t, requests, false)
@ -636,13 +682,7 @@ func TestNotary(t *testing.T) {
checkFallbackTxs(t, requests, false)
// Subscriptions test
mp1.RunSubscriptions()
go ntr1.Run()
t.Cleanup(func() {
ntr1.Stop()
mp1.StopSubscriptions()
})
finalizeWithError = false
setFinalizeWithError(false)
requester1, _ := wallet.NewAccount()
requester2, _ := wallet.NewAccount()
amount := int64(100_0000_0000)

View file

@ -35,6 +35,9 @@ type (
// onTransaction is a callback for completed transactions (mains or fallbacks) sending.
onTransaction func(tx *transaction.Transaction) error
// newTxs is a channel where new transactions are sent
// to be processed in a `onTransaction` callback.
newTxs chan txHashPair
// reqMtx protects requests list.
reqMtx sync.RWMutex
@ -62,6 +65,8 @@ type (
}
)
const defaultTxChannelCapacity = 100
// request represents Notary service request.
type request struct {
typ RequestType
@ -109,6 +114,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
Network: net,
wallet: wallet,
onTransaction: onTransaction,
newTxs: make(chan txHashPair, defaultTxChannelCapacity),
mp: mp,
reqCh: make(chan mempoolevent.Event),
blocksCh: make(chan *block.Block),
@ -121,6 +127,7 @@ func (n *Notary) Run() {
n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
go n.newTxCallbackLoop()
for {
select {
case <-n.stopCh:
@ -150,7 +157,8 @@ func (n *Notary) Stop() {
// OnNewRequest is a callback method which is called after new notary request is added to the notary request pool.
func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
if n.getAccount() == nil {
acc := n.getAccount()
if acc == nil {
return
}
@ -236,10 +244,8 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
}
}
if r.typ != Unknown && r.nSigsCollected == nSigs && r.minNotValidBefore > n.Config.Chain.BlockHeight() {
if err := n.finalize(r.main); err != nil {
if err := n.finalize(acc, r.main, payload.MainTransaction.Hash()); err != nil {
n.Config.Log.Error("failed to finalize main transaction", zap.Error(err))
} else {
r.isSent = true
}
}
}
@ -271,7 +277,8 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) {
// PostPersist is a callback which is called after new block event is received.
// PostPersist must not be called under the blockchain lock, because it uses finalization function.
func (n *Notary) PostPersist() {
if n.getAccount() == nil {
acc := n.getAccount()
if acc == nil {
return
}
@ -280,39 +287,24 @@ func (n *Notary) PostPersist() {
currHeight := n.Config.Chain.BlockHeight()
for h, r := range n.requests {
if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > currHeight {
if err := n.finalize(r.main); err != nil {
if err := n.finalize(acc, r.main, h); err != nil {
n.Config.Log.Error("failed to finalize main transaction", zap.Error(err))
} else {
r.isSent = true
}
continue
}
if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent.
newFallbacks := r.fallbacks[:0]
for _, fb := range r.fallbacks {
if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= currHeight {
if err := n.finalize(fb); err != nil {
newFallbacks = append(newFallbacks, fb) // wait for the next block to resend them
// Ignore the error, wait for the next block to resend them
_ = n.finalize(acc, fb, h)
}
} else {
newFallbacks = append(newFallbacks, fb)
}
}
if len(newFallbacks) == 0 {
delete(n.requests, h)
} else {
r.fallbacks = newFallbacks
}
}
}
}
// finalize adds missing Notary witnesses to the transaction (main or fallback) and pushes it to the network.
func (n *Notary) finalize(tx *transaction.Transaction) error {
acc := n.getAccount()
if acc == nil {
panic(errors.New("no available Notary account")) // unreachable code, because all callers of `finalize` check that acc != nil
}
func (n *Notary) finalize(acc *wallet.Account, tx *transaction.Transaction, h util.Uint256) error {
notaryWitness := transaction.Witness{
InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, acc.PrivateKey().SignHashable(uint32(n.Network), tx)...),
VerificationScript: []byte{},
@ -328,7 +320,76 @@ func (n *Notary) finalize(tx *transaction.Transaction) error {
return fmt.Errorf("failed to update completed transaction's size: %w", err)
}
return n.onTransaction(newTx)
n.pushNewTx(newTx, h)
return nil
}
type txHashPair struct {
tx *transaction.Transaction
mainHash util.Uint256
}
func (n *Notary) pushNewTx(tx *transaction.Transaction, h util.Uint256) {
select {
case n.newTxs <- txHashPair{tx, h}:
default:
}
}
func (n *Notary) newTxCallbackLoop() {
for {
select {
case tx := <-n.newTxs:
isMain := tx.tx.Hash() == tx.mainHash
n.reqMtx.Lock()
r, ok := n.requests[tx.mainHash]
if !ok || isMain && (r.isSent || r.minNotValidBefore <= n.Config.Chain.BlockHeight()) {
n.reqMtx.Unlock()
continue
}
if !isMain {
// Ensure that fallback was not already completed.
var isPending bool
for _, fb := range r.fallbacks {
if fb.Hash() == tx.tx.Hash() {
isPending = true
break
}
}
if !isPending {
n.reqMtx.Unlock()
continue
}
}
n.reqMtx.Unlock()
err := n.onTransaction(tx.tx)
if err != nil {
n.Config.Log.Error("new transaction callback finished with error", zap.Error(err))
continue
}
n.reqMtx.Lock()
if isMain {
r.isSent = true
} else {
for i := range r.fallbacks {
if r.fallbacks[i].Hash() == tx.tx.Hash() {
r.fallbacks = append(r.fallbacks[:i], r.fallbacks[i+1:]...)
break
}
}
if len(r.fallbacks) == 0 {
delete(n.requests, tx.mainHash)
}
}
n.reqMtx.Unlock()
case <-n.stopCh:
return
}
}
}
// updateTxSize returns transaction with re-calculated size and an error.