Merge pull request #2969 from nspcc-dev/mp-metrics

Split notarypool and mempool metrics. Also, do not use metrics for temporary pools
created during block/transaction verification.

Close #2950.
This commit is contained in:
Roman Khimov 2023-04-14 15:43:37 +03:00 committed by GitHub
commit e0abe2b858
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 69 additions and 57 deletions

View file

@ -62,7 +62,7 @@ func NewFakeChainWithCustomCfg(protocolCfg func(c *config.Blockchain)) *FakeChai
protocolCfg(&cfg) protocolCfg(&cfg)
} }
return &FakeChain{ return &FakeChain{
Pool: mempool.New(10, 0, false), Pool: mempool.New(10, 0, false, nil),
PoolTxF: func(*transaction.Transaction) error { return nil }, PoolTxF: func(*transaction.Transaction) error { return nil },
poolTxWithData: func(*transaction.Transaction, any, *mempool.Pool) error { return nil }, poolTxWithData: func(*transaction.Transaction, any, *mempool.Pool) error { return nil },
blocks: make(map[util.Uint256]*block.Block), blocks: make(map[util.Uint256]*block.Block),

View file

@ -506,7 +506,7 @@ func (s *service) verifyBlock(b block.Block) bool {
} }
var fee int64 var fee int64
var pool = mempool.New(len(coreb.Transactions), 0, false) var pool = mempool.New(len(coreb.Transactions), 0, false, nil)
var mainPool = s.Chain.GetMemPool() var mainPool = s.Chain.GetMemPool()
for _, tx := range coreb.Transactions { for _, tx := range coreb.Transactions {
var err error var err error

View file

@ -309,7 +309,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
store: s, store: s,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}), runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize, 0, false), memPool: mempool.New(cfg.MemPoolSize, 0, false, updateMempoolMetrics),
log: log, log: log,
events: make(chan bcEvent), events: make(chan bcEvent),
subCh: make(chan any), subCh: make(chan any),
@ -1446,7 +1446,7 @@ func (bc *Blockchain) AddBlock(block *block.Block) error {
if !block.MerkleRoot.Equals(merkle) { if !block.MerkleRoot.Equals(merkle) {
return errors.New("invalid block: MerkleRoot mismatch") return errors.New("invalid block: MerkleRoot mismatch")
} }
mp = mempool.New(len(block.Transactions), 0, false) mp = mempool.New(len(block.Transactions), 0, false, nil)
for _, tx := range block.Transactions { for _, tx := range block.Transactions {
var err error var err error
// Transactions are verified before adding them // Transactions are verified before adding them
@ -2651,7 +2651,7 @@ func (bc *Blockchain) IsTxStillRelevant(t *transaction.Transaction, txpool *memp
// current blockchain state. Note that this verification is completely isolated // current blockchain state. Note that this verification is completely isolated
// from the main node's mempool. // from the main node's mempool.
func (bc *Blockchain) VerifyTx(t *transaction.Transaction) error { func (bc *Blockchain) VerifyTx(t *transaction.Transaction) error {
var mp = mempool.New(1, 0, false) var mp = mempool.New(1, 0, false, nil)
bc.lock.RLock() bc.lock.RLock()
defer bc.lock.RUnlock() defer bc.lock.RUnlock()
return bc.verifyAndPoolTx(t, mp, bc) return bc.verifyAndPoolTx(t, mp, bc)

View file

@ -1301,7 +1301,7 @@ func TestBlockchain_VerifyTx(t *testing.T) {
require.True(t, errors.Is(err, core.ErrAlreadyExists)) require.True(t, errors.Is(err, core.ErrAlreadyExists))
}) })
t.Run("MemPoolOOM", func(t *testing.T) { t.Run("MemPoolOOM", func(t *testing.T) {
mp := mempool.New(1, 0, false) mp := mempool.New(1, 0, false, nil)
tx1 := newTestTx(t, h, testScript) tx1 := newTestTx(t, h, testScript)
tx1.NetworkFee += 10000 // Give it more priority. tx1.NetworkFee += 10000 // Give it more priority.
require.NoError(t, accs[0].SignTx(netmode.UnitTestNet, tx1)) require.NoError(t, accs[0].SignTx(netmode.UnitTestNet, tx1))
@ -1860,7 +1860,7 @@ func TestBlockchain_VerifyTx(t *testing.T) {
return tx return tx
} }
mp := mempool.New(10, 1, false) mp := mempool.New(10, 1, false, nil)
verificationF := func(tx *transaction.Transaction, data any) error { verificationF := func(tx *transaction.Transaction, data any) error {
if data.(int) > 5 { if data.(int) > 5 {
return errors.New("bad data") return errors.New("bad data")

View file

@ -65,9 +65,10 @@ type Pool struct {
// oracleResp contains the ids of oracle responses for the tx in the pool. // oracleResp contains the ids of oracle responses for the tx in the pool.
oracleResp map[uint64]util.Uint256 oracleResp map[uint64]util.Uint256
capacity int capacity int
feePerByte int64 feePerByte int64
payerIndex int payerIndex int
updateMetricsCb func(int)
resendThreshold uint32 resendThreshold uint32
resendFunc func(*transaction.Transaction, any) resendFunc func(*transaction.Transaction, any)
@ -286,7 +287,9 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...any) error {
// we already checked balance in checkTxConflicts, so don't need to check again // we already checked balance in checkTxConflicts, so don't need to check again
mp.tryAddSendersFee(pItem.txn, fee, false) mp.tryAddSendersFee(pItem.txn, fee, false)
updateMempoolMetrics(len(mp.verifiedTxes)) if mp.updateMetricsCb != nil {
mp.updateMetricsCb(len(mp.verifiedTxes))
}
mp.lock.Unlock() mp.lock.Unlock()
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
@ -342,7 +345,9 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) {
} }
} }
} }
updateMempoolMetrics(len(mp.verifiedTxes)) if mp.updateMetricsCb != nil {
mp.updateMetricsCb(len(mp.verifiedTxes))
}
} }
// RemoveStale filters verified transactions through the given function keeping // RemoveStale filters verified transactions through the given function keeping
@ -420,7 +425,7 @@ func (mp *Pool) checkPolicy(tx *transaction.Transaction, policyChanged bool) boo
} }
// New returns a new Pool struct. // New returns a new Pool struct.
func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool { func New(capacity int, payerIndex int, enableSubscriptions bool, updateMetricsCb func(int)) *Pool {
mp := &Pool{ mp := &Pool{
verifiedMap: make(map[util.Uint256]*transaction.Transaction, capacity), verifiedMap: make(map[util.Uint256]*transaction.Transaction, capacity),
verifiedTxes: make([]item, 0, capacity), verifiedTxes: make([]item, 0, capacity),
@ -434,6 +439,7 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool {
events: make(chan mempoolevent.Event), events: make(chan mempoolevent.Event),
subCh: make(chan chan<- mempoolevent.Event), subCh: make(chan chan<- mempoolevent.Event),
unsubCh: make(chan chan<- mempoolevent.Event), unsubCh: make(chan chan<- mempoolevent.Event),
updateMetricsCb: updateMetricsCb,
} }
mp.subscriptionsOn.Store(false) mp.subscriptionsOn.Store(false)
return mp return mp

View file

@ -45,7 +45,7 @@ func (fs *FeerStub) P2PSigExtensionsEnabled() bool {
} }
func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
mp := New(10, 0, false) mp := New(10, 0, false, nil)
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0) tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
tx.Nonce = 0 tx.Nonce = 0
tx.Signers = []transaction.Signer{{Account: util.Uint160{1, 2, 3}}} tx.Signers = []transaction.Signer{{Account: util.Uint160{1, 2, 3}}}
@ -66,7 +66,7 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
} }
func TestMemPoolRemoveStale(t *testing.T) { func TestMemPoolRemoveStale(t *testing.T) {
mp := New(5, 0, false) mp := New(5, 0, false, nil)
txs := make([]*transaction.Transaction, 5) txs := make([]*transaction.Transaction, 5)
for i := range txs { for i := range txs {
txs[i] = transaction.New([]byte{byte(opcode.PUSH1)}, 0) txs[i] = transaction.New([]byte{byte(opcode.PUSH1)}, 0)
@ -117,7 +117,7 @@ func TestMemPoolAddRemove(t *testing.T) {
func TestOverCapacity(t *testing.T) { func TestOverCapacity(t *testing.T) {
var fs = &FeerStub{balance: 10000000} var fs = &FeerStub{balance: 10000000}
const mempoolSize = 10 const mempoolSize = 10
mp := New(mempoolSize, 0, false) mp := New(mempoolSize, 0, false, nil)
for i := 0; i < mempoolSize; i++ { for i := 0; i < mempoolSize; i++ {
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0) tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
@ -193,7 +193,7 @@ func TestOverCapacity(t *testing.T) {
func TestGetVerified(t *testing.T) { func TestGetVerified(t *testing.T) {
var fs = &FeerStub{} var fs = &FeerStub{}
const mempoolSize = 10 const mempoolSize = 10
mp := New(mempoolSize, 0, false) mp := New(mempoolSize, 0, false, nil)
txes := make([]*transaction.Transaction, 0, mempoolSize) txes := make([]*transaction.Transaction, 0, mempoolSize)
for i := 0; i < mempoolSize; i++ { for i := 0; i < mempoolSize; i++ {
@ -217,7 +217,7 @@ func TestGetVerified(t *testing.T) {
func TestRemoveStale(t *testing.T) { func TestRemoveStale(t *testing.T) {
var fs = &FeerStub{} var fs = &FeerStub{}
const mempoolSize = 10 const mempoolSize = 10
mp := New(mempoolSize, 0, false) mp := New(mempoolSize, 0, false, nil)
txes1 := make([]*transaction.Transaction, 0, mempoolSize/2) txes1 := make([]*transaction.Transaction, 0, mempoolSize/2)
txes2 := make([]*transaction.Transaction, 0, mempoolSize/2) txes2 := make([]*transaction.Transaction, 0, mempoolSize/2)
@ -250,7 +250,7 @@ func TestRemoveStale(t *testing.T) {
} }
func TestMemPoolFees(t *testing.T) { func TestMemPoolFees(t *testing.T) {
mp := New(10, 0, false) mp := New(10, 0, false, nil)
fs := &FeerStub{balance: 10000000} fs := &FeerStub{balance: 10000000}
sender0 := util.Uint160{1, 2, 3} sender0 := util.Uint160{1, 2, 3}
tx0 := transaction.New([]byte{byte(opcode.PUSH1)}, 0) tx0 := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
@ -355,7 +355,7 @@ func TestMempoolItemsOrder(t *testing.T) {
} }
func TestMempoolAddRemoveOracleResponse(t *testing.T) { func TestMempoolAddRemoveOracleResponse(t *testing.T) {
mp := New(3, 0, false) mp := New(3, 0, false, nil)
nonce := uint32(0) nonce := uint32(0)
fs := &FeerStub{balance: 10000} fs := &FeerStub{balance: 10000}
newTx := func(netFee int64, id uint64) *transaction.Transaction { newTx := func(netFee int64, id uint64) *transaction.Transaction {
@ -425,7 +425,7 @@ func TestMempoolAddRemoveOracleResponse(t *testing.T) {
func TestMempoolAddRemoveConflicts(t *testing.T) { func TestMempoolAddRemoveConflicts(t *testing.T) {
capacity := 6 capacity := 6
mp := New(capacity, 0, false) mp := New(capacity, 0, false, nil)
var ( var (
fs = &FeerStub{p2pSigExt: true, balance: 100000} fs = &FeerStub{p2pSigExt: true, balance: 100000}
nonce uint32 = 1 nonce uint32 = 1
@ -555,7 +555,7 @@ func TestMempoolAddWithDataGetData(t *testing.T) {
blockHeight: 5, blockHeight: 5,
balance: 100, balance: 100,
} }
mp := New(10, 1, false) mp := New(10, 1, false, nil)
newTx := func(t *testing.T, netFee int64) *transaction.Transaction { newTx := func(t *testing.T, netFee int64) *transaction.Transaction {
tx := transaction.New([]byte{byte(opcode.RET)}, 0) tx := transaction.New([]byte{byte(opcode.RET)}, 0)
tx.Signers = []transaction.Signer{{}, {}} tx.Signers = []transaction.Signer{{}, {}}

View file

@ -1,24 +0,0 @@
package mempool
import "github.com/prometheus/client_golang/prometheus"
var (
//mempoolUnsortedTx prometheus metric.
mempoolUnsortedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Mempool Unsorted TXs",
Name: "mempool_unsorted_tx",
Namespace: "neogo",
},
)
)
func init() {
prometheus.MustRegister(
mempoolUnsortedTx,
)
}
func updateMempoolMetrics(unsortedTxnLen int) {
mempoolUnsortedTx.Set(float64(unsortedTxnLen))
}

View file

@ -13,7 +13,7 @@ import (
func TestSubscriptions(t *testing.T) { func TestSubscriptions(t *testing.T) {
t.Run("disabled subscriptions", func(t *testing.T) { t.Run("disabled subscriptions", func(t *testing.T) {
mp := New(5, 0, false) mp := New(5, 0, false, nil)
require.Panics(t, func() { require.Panics(t, func() {
mp.RunSubscriptions() mp.RunSubscriptions()
}) })
@ -24,7 +24,7 @@ func TestSubscriptions(t *testing.T) {
t.Run("enabled subscriptions", func(t *testing.T) { t.Run("enabled subscriptions", func(t *testing.T) {
fs := &FeerStub{balance: 100} fs := &FeerStub{balance: 100}
mp := New(2, 0, true) mp := New(2, 0, true, nil)
mp.RunSubscriptions() mp.RunSubscriptions()
subChan1 := make(chan mempoolevent.Event, 3) subChan1 := make(chan mempoolevent.Event, 3)
subChan2 := make(chan mempoolevent.Event, 3) subChan2 := make(chan mempoolevent.Event, 3)

View file

@ -6,7 +6,7 @@ import (
// Metrics for monitoring service. // Metrics for monitoring service.
var ( var (
//blockHeight prometheus metric. // blockHeight prometheus metric.
blockHeight = prometheus.NewGauge( blockHeight = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Help: "Current index of processed block", Help: "Current index of processed block",
@ -14,7 +14,7 @@ var (
Namespace: "neogo", Namespace: "neogo",
}, },
) )
//persistedHeight prometheus metric. // persistedHeight prometheus metric.
persistedHeight = prometheus.NewGauge( persistedHeight = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Help: "Current persisted block count", Help: "Current persisted block count",
@ -22,7 +22,7 @@ var (
Namespace: "neogo", Namespace: "neogo",
}, },
) )
//headerHeight prometheus metric. // headerHeight prometheus metric.
headerHeight = prometheus.NewGauge( headerHeight = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Help: "Current header height", Help: "Current header height",
@ -30,6 +30,14 @@ var (
Namespace: "neogo", Namespace: "neogo",
}, },
) )
// mempoolUnsortedTx prometheus metric.
mempoolUnsortedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Mempool unsorted transactions",
Name: "mempool_unsorted_tx",
Namespace: "neogo",
},
)
) )
func init() { func init() {
@ -37,6 +45,7 @@ func init() {
blockHeight, blockHeight,
persistedHeight, persistedHeight,
headerHeight, headerHeight,
mempoolUnsortedTx,
) )
} }
@ -51,3 +60,8 @@ func updateHeaderHeightMetric(hHeight uint32) {
func updateBlockHeightMetric(bHeight uint32) { func updateBlockHeightMetric(bHeight uint32) {
blockHeight.Set(float64(bHeight)) blockHeight.Set(float64(bHeight))
} }
// updateMempoolMetrics updates metric of the number of unsorted txs inside the mempool.
func updateMempoolMetrics(unsortedTxnLen int) {
mempoolUnsortedTx.Set(float64(unsortedTxnLen))
}

View file

@ -50,6 +50,15 @@ var (
}, },
) )
p2pCmds = make(map[CommandType]prometheus.Histogram) p2pCmds = make(map[CommandType]prometheus.Histogram)
// notarypoolUnsortedTx prometheus metric.
notarypoolUnsortedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Notary request pool fallback txs",
Name: "notarypool_unsorted_tx",
Namespace: "neogo",
},
)
) )
func init() { func init() {
@ -59,6 +68,7 @@ func init() {
servAndNodeVersion, servAndNodeVersion,
poolCount, poolCount,
blockQueueLength, blockQueueLength,
notarypoolUnsortedTx,
) )
for _, cmd := range []CommandType{CMDVersion, CMDVerack, CMDGetAddr, for _, cmd := range []CommandType{CMDVersion, CMDVerack, CMDGetAddr,
CMDAddr, CMDPing, CMDPong, CMDGetHeaders, CMDHeaders, CMDGetBlocks, CMDAddr, CMDPing, CMDPong, CMDGetHeaders, CMDHeaders, CMDGetBlocks,
@ -103,3 +113,9 @@ func addCmdTimeMetric(cmd CommandType, t time.Duration) {
} }
p2pCmds[cmd].Observe(t.Seconds()) p2pCmds[cmd].Observe(t.Seconds())
} }
// updateNotarypoolMetrics updates metric of the number of fallback txs inside
// the notary request pool.
func updateNotarypoolMetrics(unsortedTxnLen int) {
notarypoolUnsortedTx.Set(float64(unsortedTxnLen))
}

View file

@ -198,7 +198,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
} }
if chain.P2PSigExtensionsEnabled() { if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain) s.notaryFeer = NewNotaryFeer(chain)
s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true) s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true, updateNotarypoolMetrics)
chain.RegisterPostBlock(func(isRelevant func(*transaction.Transaction, *mempool.Pool, bool) bool, txpool *mempool.Pool, _ *block.Block) { chain.RegisterPostBlock(func(isRelevant func(*transaction.Transaction, *mempool.Pool, bool) bool, txpool *mempool.Pool, _ *block.Block) {
s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool { s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool {
return isRelevant(t, txpool, true) return isRelevant(t, txpool, true)

View file

@ -47,7 +47,7 @@ func getTestNotary(t *testing.T, bc *core.Blockchain, walletPath, pass string, o
Chain: bc, Chain: bc,
Log: zaptest.NewLogger(t), Log: zaptest.NewLogger(t),
} }
mp := mempool.New(10, 1, true) mp := mempool.New(10, 1, true, nil)
ntr, err := notary.NewNotary(cfg, netmode.UnitTestNet, mp, onTx) ntr, err := notary.NewNotary(cfg, netmode.UnitTestNet, mp, onTx)
require.NoError(t, err) require.NoError(t, err)

View file

@ -21,7 +21,7 @@ func getTestNotary(t *testing.T, bc Ledger, walletPath, pass string) (*wallet.Ac
Password: pass, Password: pass,
}, },
} }
mp := mempool.New(10, 1, true) mp := mempool.New(10, 1, true, nil)
cfg := Config{ cfg := Config{
MainCfg: mainCfg, MainCfg: mainCfg,
Chain: bc, Chain: bc,

View file

@ -27,21 +27,21 @@ func TestWallet(t *testing.T) {
} }
t.Run("unexisting wallet", func(t *testing.T) { t.Run("unexisting wallet", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/does_not_exists.json" cfg.MainCfg.UnlockWallet.Path = "./testdata/does_not_exists.json"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil) _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.Error(t, err) require.Error(t, err)
}) })
t.Run("bad password", func(t *testing.T) { t.Run("bad password", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json" cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json"
cfg.MainCfg.UnlockWallet.Password = "invalid" cfg.MainCfg.UnlockWallet.Password = "invalid"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil) _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.Error(t, err) require.Error(t, err)
}) })
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {
cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json" cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json"
cfg.MainCfg.UnlockWallet.Password = "one" cfg.MainCfg.UnlockWallet.Password = "one"
_, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil) _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil)
require.NoError(t, err) require.NoError(t, err)
}) })
} }