core: don't spawn goroutine for persist function

It doesn't make any sense, in some situations it leads to a number of
goroutines created that will Persist one after another (as we can't Persist
concurrently). We can manage it better in a single thread.

This doesn't change performance in any way, but somewhat reduces resource
consumption. It was tested neo-bench (single node, 10 workers, LevelDB) on two
machines and block dump processing (RC4 testnet up to 62800 with VerifyBlocks
set to false) on i7-8565U.

Reference (b9be892bf9):

Ryzen 9 5950X:
RPS     27747.349 27407.726 27520.210  ≈ 27558   ± 0.63%
TPS     26992.010 26993.468 27010.966  ≈ 26999   ± 0.04%
CPU %      28.928    28.096    29.105  ≈    28.7 ± 1.88%
Mem MB    760.385   726.320   756.118  ≈   748   ± 2.48%

Core i7-8565U:
RPS     7783.229 7628.409 7542.340  ≈ 7651   ± 1.60%
TPS     7708.436 7607.397 7489.459  ≈ 7602   ± 1.44%
CPU %     74.899   71.020   72.697  ≈   72.9 ± 2.67%
Mem MB   438.047  436.967  416.350  ≈  430   ± 2.84%

DB restore:
real    0m20.838s 0m21.895s 0m21.794s  ≈ 21.51 ± 2.71%
user    0m39.091s 0m40.565s 0m41.493s  ≈ 40.38 ± 3.00%
sys      0m3.184s  0m2.923s  0m3.062s  ≈  3.06 ± 4.27%

Patched:

Ryzen 9 5950X:
RPS     27636.957 27246.911 27462.036  ≈ 27449   ±  0.71%  ↓ 0.40%
TPS     27003.672 26993.468 27011.696  ≈ 27003   ±  0.03%  ↑ 0.01%
CPU %      28.562    28.475    28.012  ≈    28.3 ±  1.04%  ↓ 1.39%
Mem MB    627.007   648.110   794.895  ≈   690   ± 13.25%  ↓ 7.75%

Core i7-8565U:
RPS     7497.210 7527.797 7897.532  ≈ 7641   ±  2.92%  ↓ 0.13%
TPS     7461.128 7482.678 7841.723  ≈ 7595   ±  2.81%  ↓ 0.09%
CPU %     71.559   73.423   69.005  ≈   71.3 ±  3.11%  ↓ 2.19%
Mem MB   393.090  395.899  482.264  ≈  424   ± 11.96%  ↓ 1.40%

DB restore:
real    0m20.773s 0m21.583s 0m20.522s  ≈ 20.96 ±  2.65%  ↓ 2.56%
user    0m39.322s 0m42.268s 0m38.626s  ≈ 40.07 ±  4.82%  ↓ 0.77%
sys      0m3.006s  0m3.597s  0m3.042s  ≈  3.22 ± 10.31%  ↑ 5.23%
This commit is contained in:
Roman Khimov 2021-07-30 23:47:48 +03:00
parent b9be892bf9
commit 8277b7a19a
4 changed files with 50 additions and 30 deletions

View file

@ -394,7 +394,7 @@ func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval)
defer func() {
persistTimer.Stop()
if err := bc.persist(); err != nil {
if _, err := bc.persist(); err != nil {
bc.log.Warn("failed to persist", zap.Error(err))
}
if err := bc.dao.Store.Close(); err != nil {
@ -408,13 +408,15 @@ func (bc *Blockchain) Run() {
case <-bc.stopCh:
return
case <-persistTimer.C:
go func() {
err := bc.persist()
if err != nil {
bc.log.Warn("failed to persist blockchain", zap.Error(err))
}
persistTimer.Reset(persistInterval)
}()
dur, err := bc.persist()
if err != nil {
bc.log.Warn("failed to persist blockchain", zap.Error(err))
}
interval := persistInterval - dur
if interval <= 0 {
interval = time.Microsecond // Reset doesn't work with zero value
}
persistTimer.Reset(interval)
}
}
}
@ -1175,41 +1177,43 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
}
// persist flushes current in-memory Store contents to the persistent storage.
func (bc *Blockchain) persist() error {
func (bc *Blockchain) persist() (time.Duration, error) {
var (
start = time.Now()
duration time.Duration
persisted int
err error
)
persisted, err = bc.dao.Persist()
if err != nil {
return err
return 0, err
}
if persisted > 0 {
bHeight, err := bc.dao.GetCurrentBlockHeight()
if err != nil {
return err
return 0, err
}
oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight)
diff := bHeight - oldHeight
storedHeaderHeight, _, err := bc.dao.GetCurrentHeaderHeight()
if err != nil {
return err
return 0, err
}
duration = time.Since(start)
bc.log.Info("persisted to disk",
zap.Uint32("blocks", diff),
zap.Int("keys", persisted),
zap.Uint32("headerHeight", storedHeaderHeight),
zap.Uint32("blockHeight", bHeight),
zap.Duration("took", time.Since(start)))
zap.Duration("took", duration))
// update monitoring metrics.
updatePersistedHeightMetric(bHeight)
}
return nil
return duration, nil
}
// GetTransaction returns a TX and its height by the given hash. The height is MaxUint32 if tx is in the mempool.

View file

@ -118,7 +118,8 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
// This one tests persisting blocks, so it does need to persist()
require.NoError(t, bc.persist())
_, err = bc.persist()
require.NoError(t, err)
for _, block := range blocks {
key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE())
@ -236,7 +237,8 @@ func TestGetHeader(t *testing.T) {
b2 := bc.newBlock()
_, err = bc.GetHeader(b2.Hash())
assert.Error(t, err)
assert.NoError(t, bc.persist())
_, err = bc.persist()
assert.NoError(t, err)
}
}
@ -253,7 +255,8 @@ func TestGetBlock(t *testing.T) {
assert.Equal(t, blocks[i].Index, block.Index)
assert.Equal(t, blocks[i].Hash(), block.Hash())
}
assert.NoError(t, bc.persist())
_, err = bc.persist()
assert.NoError(t, err)
}
t.Run("store only header", func(t *testing.T) {
@ -1305,7 +1308,8 @@ func TestHasBlock(t *testing.T) {
}
newBlock := bc.newBlock()
assert.False(t, bc.HasBlock(newBlock.Hash()))
assert.NoError(t, bc.persist())
_, err = bc.persist()
assert.NoError(t, err)
}
}
@ -1340,7 +1344,8 @@ func TestGetTransaction(t *testing.T) {
assert.Equal(t, block.Transactions[0], tx)
assert.Equal(t, 1, io.GetVarSize(tx.Attributes))
assert.Equal(t, 1, io.GetVarSize(tx.Scripts))
assert.NoError(t, bc.persist())
_, err = bc.persist()
assert.NoError(t, err)
}
}

View file

@ -196,7 +196,8 @@ func TestNativeContract_Invoke(t *testing.T) {
res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28))
require.NoError(t, err)
checkResult(t, res, stackitem.Make(42))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
select {
case index := <-tn.blocks:

View file

@ -36,7 +36,8 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
})
t.Run("set, too small value", func(t *testing.T) {
@ -68,13 +69,15 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
if name != "GasPerBlock" { // GasPerBlock is set on the next block
checkResult(t, aers[1], stackitem.Make(defaultValue+1))
}
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
// Get in the next block.
res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue+1))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
})
}
@ -128,7 +131,8 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
})
t.Run("block-unblock account", func(t *testing.T) {
@ -138,7 +142,8 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, isBlocked, true)
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err)
@ -146,7 +151,8 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, false, isBlocked)
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
})
t.Run("double-block", func(t *testing.T) {
@ -154,25 +160,29 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
// double-block should fail
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
// unblock
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
// unblock the same account should fail as we don't have it blocked
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false))
require.NoError(t, chain.persist())
_, err = chain.persist()
require.NoError(t, err)
})
t.Run("not signed by committee", func(t *testing.T) {