core: add senders' fees monitor to mempool

In order to simplify maintainance of overall current senders` fees
in the mempool we are to keep them in a separate map.
This commit is contained in:
Anna Shaleva 2020-05-18 11:20:41 +03:00
parent c3e08afe34
commit 1afd630169
6 changed files with 160 additions and 20 deletions

View file

@ -723,7 +723,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
bc.topBlock.Store(block)
atomic.StoreUint32(&bc.blockHeight, block.Index)
updateBlockHeightMetric(block.Index)
bc.memPool.RemoveStale(bc.isTxStillRelevant)
bc.memPool.RemoveStale(bc.isTxStillRelevant, bc)
return nil
}
@ -817,6 +817,11 @@ func (bc *Blockchain) GetNEP5Balances(acc util.Uint160) *state.NEP5Balances {
return bs
}
// GetUtilityTokenBalance returns utility token (GAS) balance for the acc.
func (bc *Blockchain) GetUtilityTokenBalance(acc util.Uint160) util.Fixed8 {
return util.Fixed8FromInt64(bc.GetNEP5Balances(acc).Trackers[bc.contracts.GAS.Hash].Balance)
}
// LastBatch returns last persisted storage batch.
func (bc *Blockchain) LastBatch() *storage.MemBatch {
return bc.lastBatch
@ -1171,7 +1176,7 @@ func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) e
if t.ValidUntilBlock <= height || t.ValidUntilBlock > height+transaction.MaxValidUntilBlockIncrement {
return errors.Errorf("transaction has expired. ValidUntilBlock = %d, current height = %d", t.ValidUntilBlock, height)
}
balance := util.Fixed8FromInt64(bc.GetNEP5Balances(t.Sender).Trackers[bc.contracts.GAS.Hash].Balance)
balance := bc.GetUtilityTokenBalance(t.Sender)
need := t.SystemFee.Add(t.NetworkFee)
if balance.LessThan(need) {
return errors.Errorf("insufficient funds: balance is %v, need: %v", balance, need)
@ -1189,7 +1194,7 @@ func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) e
return errors.New("invalid transaction's inputs")
}
if block == nil {
if ok := bc.memPool.Verify(t); !ok {
if ok := bc.memPool.Verify(t, bc); !ok {
return errors.New("invalid transaction due to conflicts with the memory pool")
}
}

View file

@ -8,4 +8,5 @@ import (
type Feer interface {
IsLowPriority(util.Fixed8) bool
FeePerByte() util.Fixed8
GetUtilityTokenBalance(util.Uint160) util.Fixed8
}

View file

@ -39,6 +39,13 @@ type TxWithFee struct {
Fee util.Fixed8
}
// utilityBalanceAndFees stores sender's balance and overall fees of
// sender's transactions which are currently in mempool
type utilityBalanceAndFees struct {
balance util.Fixed8
feeSum util.Fixed8
}
// Pool stores the unconfirms transactions.
type Pool struct {
lock sync.RWMutex
@ -46,6 +53,7 @@ type Pool struct {
verifiedTxes items
inputs []*transaction.Input
claims []*transaction.Input
fees map[util.Uint160]utilityBalanceAndFees
capacity int
}
@ -156,6 +164,38 @@ func dropInputFromSortedSlice(slice *[]*transaction.Input, input *transaction.In
*slice = (*slice)[:len(*slice)-1]
}
// tryAddSendersFee tries to add system fee and network fee to the total sender`s fee in mempool
// and returns false if sender has not enough GAS to pay
func (mp *Pool) tryAddSendersFee(tx *transaction.Transaction, feer Feer) bool {
if !mp.checkBalanceAndUpdate(tx, feer) {
return false
}
mp.addSendersFee(tx)
return true
}
// checkBalanceAndUpdate returns true in case when sender has enough GAS to pay for
// the transaction and sets sender's balance value in mempool in case if it was not set
func (mp *Pool) checkBalanceAndUpdate(tx *transaction.Transaction, feer Feer) bool {
senderFee, ok := mp.fees[tx.Sender]
if !ok {
senderFee.balance = feer.GetUtilityTokenBalance(tx.Sender)
mp.fees[tx.Sender] = senderFee
}
needFee := senderFee.feeSum + tx.SystemFee + tx.NetworkFee
if senderFee.balance < needFee {
return false
}
return true
}
// addSendersFee adds system fee and network fee to the total sender`s fee in mempool
func (mp *Pool) addSendersFee(tx *transaction.Transaction) {
senderFee := mp.fees[tx.Sender]
senderFee.feeSum += tx.SystemFee + tx.NetworkFee
mp.fees[tx.Sender] = senderFee
}
// Add tries to add given transaction to the Pool.
func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
var pItem = &item{
@ -164,7 +204,7 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
}
pItem.isLowPrio = fee.IsLowPriority(pItem.txn.NetworkFee)
mp.lock.Lock()
if !mp.checkTxConflicts(t) {
if !mp.checkTxConflicts(t, fee) {
mp.lock.Unlock()
return ErrConflict
}
@ -202,6 +242,7 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
copy(mp.verifiedTxes[n+1:], mp.verifiedTxes[n:])
mp.verifiedTxes[n] = pItem
}
mp.addSendersFee(pItem.txn)
// For lots of inputs it might be easier to push them all and sort
// afterwards, but that requires benchmarking.
@ -237,6 +278,9 @@ func (mp *Pool) Remove(hash util.Uint256) {
} else if num == len(mp.verifiedTxes)-1 {
mp.verifiedTxes = mp.verifiedTxes[:num]
}
senderFee := mp.fees[it.txn.Sender]
senderFee.feeSum -= it.txn.SystemFee + it.txn.NetworkFee
mp.fees[it.txn.Sender] = senderFee
for i := range it.txn.Inputs {
dropInputFromSortedSlice(&mp.inputs, &it.txn.Inputs[i])
}
@ -254,15 +298,16 @@ func (mp *Pool) Remove(hash util.Uint256) {
// RemoveStale filters verified transactions through the given function keeping
// only the transactions for which it returns a true result. It's used to quickly
// drop part of the mempool that is now invalid after the block acceptance.
func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) {
func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool, feer Feer) {
mp.lock.Lock()
// We can reuse already allocated slice
// because items are iterated one-by-one in increasing order.
newVerifiedTxes := mp.verifiedTxes[:0]
newInputs := mp.inputs[:0]
newClaims := mp.claims[:0]
mp.fees = make(map[util.Uint160]utilityBalanceAndFees) // it'd be nice to reuse existing map, but we can't easily clear it
for _, itm := range mp.verifiedTxes {
if isOK(itm.txn) {
if isOK(itm.txn) && mp.tryAddSendersFee(itm.txn, feer) {
newVerifiedTxes = append(newVerifiedTxes, itm)
for i := range itm.txn.Inputs {
newInputs = append(newInputs, &itm.txn.Inputs[i])
@ -295,6 +340,7 @@ func NewMemPool(capacity int) Pool {
verifiedMap: make(map[util.Uint256]*item),
verifiedTxes: make([]*item, 0, capacity),
capacity: capacity,
fees: make(map[util.Uint160]utilityBalanceAndFees),
}
}
@ -338,10 +384,13 @@ func areInputsInPool(inputs []transaction.Input, pool []*transaction.Input) bool
}
// checkTxConflicts is an internal unprotected version of Verify.
func (mp *Pool) checkTxConflicts(tx *transaction.Transaction) bool {
func (mp *Pool) checkTxConflicts(tx *transaction.Transaction, fee Feer) bool {
if areInputsInPool(tx.Inputs, mp.inputs) {
return false
}
if !mp.checkBalanceAndUpdate(tx, fee) {
return false
}
switch tx.Type {
case transaction.ClaimType:
claim := tx.Data.(*transaction.ClaimTX)
@ -364,8 +413,8 @@ func (mp *Pool) checkTxConflicts(tx *transaction.Transaction) bool {
// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool.
// If yes, the transaction tx is not a valid transaction and the function return false.
// If no, the transaction tx is a valid transaction and the function return true.
func (mp *Pool) Verify(tx *transaction.Transaction) bool {
func (mp *Pool) Verify(tx *transaction.Transaction, feer Feer) bool {
mp.lock.RLock()
defer mp.lock.RUnlock()
return mp.checkTxConflicts(tx)
return mp.checkTxConflicts(tx, feer)
}

View file

@ -24,6 +24,10 @@ func (fs *FeerStub) FeePerByte() util.Fixed8 {
return fs.feePerByte
}
func (fs *FeerStub) GetUtilityTokenBalance(uint160 util.Uint160) util.Fixed8 {
return util.Fixed8FromInt64(10000)
}
func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
mp := NewMemPool(10)
tx := transaction.NewContractTX()
@ -111,7 +115,7 @@ func TestMemPoolAddRemoveWithInputsAndClaims(t *testing.T) {
return false
}
return true
})
}, &FeerStub{})
assert.Equal(t, len(txm1.Inputs), len(mp.inputs))
assert.True(t, sort.SliceIsSorted(mp.inputs, mpLessInputs))
assert.Equal(t, len(claim2.Claims), len(mp.claims))
@ -124,24 +128,24 @@ func TestMemPoolVerifyInputs(t *testing.T) {
tx.Nonce = 1
inhash1 := random.Uint256()
tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx))
require.Equal(t, true, mp.Verify(tx, &FeerStub{}))
require.NoError(t, mp.Add(tx, &FeerStub{}))
tx2 := transaction.NewContractTX()
tx2.Nonce = 2
inhash2 := random.Uint256()
tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx2))
require.Equal(t, true, mp.Verify(tx2, &FeerStub{}))
require.NoError(t, mp.Add(tx2, &FeerStub{}))
tx3 := transaction.NewContractTX()
tx3.Nonce = 3
// Different index number, but the same PrevHash as in tx1.
tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 1})
require.Equal(t, true, mp.Verify(tx3))
require.Equal(t, true, mp.Verify(tx3, &FeerStub{}))
// The same input as in tx2.
tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0})
require.Equal(t, false, mp.Verify(tx3))
require.Equal(t, false, mp.Verify(tx3, &FeerStub{}))
require.Error(t, mp.Add(tx3, &FeerStub{}))
}
@ -156,30 +160,30 @@ func TestMemPoolVerifyClaims(t *testing.T) {
claim1.Claims = append(claim1.Claims, transaction.Input{PrevHash: hash1, PrevIndex: uint16(i)})
claim1.Claims = append(claim1.Claims, transaction.Input{PrevHash: hash2, PrevIndex: uint16(i)})
}
require.Equal(t, true, mp.Verify(tx1))
require.Equal(t, true, mp.Verify(tx1, &FeerStub{}))
require.NoError(t, mp.Add(tx1, &FeerStub{}))
tx2, claim2 := newClaimTX()
for i := 0; i < 10; i++ {
claim2.Claims = append(claim2.Claims, transaction.Input{PrevHash: hash2, PrevIndex: uint16(i + 10)})
}
require.Equal(t, true, mp.Verify(tx2))
require.Equal(t, true, mp.Verify(tx2, &FeerStub{}))
require.NoError(t, mp.Add(tx2, &FeerStub{}))
tx3, claim3 := newClaimTX()
claim3.Claims = append(claim3.Claims, transaction.Input{PrevHash: hash1, PrevIndex: 0})
require.Equal(t, false, mp.Verify(tx3))
require.Equal(t, false, mp.Verify(tx3, &FeerStub{}))
require.Error(t, mp.Add(tx3, &FeerStub{}))
}
func TestMemPoolVerifyIssue(t *testing.T) {
mp := NewMemPool(50)
tx1 := newIssueTX()
require.Equal(t, true, mp.Verify(tx1))
require.Equal(t, true, mp.Verify(tx1, &FeerStub{}))
require.NoError(t, mp.Add(tx1, &FeerStub{}))
tx2 := newIssueTX()
require.Equal(t, false, mp.Verify(tx2))
require.Equal(t, false, mp.Verify(tx2, &FeerStub{}))
require.Error(t, mp.Add(tx2, &FeerStub{}))
}
@ -335,7 +339,7 @@ func TestRemoveStale(t *testing.T) {
}
}
return false
})
}, &FeerStub{})
require.Equal(t, mempoolSize/2, mp.Count())
verTxes := mp.GetVerifiedTransactions()
for _, txf := range verTxes {
@ -343,3 +347,76 @@ func TestRemoveStale(t *testing.T) {
require.Contains(t, txes2, txf.Tx)
}
}
func TestMemPoolFees(t *testing.T) {
mp := NewMemPool(10)
sender0 := util.Uint160{1, 2, 3}
tx0 := transaction.NewContractTX()
tx0.NetworkFee = util.Fixed8FromInt64(11000)
tx0.Sender = sender0
// insufficient funds to add transaction, but balance should be stored
require.Equal(t, false, mp.Verify(tx0, &FeerStub{}))
require.Error(t, mp.Add(tx0, &FeerStub{}))
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: util.Fixed8FromInt64(10000),
feeSum: 0,
}, mp.fees[sender0])
// no problems with adding another transaction with lower fee
tx1 := transaction.NewContractTX()
tx1.NetworkFee = util.Fixed8FromInt64(7000)
tx1.Sender = sender0
require.NoError(t, mp.Add(tx1, &FeerStub{}))
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: util.Fixed8FromInt64(10000),
feeSum: util.Fixed8FromInt64(7000),
}, mp.fees[sender0])
// balance shouldn't change after adding one more transaction
tx2 := transaction.NewContractTX()
tx2.NetworkFee = util.Fixed8FromFloat(3000)
tx2.Sender = sender0
require.NoError(t, mp.Add(tx2, &FeerStub{}))
require.Equal(t, 2, len(mp.verifiedTxes))
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: util.Fixed8FromInt64(10000),
feeSum: util.Fixed8FromInt64(10000),
}, mp.fees[sender0])
// can't add more transactions as we don't have enough GAS
tx3 := transaction.NewContractTX()
tx3.NetworkFee = util.Fixed8FromFloat(0.5)
tx3.Sender = sender0
require.Equal(t, false, mp.Verify(tx3, &FeerStub{}))
require.Error(t, mp.Add(tx3, &FeerStub{}))
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: util.Fixed8FromInt64(10000),
feeSum: util.Fixed8FromInt64(10000),
}, mp.fees[sender0])
// check whether sender's fee updates correctly
mp.RemoveStale(func(t *transaction.Transaction) bool {
if t == tx2 {
return true
}
return false
}, &FeerStub{})
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: util.Fixed8FromInt64(10000),
feeSum: util.Fixed8FromFloat(3000),
}, mp.fees[sender0])
// there should be nothing left
mp.RemoveStale(func(t *transaction.Transaction) bool {
if t == tx3 {
return true
}
return false
}, &FeerStub{})
require.Equal(t, 0, len(mp.fees))
}

View file

@ -137,6 +137,10 @@ func (chain testChain) IsLowPriority(util.Fixed8) bool {
panic("TODO")
}
func (chain testChain) GetUtilityTokenBalance(uint160 util.Uint160) util.Fixed8 {
panic("TODO")
}
func (chain testChain) PoolTx(*transaction.Transaction) error {
panic("TODO")
}

View file

@ -71,3 +71,7 @@ func (fs *FeerStub) IsLowPriority(util.Fixed8) bool {
func (fs *FeerStub) FeePerByte() util.Fixed8 {
return 0
}
func (fs *FeerStub) GetUtilityTokenBalance(acc util.Uint160) util.Fixed8 {
return util.Fixed8FromInt64(1000000)
}