From 3a71aafc4315d669ebe0efd3b2e1397a26d6eeed Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Thu, 13 Apr 2023 14:03:02 +0300 Subject: [PATCH] core: distinguish notarypool/mempool metrics Move them to the core/network packages, close #2950. The name of mempool's unsorted transactions metrics has been changed along the way to match the core's metrics naming convention. Signed-off-by: Anna Shaleva --- internal/fakechain/fakechain.go | 2 +- pkg/consensus/consensus.go | 2 +- pkg/core/blockchain.go | 6 +++--- pkg/core/blockchain_neotest_test.go | 4 ++-- pkg/core/mempool/mem_pool.go | 18 ++++++++++++------ pkg/core/mempool/mem_pool_test.go | 18 +++++++++--------- pkg/core/mempool/prometheus.go | 24 ------------------------ pkg/core/mempool/subscriptions_test.go | 4 ++-- pkg/core/prometheus.go | 14 ++++++++++++++ pkg/network/prometheus.go | 16 ++++++++++++++++ pkg/network/server.go | 2 +- pkg/services/notary/core_test.go | 2 +- pkg/services/notary/node_test.go | 2 +- pkg/services/notary/notary_test.go | 6 +++--- 14 files changed, 66 insertions(+), 54 deletions(-) delete mode 100644 pkg/core/mempool/prometheus.go diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 83f889f10..51e3a8a0a 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -62,7 +62,7 @@ func NewFakeChainWithCustomCfg(protocolCfg func(c *config.Blockchain)) *FakeChai protocolCfg(&cfg) } return &FakeChain{ - Pool: mempool.New(10, 0, false), + Pool: mempool.New(10, 0, false, nil), PoolTxF: func(*transaction.Transaction) error { return nil }, poolTxWithData: func(*transaction.Transaction, any, *mempool.Pool) error { return nil }, blocks: make(map[util.Uint256]*block.Block), diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 777f5d341..7cd1e52d0 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -506,7 +506,7 @@ func (s *service) verifyBlock(b block.Block) bool { } var fee int64 - var pool = mempool.New(len(coreb.Transactions), 0, false) + var pool = mempool.New(len(coreb.Transactions), 0, false, nil) var mainPool = s.Chain.GetMemPool() for _, tx := range coreb.Transactions { var err error diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index f9996d3b4..07ada76bd 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -309,7 +309,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl store: s, stopCh: make(chan struct{}), runToExitCh: make(chan struct{}), - memPool: mempool.New(cfg.MemPoolSize, 0, false), + memPool: mempool.New(cfg.MemPoolSize, 0, false, updateMempoolMetrics), log: log, events: make(chan bcEvent), subCh: make(chan any), @@ -1446,7 +1446,7 @@ func (bc *Blockchain) AddBlock(block *block.Block) error { if !block.MerkleRoot.Equals(merkle) { return errors.New("invalid block: MerkleRoot mismatch") } - mp = mempool.New(len(block.Transactions), 0, false) + mp = mempool.New(len(block.Transactions), 0, false, nil) for _, tx := range block.Transactions { var err error // Transactions are verified before adding them @@ -2651,7 +2651,7 @@ func (bc *Blockchain) IsTxStillRelevant(t *transaction.Transaction, txpool *memp // current blockchain state. Note that this verification is completely isolated // from the main node's mempool. func (bc *Blockchain) VerifyTx(t *transaction.Transaction) error { - var mp = mempool.New(1, 0, false) + var mp = mempool.New(1, 0, false, nil) bc.lock.RLock() defer bc.lock.RUnlock() return bc.verifyAndPoolTx(t, mp, bc) diff --git a/pkg/core/blockchain_neotest_test.go b/pkg/core/blockchain_neotest_test.go index da59af3cf..ad8c74b5b 100644 --- a/pkg/core/blockchain_neotest_test.go +++ b/pkg/core/blockchain_neotest_test.go @@ -1301,7 +1301,7 @@ func TestBlockchain_VerifyTx(t *testing.T) { require.True(t, errors.Is(err, core.ErrAlreadyExists)) }) t.Run("MemPoolOOM", func(t *testing.T) { - mp := mempool.New(1, 0, false) + mp := mempool.New(1, 0, false, nil) tx1 := newTestTx(t, h, testScript) tx1.NetworkFee += 10000 // Give it more priority. require.NoError(t, accs[0].SignTx(netmode.UnitTestNet, tx1)) @@ -1860,7 +1860,7 @@ func TestBlockchain_VerifyTx(t *testing.T) { return tx } - mp := mempool.New(10, 1, false) + mp := mempool.New(10, 1, false, nil) verificationF := func(tx *transaction.Transaction, data any) error { if data.(int) > 5 { return errors.New("bad data") diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 0faf79c96..901b528bb 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -65,9 +65,10 @@ type Pool struct { // oracleResp contains the ids of oracle responses for the tx in the pool. oracleResp map[uint64]util.Uint256 - capacity int - feePerByte int64 - payerIndex int + capacity int + feePerByte int64 + payerIndex int + updateMetricsCb func(int) resendThreshold uint32 resendFunc func(*transaction.Transaction, any) @@ -286,7 +287,9 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...any) error { // we already checked balance in checkTxConflicts, so don't need to check again mp.tryAddSendersFee(pItem.txn, fee, false) - updateMempoolMetrics(len(mp.verifiedTxes)) + if mp.updateMetricsCb != nil { + mp.updateMetricsCb(len(mp.verifiedTxes)) + } mp.lock.Unlock() if mp.subscriptionsOn.Load() { @@ -342,7 +345,9 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) { } } } - updateMempoolMetrics(len(mp.verifiedTxes)) + if mp.updateMetricsCb != nil { + mp.updateMetricsCb(len(mp.verifiedTxes)) + } } // RemoveStale filters verified transactions through the given function keeping @@ -420,7 +425,7 @@ func (mp *Pool) checkPolicy(tx *transaction.Transaction, policyChanged bool) boo } // New returns a new Pool struct. -func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool { +func New(capacity int, payerIndex int, enableSubscriptions bool, updateMetricsCb func(int)) *Pool { mp := &Pool{ verifiedMap: make(map[util.Uint256]*transaction.Transaction, capacity), verifiedTxes: make([]item, 0, capacity), @@ -434,6 +439,7 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool { events: make(chan mempoolevent.Event), subCh: make(chan chan<- mempoolevent.Event), unsubCh: make(chan chan<- mempoolevent.Event), + updateMetricsCb: updateMetricsCb, } mp.subscriptionsOn.Store(false) return mp diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 43a1f0cef..66882e4f3 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -45,7 +45,7 @@ func (fs *FeerStub) P2PSigExtensionsEnabled() bool { } func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { - mp := New(10, 0, false) + mp := New(10, 0, false, nil) tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0) tx.Nonce = 0 tx.Signers = []transaction.Signer{{Account: util.Uint160{1, 2, 3}}} @@ -66,7 +66,7 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { } func TestMemPoolRemoveStale(t *testing.T) { - mp := New(5, 0, false) + mp := New(5, 0, false, nil) txs := make([]*transaction.Transaction, 5) for i := range txs { txs[i] = transaction.New([]byte{byte(opcode.PUSH1)}, 0) @@ -117,7 +117,7 @@ func TestMemPoolAddRemove(t *testing.T) { func TestOverCapacity(t *testing.T) { var fs = &FeerStub{balance: 10000000} const mempoolSize = 10 - mp := New(mempoolSize, 0, false) + mp := New(mempoolSize, 0, false, nil) for i := 0; i < mempoolSize; i++ { tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0) @@ -193,7 +193,7 @@ func TestOverCapacity(t *testing.T) { func TestGetVerified(t *testing.T) { var fs = &FeerStub{} const mempoolSize = 10 - mp := New(mempoolSize, 0, false) + mp := New(mempoolSize, 0, false, nil) txes := make([]*transaction.Transaction, 0, mempoolSize) for i := 0; i < mempoolSize; i++ { @@ -217,7 +217,7 @@ func TestGetVerified(t *testing.T) { func TestRemoveStale(t *testing.T) { var fs = &FeerStub{} const mempoolSize = 10 - mp := New(mempoolSize, 0, false) + mp := New(mempoolSize, 0, false, nil) txes1 := make([]*transaction.Transaction, 0, mempoolSize/2) txes2 := make([]*transaction.Transaction, 0, mempoolSize/2) @@ -250,7 +250,7 @@ func TestRemoveStale(t *testing.T) { } func TestMemPoolFees(t *testing.T) { - mp := New(10, 0, false) + mp := New(10, 0, false, nil) fs := &FeerStub{balance: 10000000} sender0 := util.Uint160{1, 2, 3} tx0 := transaction.New([]byte{byte(opcode.PUSH1)}, 0) @@ -355,7 +355,7 @@ func TestMempoolItemsOrder(t *testing.T) { } func TestMempoolAddRemoveOracleResponse(t *testing.T) { - mp := New(3, 0, false) + mp := New(3, 0, false, nil) nonce := uint32(0) fs := &FeerStub{balance: 10000} newTx := func(netFee int64, id uint64) *transaction.Transaction { @@ -425,7 +425,7 @@ func TestMempoolAddRemoveOracleResponse(t *testing.T) { func TestMempoolAddRemoveConflicts(t *testing.T) { capacity := 6 - mp := New(capacity, 0, false) + mp := New(capacity, 0, false, nil) var ( fs = &FeerStub{p2pSigExt: true, balance: 100000} nonce uint32 = 1 @@ -555,7 +555,7 @@ func TestMempoolAddWithDataGetData(t *testing.T) { blockHeight: 5, balance: 100, } - mp := New(10, 1, false) + mp := New(10, 1, false, nil) newTx := func(t *testing.T, netFee int64) *transaction.Transaction { tx := transaction.New([]byte{byte(opcode.RET)}, 0) tx.Signers = []transaction.Signer{{}, {}} diff --git a/pkg/core/mempool/prometheus.go b/pkg/core/mempool/prometheus.go deleted file mode 100644 index ca19c2c83..000000000 --- a/pkg/core/mempool/prometheus.go +++ /dev/null @@ -1,24 +0,0 @@ -package mempool - -import "github.com/prometheus/client_golang/prometheus" - -var ( - // mempoolUnsortedTx prometheus metric. - mempoolUnsortedTx = prometheus.NewGauge( - prometheus.GaugeOpts{ - Help: "Mempool Unsorted TXs", - Name: "mempool_unsorted_tx", - Namespace: "neogo", - }, - ) -) - -func init() { - prometheus.MustRegister( - mempoolUnsortedTx, - ) -} - -func updateMempoolMetrics(unsortedTxnLen int) { - mempoolUnsortedTx.Set(float64(unsortedTxnLen)) -} diff --git a/pkg/core/mempool/subscriptions_test.go b/pkg/core/mempool/subscriptions_test.go index bbe1bc2a3..d476f20bc 100644 --- a/pkg/core/mempool/subscriptions_test.go +++ b/pkg/core/mempool/subscriptions_test.go @@ -13,7 +13,7 @@ import ( func TestSubscriptions(t *testing.T) { t.Run("disabled subscriptions", func(t *testing.T) { - mp := New(5, 0, false) + mp := New(5, 0, false, nil) require.Panics(t, func() { mp.RunSubscriptions() }) @@ -24,7 +24,7 @@ func TestSubscriptions(t *testing.T) { t.Run("enabled subscriptions", func(t *testing.T) { fs := &FeerStub{balance: 100} - mp := New(2, 0, true) + mp := New(2, 0, true, nil) mp.RunSubscriptions() subChan1 := make(chan mempoolevent.Event, 3) subChan2 := make(chan mempoolevent.Event, 3) diff --git a/pkg/core/prometheus.go b/pkg/core/prometheus.go index ab1b2f405..8e429d044 100644 --- a/pkg/core/prometheus.go +++ b/pkg/core/prometheus.go @@ -30,6 +30,14 @@ var ( Namespace: "neogo", }, ) + // mempoolUnsortedTx prometheus metric. + mempoolUnsortedTx = prometheus.NewGauge( + prometheus.GaugeOpts{ + Help: "Mempool unsorted transactions", + Name: "mempool_unsorted_tx", + Namespace: "neogo", + }, + ) ) func init() { @@ -37,6 +45,7 @@ func init() { blockHeight, persistedHeight, headerHeight, + mempoolUnsortedTx, ) } @@ -51,3 +60,8 @@ func updateHeaderHeightMetric(hHeight uint32) { func updateBlockHeightMetric(bHeight uint32) { blockHeight.Set(float64(bHeight)) } + +// updateMempoolMetrics updates metric of the number of unsorted txs inside the mempool. +func updateMempoolMetrics(unsortedTxnLen int) { + mempoolUnsortedTx.Set(float64(unsortedTxnLen)) +} diff --git a/pkg/network/prometheus.go b/pkg/network/prometheus.go index bc1156afe..483e877f7 100644 --- a/pkg/network/prometheus.go +++ b/pkg/network/prometheus.go @@ -50,6 +50,15 @@ var ( }, ) p2pCmds = make(map[CommandType]prometheus.Histogram) + + // notarypoolUnsortedTx prometheus metric. + notarypoolUnsortedTx = prometheus.NewGauge( + prometheus.GaugeOpts{ + Help: "Notary request pool fallback txs", + Name: "notarypool_unsorted_tx", + Namespace: "neogo", + }, + ) ) func init() { @@ -59,6 +68,7 @@ func init() { servAndNodeVersion, poolCount, blockQueueLength, + notarypoolUnsortedTx, ) for _, cmd := range []CommandType{CMDVersion, CMDVerack, CMDGetAddr, CMDAddr, CMDPing, CMDPong, CMDGetHeaders, CMDHeaders, CMDGetBlocks, @@ -103,3 +113,9 @@ func addCmdTimeMetric(cmd CommandType, t time.Duration) { } p2pCmds[cmd].Observe(t.Seconds()) } + +// updateNotarypoolMetrics updates metric of the number of fallback txs inside +// the notary request pool. +func updateNotarypoolMetrics(unsortedTxnLen int) { + notarypoolUnsortedTx.Set(float64(unsortedTxnLen)) +} diff --git a/pkg/network/server.go b/pkg/network/server.go index b6661b2a4..1cb62d637 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -198,7 +198,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) - s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true) + s.notaryRequestPool = mempool.New(s.config.P2PNotaryRequestPayloadPoolSize, 1, true, updateNotarypoolMetrics) chain.RegisterPostBlock(func(isRelevant func(*transaction.Transaction, *mempool.Pool, bool) bool, txpool *mempool.Pool, _ *block.Block) { s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool { return isRelevant(t, txpool, true) diff --git a/pkg/services/notary/core_test.go b/pkg/services/notary/core_test.go index 657a2bc90..638da3887 100644 --- a/pkg/services/notary/core_test.go +++ b/pkg/services/notary/core_test.go @@ -47,7 +47,7 @@ func getTestNotary(t *testing.T, bc *core.Blockchain, walletPath, pass string, o Chain: bc, Log: zaptest.NewLogger(t), } - mp := mempool.New(10, 1, true) + mp := mempool.New(10, 1, true, nil) ntr, err := notary.NewNotary(cfg, netmode.UnitTestNet, mp, onTx) require.NoError(t, err) diff --git a/pkg/services/notary/node_test.go b/pkg/services/notary/node_test.go index b1779cd6c..93c666dc6 100644 --- a/pkg/services/notary/node_test.go +++ b/pkg/services/notary/node_test.go @@ -21,7 +21,7 @@ func getTestNotary(t *testing.T, bc Ledger, walletPath, pass string) (*wallet.Ac Password: pass, }, } - mp := mempool.New(10, 1, true) + mp := mempool.New(10, 1, true, nil) cfg := Config{ MainCfg: mainCfg, Chain: bc, diff --git a/pkg/services/notary/notary_test.go b/pkg/services/notary/notary_test.go index d3d173e99..9df86c72f 100644 --- a/pkg/services/notary/notary_test.go +++ b/pkg/services/notary/notary_test.go @@ -27,21 +27,21 @@ func TestWallet(t *testing.T) { } t.Run("unexisting wallet", func(t *testing.T) { cfg.MainCfg.UnlockWallet.Path = "./testdata/does_not_exists.json" - _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil) + _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil) require.Error(t, err) }) t.Run("bad password", func(t *testing.T) { cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json" cfg.MainCfg.UnlockWallet.Password = "invalid" - _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil) + _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil) require.Error(t, err) }) t.Run("good", func(t *testing.T) { cfg.MainCfg.UnlockWallet.Path = "./testdata/notary1.json" cfg.MainCfg.UnlockWallet.Password = "one" - _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true), nil) + _, err := NewNotary(cfg, netmode.UnitTestNet, mempool.New(1, 1, true, nil), nil) require.NoError(t, err) }) }