mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2024-11-22 19:29:39 +00:00
notary: process new transactions in a separate goroutine
Related #2063. Signed-off-by: Evgeniy Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
520133aee2
commit
3507f52c32
2 changed files with 164 additions and 61 deletions
|
@ -70,6 +70,16 @@ func TestNotary(t *testing.T) {
|
|||
finalizeWithError bool
|
||||
choosy bool
|
||||
)
|
||||
setFinalizeWithError := func(v bool) {
|
||||
mtx.Lock()
|
||||
finalizeWithError = v
|
||||
mtx.Unlock()
|
||||
}
|
||||
setChoosy := func(v bool) {
|
||||
mtx.Lock()
|
||||
choosy = v
|
||||
mtx.Unlock()
|
||||
}
|
||||
onTransaction := func(tx *transaction.Transaction) error {
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
|
@ -91,6 +101,22 @@ func TestNotary(t *testing.T) {
|
|||
completedTxes[tx.Hash()] = tx
|
||||
return nil
|
||||
}
|
||||
getCompletedTx := func(t *testing.T, waitForNonNil bool, h util.Uint256) *transaction.Transaction {
|
||||
if !waitForNonNil {
|
||||
mtx.RLock()
|
||||
defer mtx.RUnlock()
|
||||
return completedTxes[h]
|
||||
}
|
||||
|
||||
var completedTx *transaction.Transaction
|
||||
require.Eventually(t, func() bool {
|
||||
mtx.RLock()
|
||||
defer mtx.RUnlock()
|
||||
completedTx = completedTxes[h]
|
||||
return completedTx != nil
|
||||
}, time.Second*3, time.Millisecond*50, errors.New("main transaction expected to be completed"))
|
||||
return completedTx
|
||||
}
|
||||
|
||||
acc1, ntr1, mp1 := getTestNotary(t, bc, "./testdata/notary1.json", "one", onTransaction)
|
||||
acc2, _, _ := getTestNotary(t, bc, "./testdata/notary2.json", "two", onTransaction)
|
||||
|
@ -102,6 +128,13 @@ func TestNotary(t *testing.T) {
|
|||
ntr1.PostPersist()
|
||||
})
|
||||
|
||||
mp1.RunSubscriptions()
|
||||
go ntr1.Run()
|
||||
t.Cleanup(func() {
|
||||
ntr1.Stop()
|
||||
mp1.StopSubscriptions()
|
||||
})
|
||||
|
||||
notaryNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()}
|
||||
bc.setNodesByRole(t, true, noderoles.P2PNotary, notaryNodes)
|
||||
|
||||
|
@ -254,9 +287,8 @@ func TestNotary(t *testing.T) {
|
|||
}
|
||||
checkSigTx := func(t *testing.T, requests []*payload.P2PNotaryRequest, sentCount int, shouldComplete bool) {
|
||||
nKeys := len(requests)
|
||||
completedTx := completedTxes[requests[0].MainTransaction.Hash()]
|
||||
if sentCount == nKeys && shouldComplete {
|
||||
require.NotNil(t, completedTx, errors.New("main transaction expected to be completed"))
|
||||
completedTx := getCompletedTx(t, true, requests[0].MainTransaction.Hash())
|
||||
require.Equal(t, nKeys+1, len(completedTx.Signers))
|
||||
require.Equal(t, nKeys+1, len(completedTx.Scripts))
|
||||
|
||||
|
@ -274,13 +306,13 @@ func TestNotary(t *testing.T) {
|
|||
VerificationScript: []byte{},
|
||||
}, completedTx.Scripts[nKeys])
|
||||
} else {
|
||||
completedTx := getCompletedTx(t, false, requests[0].MainTransaction.Hash())
|
||||
require.Nil(t, completedTx, fmt.Errorf("main transaction shouldn't be completed: sent %d out of %d requests", sentCount, nKeys))
|
||||
}
|
||||
}
|
||||
checkMultisigTx := func(t *testing.T, nSigs int, requests []*payload.P2PNotaryRequest, sentCount int, shouldComplete bool) {
|
||||
completedTx := completedTxes[requests[0].MainTransaction.Hash()]
|
||||
if sentCount >= nSigs && shouldComplete {
|
||||
require.NotNil(t, completedTx, errors.New("main transaction expected to be completed"))
|
||||
completedTx := getCompletedTx(t, true, requests[0].MainTransaction.Hash())
|
||||
require.Equal(t, 2, len(completedTx.Signers))
|
||||
require.Equal(t, 2, len(completedTx.Scripts))
|
||||
interopCtx := bc.newInteropContext(trigger.Verification, bc.dao, nil, completedTx)
|
||||
|
@ -299,14 +331,14 @@ func TestNotary(t *testing.T) {
|
|||
require.False(t, bytes.Contains(completedTx.Scripts[0].InvocationScript, req.MainTransaction.Scripts[0].InvocationScript), fmt.Errorf("signature from extra request #%d shouldn't be presented in the main tx", i))
|
||||
}
|
||||
} else {
|
||||
completedTx := getCompletedTx(t, false, requests[0].MainTransaction.Hash())
|
||||
require.Nil(t, completedTx, fmt.Errorf("main transaction shouldn't be completed: sent %d out of %d requests", sentCount, nSigs))
|
||||
}
|
||||
}
|
||||
checkFallbackTxs := func(t *testing.T, requests []*payload.P2PNotaryRequest, shouldComplete bool) {
|
||||
for i, req := range requests {
|
||||
completedTx := completedTxes[req.FallbackTransaction.Hash()]
|
||||
if shouldComplete {
|
||||
require.NotNil(t, completedTx, fmt.Errorf("fallback transaction for request #%d expected to be completed", i))
|
||||
completedTx := getCompletedTx(t, true, req.FallbackTransaction.Hash())
|
||||
require.Equal(t, 2, len(completedTx.Signers))
|
||||
require.Equal(t, 2, len(completedTx.Scripts))
|
||||
require.Equal(t, transaction.Witness{
|
||||
|
@ -321,6 +353,7 @@ func TestNotary(t *testing.T) {
|
|||
_, err := bc.verifyHashAgainstScript(completedTx.Signers[1].Account, &completedTx.Scripts[1], interopCtx, -1)
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
completedTx := getCompletedTx(t, false, req.FallbackTransaction.Hash())
|
||||
require.Nil(t, completedTx, fmt.Errorf("fallback transaction for request #%d shouldn't be completed", i))
|
||||
}
|
||||
}
|
||||
|
@ -419,11 +452,11 @@ func TestNotary(t *testing.T) {
|
|||
checkFallbackTxs(t, r, false)
|
||||
|
||||
// PostPersist: missing account
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
r = checkCompleteStandardRequest(t, 1, false)
|
||||
checkFallbackTxs(t, r, false)
|
||||
finalizeWithError = false
|
||||
ntr1.UpdateNotaryNodes(keys.PublicKeys{randomAcc.PublicKey()})
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, r, 1, false)
|
||||
checkFallbackTxs(t, r, false)
|
||||
|
@ -431,34 +464,34 @@ func TestNotary(t *testing.T) {
|
|||
ntr1.UpdateNotaryNodes(keys.PublicKeys{acc1.PrivateKey().PublicKey()})
|
||||
|
||||
// PostPersist: complete main transaction, signature request
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests := checkCompleteStandardRequest(t, 3, false)
|
||||
// check PostPersist with finalisation error
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
// check PostPersist without finalisation error
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), true)
|
||||
|
||||
// PostPersist: complete main transaction, multisignature account
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteMultisigRequest(t, 3, 4, false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// check PostPersist with finalisation error
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkMultisigTx(t, 3, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// check PostPersist without finalisation error
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkMultisigTx(t, 3, requests, len(requests), true)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
|
||||
// PostPersist: complete fallback, signature request
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteStandardRequest(t, 3, false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// make fallbacks valid
|
||||
|
@ -469,7 +502,7 @@ func TestNotary(t *testing.T) {
|
|||
checkSigTx(t, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// check PostPersist for valid fallbacks without finalisation error
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, true)
|
||||
|
@ -477,7 +510,7 @@ func TestNotary(t *testing.T) {
|
|||
// PostPersist: complete fallback, multisignature request
|
||||
nSigs, nKeys := 3, 5
|
||||
// check OnNewRequest with finalization error
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// make fallbacks valid
|
||||
|
@ -488,7 +521,7 @@ func TestNotary(t *testing.T) {
|
|||
checkMultisigTx(t, nSigs, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// check PostPersist for valid fallbacks without finalisation error
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkMultisigTx(t, nSigs, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests[:nSigs], true)
|
||||
|
@ -496,7 +529,7 @@ func TestNotary(t *testing.T) {
|
|||
checkFallbackTxs(t, requests[nSigs:], true)
|
||||
|
||||
// PostPersist: partial fallbacks completion due to finalisation errors
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteStandardRequest(t, 5, false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// make fallbacks valid
|
||||
|
@ -505,15 +538,15 @@ func TestNotary(t *testing.T) {
|
|||
// some of fallbacks should fail finalisation
|
||||
unluckies = []*payload.P2PNotaryRequest{requests[0], requests[4]}
|
||||
lucky := requests[1:4]
|
||||
choosy = true
|
||||
setChoosy(true)
|
||||
// check PostPersist for lucky fallbacks
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
checkFallbackTxs(t, lucky, true)
|
||||
checkFallbackTxs(t, unluckies, false)
|
||||
// reset finalisation function for unlucky fallbacks to finalise without an error
|
||||
choosy = false
|
||||
finalizeWithError = false
|
||||
setChoosy(false)
|
||||
setFinalizeWithError(false)
|
||||
// check PostPersist for unlucky fallbacks
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
|
@ -522,16 +555,29 @@ func TestNotary(t *testing.T) {
|
|||
|
||||
// PostPersist: different NVBs
|
||||
// check OnNewRequest with finalization error and different NVBs
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteStandardRequest(t, 5, false, 1, 2, 3, 4, 5)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// generate blocks to reach the most earlier fallback's NVB
|
||||
_, err = bc.genBlocks(int(nvbDiffFallback))
|
||||
require.NoError(t, err)
|
||||
// check PostPersist for valid fallbacks without finalisation error
|
||||
finalizeWithError = false
|
||||
// Add block before allowing tx to finalize to exclude race condition when
|
||||
// main transaction is finalized between `finalizeWithError` restore and adding new block.
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
mtx.RLock()
|
||||
start := len(completedTxes)
|
||||
mtx.RUnlock()
|
||||
setFinalizeWithError(false)
|
||||
for i := range requests {
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
if i != 0 {
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
}
|
||||
require.Eventually(t, func() bool {
|
||||
mtx.RLock()
|
||||
defer mtx.RUnlock()
|
||||
return len(completedTxes)-start >= i+1
|
||||
}, time.Second*3, time.Millisecond)
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests[:i+1], true)
|
||||
checkFallbackTxs(t, requests[i+1:], false)
|
||||
|
@ -539,7 +585,7 @@ func TestNotary(t *testing.T) {
|
|||
|
||||
// OnRequestRemoval: missing account
|
||||
// check OnNewRequest with finalization error
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteStandardRequest(t, 4, false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// make fallbacks valid and remove one fallback
|
||||
|
@ -548,7 +594,7 @@ func TestNotary(t *testing.T) {
|
|||
ntr1.UpdateNotaryNodes(keys.PublicKeys{randomAcc.PublicKey()})
|
||||
ntr1.OnRequestRemoval(requests[3])
|
||||
// non of the fallbacks should be completed
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
|
@ -557,7 +603,7 @@ func TestNotary(t *testing.T) {
|
|||
|
||||
// OnRequestRemoval: signature request, remove one fallback
|
||||
// check OnNewRequest with finalization error
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteStandardRequest(t, 4, false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
// make fallbacks valid and remove one fallback
|
||||
|
@ -566,14 +612,14 @@ func TestNotary(t *testing.T) {
|
|||
unlucky := requests[3]
|
||||
ntr1.OnRequestRemoval(unlucky)
|
||||
// rest of the fallbacks should be completed
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests[:3], true)
|
||||
require.Nil(t, completedTxes[unlucky.FallbackTransaction.Hash()])
|
||||
|
||||
// OnRequestRemoval: signature request, remove all fallbacks
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteStandardRequest(t, 4, false)
|
||||
// remove all fallbacks
|
||||
_, err = bc.genBlocks(int(nvbDiffFallback))
|
||||
|
@ -582,7 +628,7 @@ func TestNotary(t *testing.T) {
|
|||
ntr1.OnRequestRemoval(requests[i])
|
||||
}
|
||||
// then the whole request should be removed, i.e. there are no completed transactions
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkSigTx(t, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
|
@ -596,7 +642,7 @@ func TestNotary(t *testing.T) {
|
|||
// OnRequestRemoval: multisignature request, remove one fallback
|
||||
nSigs, nKeys = 3, 5
|
||||
// check OnNewRequest with finalization error
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false)
|
||||
checkMultisigTx(t, nSigs, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
|
@ -606,7 +652,7 @@ func TestNotary(t *testing.T) {
|
|||
unlucky = requests[nSigs-1]
|
||||
ntr1.OnRequestRemoval(unlucky)
|
||||
// then (m-1) out of n fallbacks should be completed
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkMultisigTx(t, nSigs, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests[:nSigs-1], true)
|
||||
|
@ -615,7 +661,7 @@ func TestNotary(t *testing.T) {
|
|||
checkFallbackTxs(t, requests[nSigs:], true)
|
||||
|
||||
// OnRequestRemoval: multisignature request, remove all fallbacks
|
||||
finalizeWithError = true
|
||||
setFinalizeWithError(true)
|
||||
requests = checkCompleteMultisigRequest(t, nSigs, nKeys, false)
|
||||
// make fallbacks valid and then remove all of them
|
||||
_, err = bc.genBlocks(int(nvbDiffFallback))
|
||||
|
@ -624,7 +670,7 @@ func TestNotary(t *testing.T) {
|
|||
ntr1.OnRequestRemoval(requests[i])
|
||||
}
|
||||
// then the whole request should be removed, i.e. there are no completed transactions
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||
checkMultisigTx(t, nSigs, requests, len(requests), false)
|
||||
checkFallbackTxs(t, requests, false)
|
||||
|
@ -636,13 +682,7 @@ func TestNotary(t *testing.T) {
|
|||
checkFallbackTxs(t, requests, false)
|
||||
|
||||
// Subscriptions test
|
||||
mp1.RunSubscriptions()
|
||||
go ntr1.Run()
|
||||
t.Cleanup(func() {
|
||||
ntr1.Stop()
|
||||
mp1.StopSubscriptions()
|
||||
})
|
||||
finalizeWithError = false
|
||||
setFinalizeWithError(false)
|
||||
requester1, _ := wallet.NewAccount()
|
||||
requester2, _ := wallet.NewAccount()
|
||||
amount := int64(100_0000_0000)
|
||||
|
|
|
@ -35,6 +35,9 @@ type (
|
|||
|
||||
// onTransaction is a callback for completed transactions (mains or fallbacks) sending.
|
||||
onTransaction func(tx *transaction.Transaction) error
|
||||
// newTxs is a channel where new transactions are sent
|
||||
// to be processed in a `onTransaction` callback.
|
||||
newTxs chan txHashPair
|
||||
|
||||
// reqMtx protects requests list.
|
||||
reqMtx sync.RWMutex
|
||||
|
@ -62,6 +65,8 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
const defaultTxChannelCapacity = 100
|
||||
|
||||
// request represents Notary service request.
|
||||
type request struct {
|
||||
typ RequestType
|
||||
|
@ -109,6 +114,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
|
|||
Network: net,
|
||||
wallet: wallet,
|
||||
onTransaction: onTransaction,
|
||||
newTxs: make(chan txHashPair, defaultTxChannelCapacity),
|
||||
mp: mp,
|
||||
reqCh: make(chan mempoolevent.Event),
|
||||
blocksCh: make(chan *block.Block),
|
||||
|
@ -121,6 +127,7 @@ func (n *Notary) Run() {
|
|||
n.Config.Log.Info("starting notary service")
|
||||
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
|
||||
n.mp.SubscribeForTransactions(n.reqCh)
|
||||
go n.newTxCallbackLoop()
|
||||
for {
|
||||
select {
|
||||
case <-n.stopCh:
|
||||
|
@ -236,10 +243,8 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
|
|||
}
|
||||
}
|
||||
if r.typ != Unknown && r.nSigsCollected == nSigs && r.minNotValidBefore > n.Config.Chain.BlockHeight() {
|
||||
if err := n.finalize(r.main); err != nil {
|
||||
if err := n.finalize(r.main, payload.MainTransaction.Hash()); err != nil {
|
||||
n.Config.Log.Error("failed to finalize main transaction", zap.Error(err))
|
||||
} else {
|
||||
r.isSent = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -280,35 +285,24 @@ func (n *Notary) PostPersist() {
|
|||
currHeight := n.Config.Chain.BlockHeight()
|
||||
for h, r := range n.requests {
|
||||
if !r.isSent && r.typ != Unknown && r.nSigs == r.nSigsCollected && r.minNotValidBefore > currHeight {
|
||||
if err := n.finalize(r.main); err != nil {
|
||||
if err := n.finalize(r.main, h); err != nil {
|
||||
n.Config.Log.Error("failed to finalize main transaction", zap.Error(err))
|
||||
} else {
|
||||
r.isSent = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent.
|
||||
newFallbacks := r.fallbacks[:0]
|
||||
for _, fb := range r.fallbacks {
|
||||
if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= currHeight {
|
||||
if err := n.finalize(fb); err != nil {
|
||||
newFallbacks = append(newFallbacks, fb) // wait for the next block to resend them
|
||||
}
|
||||
} else {
|
||||
newFallbacks = append(newFallbacks, fb)
|
||||
// Ignore the error, wait for the next block to resend them
|
||||
_ = n.finalize(fb, h)
|
||||
}
|
||||
}
|
||||
if len(newFallbacks) == 0 {
|
||||
delete(n.requests, h)
|
||||
} else {
|
||||
r.fallbacks = newFallbacks
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// finalize adds missing Notary witnesses to the transaction (main or fallback) and pushes it to the network.
|
||||
func (n *Notary) finalize(tx *transaction.Transaction) error {
|
||||
func (n *Notary) finalize(tx *transaction.Transaction, h util.Uint256) error {
|
||||
acc := n.getAccount()
|
||||
if acc == nil {
|
||||
panic(errors.New("no available Notary account")) // unreachable code, because all callers of `finalize` check that acc != nil
|
||||
|
@ -328,7 +322,76 @@ func (n *Notary) finalize(tx *transaction.Transaction) error {
|
|||
return fmt.Errorf("failed to update completed transaction's size: %w", err)
|
||||
}
|
||||
|
||||
return n.onTransaction(newTx)
|
||||
n.pushNewTx(newTx, h)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type txHashPair struct {
|
||||
tx *transaction.Transaction
|
||||
mainHash util.Uint256
|
||||
}
|
||||
|
||||
func (n *Notary) pushNewTx(tx *transaction.Transaction, h util.Uint256) {
|
||||
select {
|
||||
case n.newTxs <- txHashPair{tx, h}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notary) newTxCallbackLoop() {
|
||||
for {
|
||||
select {
|
||||
case tx := <-n.newTxs:
|
||||
isMain := tx.tx.Hash() == tx.mainHash
|
||||
|
||||
n.reqMtx.Lock()
|
||||
r, ok := n.requests[tx.mainHash]
|
||||
if !ok || isMain && (r.isSent || r.minNotValidBefore <= n.Config.Chain.BlockHeight()) {
|
||||
n.reqMtx.Unlock()
|
||||
continue
|
||||
}
|
||||
if !isMain {
|
||||
// Ensure that fallback was not already completed.
|
||||
var isPending bool
|
||||
for _, fb := range r.fallbacks {
|
||||
if fb.Hash() == tx.tx.Hash() {
|
||||
isPending = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isPending {
|
||||
n.reqMtx.Unlock()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
n.reqMtx.Unlock()
|
||||
err := n.onTransaction(tx.tx)
|
||||
if err != nil {
|
||||
n.Config.Log.Error("new transaction callback finished with error", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
n.reqMtx.Lock()
|
||||
if isMain {
|
||||
r.isSent = true
|
||||
} else {
|
||||
for i := range r.fallbacks {
|
||||
if r.fallbacks[i].Hash() == tx.tx.Hash() {
|
||||
r.fallbacks = append(r.fallbacks[:i], r.fallbacks[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(r.fallbacks) == 0 {
|
||||
delete(n.requests, tx.mainHash)
|
||||
}
|
||||
}
|
||||
n.reqMtx.Unlock()
|
||||
case <-n.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateTxSize returns transaction with re-calculated size and an error.
|
||||
|
|
Loading…
Reference in a new issue