From 8277b7a19a5cbb74708e5ba14f3b94644d5c6925 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 30 Jul 2021 23:47:48 +0300 Subject: [PATCH] core: don't spawn goroutine for persist function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It doesn't make any sense, in some situations it leads to a number of goroutines created that will Persist one after another (as we can't Persist concurrently). We can manage it better in a single thread. This doesn't change performance in any way, but somewhat reduces resource consumption. It was tested neo-bench (single node, 10 workers, LevelDB) on two machines and block dump processing (RC4 testnet up to 62800 with VerifyBlocks set to false) on i7-8565U. Reference (b9be892bf9f658652e2d1f074f366914dc62e830): Ryzen 9 5950X: RPS 27747.349 27407.726 27520.210 ≈ 27558 ± 0.63% TPS 26992.010 26993.468 27010.966 ≈ 26999 ± 0.04% CPU % 28.928 28.096 29.105 ≈ 28.7 ± 1.88% Mem MB 760.385 726.320 756.118 ≈ 748 ± 2.48% Core i7-8565U: RPS 7783.229 7628.409 7542.340 ≈ 7651 ± 1.60% TPS 7708.436 7607.397 7489.459 ≈ 7602 ± 1.44% CPU % 74.899 71.020 72.697 ≈ 72.9 ± 2.67% Mem MB 438.047 436.967 416.350 ≈ 430 ± 2.84% DB restore: real 0m20.838s 0m21.895s 0m21.794s ≈ 21.51 ± 2.71% user 0m39.091s 0m40.565s 0m41.493s ≈ 40.38 ± 3.00% sys 0m3.184s 0m2.923s 0m3.062s ≈ 3.06 ± 4.27% Patched: Ryzen 9 5950X: RPS 27636.957 27246.911 27462.036 ≈ 27449 ± 0.71% ↓ 0.40% TPS 27003.672 26993.468 27011.696 ≈ 27003 ± 0.03% ↑ 0.01% CPU % 28.562 28.475 28.012 ≈ 28.3 ± 1.04% ↓ 1.39% Mem MB 627.007 648.110 794.895 ≈ 690 ± 13.25% ↓ 7.75% Core i7-8565U: RPS 7497.210 7527.797 7897.532 ≈ 7641 ± 2.92% ↓ 0.13% TPS 7461.128 7482.678 7841.723 ≈ 7595 ± 2.81% ↓ 0.09% CPU % 71.559 73.423 69.005 ≈ 71.3 ± 3.11% ↓ 2.19% Mem MB 393.090 395.899 482.264 ≈ 424 ± 11.96% ↓ 1.40% DB restore: real 0m20.773s 0m21.583s 0m20.522s ≈ 20.96 ± 2.65% ↓ 2.56% user 0m39.322s 0m42.268s 0m38.626s ≈ 40.07 ± 4.82% ↓ 0.77% sys 0m3.006s 0m3.597s 0m3.042s ≈ 3.22 ± 10.31% ↑ 5.23% --- pkg/core/blockchain.go | 32 ++++++++++++++++++-------------- pkg/core/blockchain_test.go | 15 ++++++++++----- pkg/core/native_contract_test.go | 3 ++- pkg/core/native_policy_test.go | 30 ++++++++++++++++++++---------- 4 files changed, 50 insertions(+), 30 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 11d0d78de..3014f87fb 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -394,7 +394,7 @@ func (bc *Blockchain) Run() { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() - if err := bc.persist(); err != nil { + if _, err := bc.persist(); err != nil { bc.log.Warn("failed to persist", zap.Error(err)) } if err := bc.dao.Store.Close(); err != nil { @@ -408,13 +408,15 @@ func (bc *Blockchain) Run() { case <-bc.stopCh: return case <-persistTimer.C: - go func() { - err := bc.persist() - if err != nil { - bc.log.Warn("failed to persist blockchain", zap.Error(err)) - } - persistTimer.Reset(persistInterval) - }() + dur, err := bc.persist() + if err != nil { + bc.log.Warn("failed to persist blockchain", zap.Error(err)) + } + interval := persistInterval - dur + if interval <= 0 { + interval = time.Microsecond // Reset doesn't work with zero value + } + persistTimer.Reset(interval) } } } @@ -1175,41 +1177,43 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch { } // persist flushes current in-memory Store contents to the persistent storage. -func (bc *Blockchain) persist() error { +func (bc *Blockchain) persist() (time.Duration, error) { var ( start = time.Now() + duration time.Duration persisted int err error ) persisted, err = bc.dao.Persist() if err != nil { - return err + return 0, err } if persisted > 0 { bHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { - return err + return 0, err } oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight) diff := bHeight - oldHeight storedHeaderHeight, _, err := bc.dao.GetCurrentHeaderHeight() if err != nil { - return err + return 0, err } + duration = time.Since(start) bc.log.Info("persisted to disk", zap.Uint32("blocks", diff), zap.Int("keys", persisted), zap.Uint32("headerHeight", storedHeaderHeight), zap.Uint32("blockHeight", bHeight), - zap.Duration("took", time.Since(start))) + zap.Duration("took", duration)) // update monitoring metrics. updatePersistedHeightMetric(bHeight) } - return nil + return duration, nil } // GetTransaction returns a TX and its height by the given hash. The height is MaxUint32 if tx is in the mempool. diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index d3a2b7ead..8f0d00d51 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -118,7 +118,8 @@ func TestAddBlock(t *testing.T) { assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) // This one tests persisting blocks, so it does need to persist() - require.NoError(t, bc.persist()) + _, err = bc.persist() + require.NoError(t, err) for _, block := range blocks { key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE()) @@ -236,7 +237,8 @@ func TestGetHeader(t *testing.T) { b2 := bc.newBlock() _, err = bc.GetHeader(b2.Hash()) assert.Error(t, err) - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } } @@ -253,7 +255,8 @@ func TestGetBlock(t *testing.T) { assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Hash(), block.Hash()) } - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } t.Run("store only header", func(t *testing.T) { @@ -1305,7 +1308,8 @@ func TestHasBlock(t *testing.T) { } newBlock := bc.newBlock() assert.False(t, bc.HasBlock(newBlock.Hash())) - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } } @@ -1340,7 +1344,8 @@ func TestGetTransaction(t *testing.T) { assert.Equal(t, block.Transactions[0], tx) assert.Equal(t, 1, io.GetVarSize(tx.Attributes)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } } diff --git a/pkg/core/native_contract_test.go b/pkg/core/native_contract_test.go index 9c7b81fcb..eb2403b4b 100644 --- a/pkg/core/native_contract_test.go +++ b/pkg/core/native_contract_test.go @@ -196,7 +196,8 @@ func TestNativeContract_Invoke(t *testing.T) { res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28)) require.NoError(t, err) checkResult(t, res, stackitem.Make(42)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) select { case index := <-tn.blocks: diff --git a/pkg/core/native_policy_test.go b/pkg/core/native_policy_test.go index 3b315b28c..631a1718f 100644 --- a/pkg/core/native_policy_test.go +++ b/pkg/core/native_policy_test.go @@ -36,7 +36,8 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("set, too small value", func(t *testing.T) { @@ -68,13 +69,15 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, if name != "GasPerBlock" { // GasPerBlock is set on the next block checkResult(t, aers[1], stackitem.Make(defaultValue+1)) } - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // Get in the next block. res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue+1)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) } @@ -128,7 +131,8 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("block-unblock account", func(t *testing.T) { @@ -138,7 +142,8 @@ func TestBlockedAccounts(t *testing.T) { isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, isBlocked, true) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) @@ -146,7 +151,8 @@ func TestBlockedAccounts(t *testing.T) { isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, false, isBlocked) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("double-block", func(t *testing.T) { @@ -154,25 +160,29 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // double-block should fail res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // unblock res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // unblock the same account should fail as we don't have it blocked res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("not signed by committee", func(t *testing.T) {