From b9be892bf9f658652e2d1f074f366914dc62e830 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 30 Jul 2021 23:35:03 +0300
Subject: [PATCH 1/3] storage: allow accessing MemCachedStore during Persist
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Persist by definition doesn't change the visible state of MemCachedStore:
all KV pairs that were accessible via it before Persist remain accessible
after Persist. The only thing it does is flush the current set of KV pairs
from memory to the persistent store. To do that it needs read-only access
to the current KV pair set, but technically it then replaces the maps, so
we had to take a full write lock, which made MemCachedStore inaccessible
for the whole duration of Persist. And Persist can take a lot of time,
since it means disk access for regular DBs.

What we do here is create new in-memory maps for MemCachedStore before
flushing the old ones to the persistent store. A fake persistent store is
then created, which actually is a MemCachedStore with the old maps, so it
has exactly the same visible state. This Store is never written to, so we
can read it without taking any internal locks, and at the same time we no
longer need write locks on the original MemCachedStore because we're not
using its old maps any more. All of this makes it possible to use
MemCachedStore normally: reads go down to whatever level is needed and
writes are handled by the new maps. So while Persist for (*Blockchain).dao
does its most time-consuming work, we can process other blocks (reading
data for transactions and persisting storeBlock caches to
(*Blockchain).dao).

The change was tested for performance with neo-bench (single node, 10
workers, LevelDB) on two machines and with block dump processing (RC4
testnet up to block 62800 with VerifyBlocks set to false) on i7-8565U.

Reference results (bbe4e9cd7bb33428633586f080f64494cd6ac9cf):

Ryzen 9 5950X:
RPS    23616.969 22817.086 23222.378 ≈ 23218 ± 1.72%
TPS    23047.316 22608.578 22735.540 ≈ 22797 ± 0.99%
CPU %     23.434    25.553    23.848 ≈  24.3 ± 4.63%
Mem MB   600.636   503.060   582.043 ≈   562 ± 9.22%

Core i7-8565U:
RPS     6594.007  6499.501  6572.902 ≈  6555 ± 0.76%
TPS     6561.680  6444.545  6510.120 ≈  6505 ± 0.90%
CPU %     58.452    60.568    62.474 ≈  60.5 ± 3.33%
Mem MB   234.893   285.067   269.081 ≈   263 ± 9.75%

DB restore:
real  0m22.237s 0m23.471s 0m23.409s ≈ 23.04 ± 3.02%
user  0m35.435s 0m38.943s 0m39.247s ≈ 37.88 ± 5.59%
sys    0m3.085s  0m3.360s  0m3.144s ≈  3.20 ± 4.53%

After the change:

Ryzen 9 5950X:
RPS    27747.349 27407.726 27520.210 ≈ 27558 ± 0.63% ↑ 18.69%
TPS    26992.010 26993.468 27010.966 ≈ 26999 ± 0.04% ↑ 18.43%
CPU %     28.928    28.096    29.105 ≈  28.7 ± 1.88% ↑ 18.1%
Mem MB   760.385   726.320   756.118 ≈   748 ± 2.48% ↑ 33.10%

Core i7-8565U:
RPS     7783.229  7628.409  7542.340 ≈  7651 ± 1.60% ↑ 16.72%
TPS     7708.436  7607.397  7489.459 ≈  7602 ± 1.44% ↑ 16.85%
CPU %     74.899    71.020    72.697 ≈  72.9 ± 2.67% ↑ 20.50%
Mem MB   438.047   436.967   416.350 ≈   430 ± 2.84% ↑ 63.50%

DB restore:
real  0m20.838s 0m21.895s 0m21.794s ≈ 21.51 ± 2.71% ↓ 6.64%
user  0m39.091s 0m40.565s 0m41.493s ≈ 40.38 ± 3.00% ↑ 6.60%
sys    0m3.184s  0m2.923s  0m3.062s ≈  3.06 ± 4.27% ↓ 4.38%

It obviously uses more memory now and utilizes the CPU more aggressively,
but at the same time it improves all relevant metrics and finally gets us
to the point where Ryzen 9 5950X processes 50K transactions in less than a
second (going higher than 25K TPS). The other observation is a much more
stable block time; on the Ryzen 9 it's as close to 1 second as it can get.
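For readers who don't want to trace the diff below, here is a minimal standalone
sketch of the same map-swap idea. It's only an illustration: the names (disk,
snapshot, cache) are made up, deletions and error handling are left out, and the
real change is in MemCachedStore.Persist further down.

package main

import (
    "fmt"
    "sync"
)

// disk simulates a slow persistent store (LevelDB in the real code).
type disk struct {
    mu   sync.Mutex
    data map[string][]byte
}

func (d *disk) put(k string, v []byte) {
    d.mu.Lock()
    d.data[k] = v
    d.mu.Unlock()
}

func (d *disk) get(k string) ([]byte, bool) {
    d.mu.Lock()
    defer d.mu.Unlock()
    v, ok := d.data[k]
    return v, ok
}

// store is the minimal read interface a lower layer has to provide.
type store interface {
    get(k string) ([]byte, bool)
}

// snapshot is a frozen KV set layered over a lower store. Nothing ever
// writes to it after creation, so reads need no locking.
type snapshot struct {
    mem   map[string][]byte
    lower store
}

func (s *snapshot) get(k string) ([]byte, bool) {
    if v, ok := s.mem[k]; ok {
        return v, ok
    }
    return s.lower.get(k)
}

// cache is a write-back cache in the spirit of MemCachedStore.
type cache struct {
    mu  sync.RWMutex
    mem map[string][]byte
    ps  store
}

func (c *cache) Put(k string, v []byte) {
    c.mu.Lock()
    c.mem[k] = v
    c.mu.Unlock()
}

func (c *cache) Get(k string) ([]byte, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    if v, ok := c.mem[k]; ok {
        return v, ok
    }
    return c.ps.get(k) // fall through to the lower layer
}

// Flush holds the write lock only long enough to swap in fresh maps and push
// the old ones down as a read-only snapshot layer. The slow disk write then
// runs without blocking cache users, and the visible state never changes.
func (c *cache) Flush(d *disk) int {
    c.mu.Lock()
    snap := &snapshot{mem: c.mem, lower: c.ps}
    c.ps = snap
    c.mem = make(map[string][]byte)
    c.mu.Unlock()

    for k, v := range snap.mem { // slow part, done without the cache lock
        d.put(k, v)
    }

    c.mu.Lock()
    c.ps = snap.lower // everything is on disk now, drop the extra layer
    c.mu.Unlock()
    return len(snap.mem)
}

func main() {
    d := &disk{data: map[string][]byte{}}
    c := &cache{mem: map[string][]byte{}, ps: d}
    c.Put("a", []byte("1"))
    fmt.Println("flushed keys:", c.Flush(d))
    v, ok := c.Get("a") // still visible, now served from disk
    fmt.Println(string(v), ok)
}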
---
 pkg/core/storage/memcached_store.go      | 58 +++++++++++++++++-----
 pkg/core/storage/memcached_store_test.go | 62 ++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 13 deletions(-)

diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go
index 6c2cddf67..4be37f44c 100644
--- a/pkg/core/storage/memcached_store.go
+++ b/pkg/core/storage/memcached_store.go
@@ -1,10 +1,14 @@
 package storage

+import "sync"
+
 // MemCachedStore is a wrapper around persistent store that caches all changes
 // being made for them to be later flushed in one batch.
 type MemCachedStore struct {
     MemoryStore

+    // plock protects Persist from double entrance.
+    plock sync.Mutex
     // Persistent Store.
     ps Store
 }
@@ -96,45 +100,73 @@ func (s *MemCachedStore) Persist() (int, error) {
     var err error
     var keys, dkeys int

+    s.plock.Lock()
+    defer s.plock.Unlock()
     s.mut.Lock()
-    defer s.mut.Unlock()

     keys = len(s.mem)
     dkeys = len(s.del)
     if keys == 0 && dkeys == 0 {
+        s.mut.Unlock()
         return 0, nil
     }

-    memStore, ok := s.ps.(*MemoryStore)
+    // tempstore technically copies current s in lower layer while real s
+    // starts using fresh new maps. This tempstore is only known here and
+    // nothing ever changes it, therefore accesses to it (reads) can go
+    // unprotected while writes are handled by s proper.
+    var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps}
+    s.ps = tempstore
+    s.mem = make(map[string][]byte)
+    s.del = make(map[string]bool)
+    s.mut.Unlock()
+
+    memStore, ok := tempstore.ps.(*MemoryStore)
     if !ok {
-        memCachedStore, ok := s.ps.(*MemCachedStore)
+        memCachedStore, ok := tempstore.ps.(*MemCachedStore)
         if ok {
             memStore = &memCachedStore.MemoryStore
         }
     }

     if memStore != nil {
         memStore.mut.Lock()
-        for k := range s.mem {
-            memStore.put(k, s.mem[k])
+        for k := range tempstore.mem {
+            memStore.put(k, tempstore.mem[k])
         }
-        for k := range s.del {
+        for k := range tempstore.del {
             memStore.drop(k)
         }
         memStore.mut.Unlock()
     } else {
-        batch := s.ps.Batch()
-        for k := range s.mem {
-            batch.Put([]byte(k), s.mem[k])
+        batch := tempstore.ps.Batch()
+        for k := range tempstore.mem {
+            batch.Put([]byte(k), tempstore.mem[k])
         }
-        for k := range s.del {
+        for k := range tempstore.del {
             batch.Delete([]byte(k))
         }
-        err = s.ps.PutBatch(batch)
+        err = tempstore.ps.PutBatch(batch)
     }

+    s.mut.Lock()
     if err == nil {
-        s.mem = make(map[string][]byte)
-        s.del = make(map[string]bool)
+        // tempstore.mem and tempstore.del are completely flushed now
+        // to tempstore.ps, so all KV pairs are the same and this
+        // substitution has no visible effects.
+        s.ps = tempstore.ps
+    } else {
+        // We're toast. We'll try to still keep proper state, but OOM
+        // killer will get to us eventually.
+        for k := range s.mem {
+            tempstore.put(k, s.mem[k])
+        }
+        for k := range s.del {
+            tempstore.drop(k)
+        }
+        s.ps = tempstore.ps
+        s.mem = tempstore.mem
+        s.del = tempstore.del
     }
+    s.mut.Unlock()

     return keys, err
 }
diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go
index 8addd4088..7dfee3027 100644
--- a/pkg/core/storage/memcached_store_test.go
+++ b/pkg/core/storage/memcached_store_test.go
@@ -177,3 +177,65 @@ func TestCachedSeek(t *testing.T) {
 func newMemCachedStoreForTesting(t *testing.T) Store {
     return NewMemCachedStore(NewMemoryStore())
 }
+
+type BadBatch struct{}
+
+func (b BadBatch) Delete(k []byte) {}
+func (b BadBatch) Put(k, v []byte) {}
+
+type BadStore struct {
+    onPutBatch func()
+}
+
+func (b *BadStore) Batch() Batch {
+    return BadBatch{}
+}
+func (b *BadStore) Delete(k []byte) error {
+    return nil
+}
+func (b *BadStore) Get([]byte) ([]byte, error) {
+    return nil, ErrKeyNotFound
+}
+func (b *BadStore) Put(k, v []byte) error {
+    return nil
+}
+func (b *BadStore) PutBatch(Batch) error {
+    b.onPutBatch()
+    return ErrKeyNotFound
+}
+func (b *BadStore) Seek(k []byte, f func(k, v []byte)) {
+}
+func (b *BadStore) Close() error {
+    return nil
+}
+
+func TestMemCachedPersistFailing(t *testing.T) {
+    var (
+        bs BadStore
+        t1 = []byte("t1")
+        t2 = []byte("t2")
+        b1 = []byte("b1")
+    )
+    // cached Store
+    ts := NewMemCachedStore(&bs)
+    // Set a pair of keys.
+    require.NoError(t, ts.Put(t1, t1))
+    require.NoError(t, ts.Put(t2, t2))
+    // This will be called during Persist().
+    bs.onPutBatch = func() {
+        // Drop one, add one.
+        require.NoError(t, ts.Put(b1, b1))
+        require.NoError(t, ts.Delete(t1))
+    }
+    _, err := ts.Persist()
+    require.Error(t, err)
+    // PutBatch() failed in Persist, but we still should have proper state.
+    _, err = ts.Get(t1)
+    require.Error(t, err)
+    res, err := ts.Get(t2)
+    require.NoError(t, err)
+    require.Equal(t, t2, res)
+    res, err = ts.Get(b1)
+    require.NoError(t, err)
+    require.Equal(t, b1, res)
+}

From 8277b7a19a5cbb74708e5ba14f3b94644d5c6925 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Fri, 30 Jul 2021 23:47:48 +0300
Subject: [PATCH 2/3] core: don't spawn goroutine for persist function
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Spawning a goroutine per persist timer tick doesn't make any sense: in
some situations it leads to a number of goroutines being created that
will Persist one after another (as we can't Persist concurrently). We can
manage this better from a single thread.

This doesn't change performance in any way, but somewhat reduces resource
consumption. It was tested with neo-bench (single node, 10 workers,
LevelDB) on two machines and with block dump processing (RC4 testnet up
to block 62800 with VerifyBlocks set to false) on i7-8565U.
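Benchmark numbers follow below; first, a rough standalone sketch of the loop
shape this patch switches to. It is an illustration only: persistOnce and the
sleep durations are made up, the real code is in (*Blockchain).Run in the diff
below. The point is to persist inline on the timer tick and re-arm the timer
with whatever is left of the interval instead of spawning a goroutine per tick.

package main

import (
    "fmt"
    "time"
)

const persistInterval = time.Second

// persistOnce stands in for bc.persist(); it reports how long the flush took.
func persistOnce() (time.Duration, error) {
    start := time.Now()
    time.Sleep(300 * time.Millisecond) // pretend to write to disk
    return time.Since(start), nil
}

func run(stop <-chan struct{}) {
    timer := time.NewTimer(persistInterval)
    defer timer.Stop()
    for {
        select {
        case <-stop:
            return
        case <-timer.C:
            // Persist right here: persists can't run concurrently anyway,
            // so queueing goroutines behind each other only wastes resources.
            dur, err := persistOnce()
            if err != nil {
                fmt.Println("failed to persist:", err)
            }
            // Re-arm with whatever is left of the interval so persists
            // still start roughly once per persistInterval.
            interval := persistInterval - dur
            if interval <= 0 {
                interval = time.Microsecond // Reset wants a positive value
            }
            timer.Reset(interval)
        }
    }
}

func main() {
    stop := make(chan struct{})
    go run(stop)
    time.Sleep(3 * time.Second)
    close(stop)
}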
Reference (b9be892bf9f658652e2d1f074f366914dc62e830):

Ryzen 9 5950X:
RPS    27747.349 27407.726 27520.210 ≈ 27558 ± 0.63%
TPS    26992.010 26993.468 27010.966 ≈ 26999 ± 0.04%
CPU %     28.928    28.096    29.105 ≈  28.7 ± 1.88%
Mem MB   760.385   726.320   756.118 ≈   748 ± 2.48%

Core i7-8565U:
RPS     7783.229  7628.409  7542.340 ≈  7651 ± 1.60%
TPS     7708.436  7607.397  7489.459 ≈  7602 ± 1.44%
CPU %     74.899    71.020    72.697 ≈  72.9 ± 2.67%
Mem MB   438.047   436.967   416.350 ≈   430 ± 2.84%

DB restore:
real  0m20.838s 0m21.895s 0m21.794s ≈ 21.51 ± 2.71%
user  0m39.091s 0m40.565s 0m41.493s ≈ 40.38 ± 3.00%
sys    0m3.184s  0m2.923s  0m3.062s ≈  3.06 ± 4.27%

Patched:

Ryzen 9 5950X:
RPS    27636.957 27246.911 27462.036 ≈ 27449 ± 0.71%  ↓ 0.40%
TPS    27003.672 26993.468 27011.696 ≈ 27003 ± 0.03%  ↑ 0.01%
CPU %     28.562    28.475    28.012 ≈  28.3 ± 1.04%  ↓ 1.39%
Mem MB   627.007   648.110   794.895 ≈   690 ± 13.25% ↓ 7.75%

Core i7-8565U:
RPS     7497.210  7527.797  7897.532 ≈  7641 ± 2.92%  ↓ 0.13%
TPS     7461.128  7482.678  7841.723 ≈  7595 ± 2.81%  ↓ 0.09%
CPU %     71.559    73.423    69.005 ≈  71.3 ± 3.11%  ↓ 2.19%
Mem MB   393.090   395.899   482.264 ≈   424 ± 11.96% ↓ 1.40%

DB restore:
real  0m20.773s 0m21.583s 0m20.522s ≈ 20.96 ± 2.65%  ↓ 2.56%
user  0m39.322s 0m42.268s 0m38.626s ≈ 40.07 ± 4.82%  ↓ 0.77%
sys    0m3.006s  0m3.597s  0m3.042s ≈  3.22 ± 10.31% ↑ 5.23%
---
 pkg/core/blockchain.go           | 32 ++++++++++++++++++--------------
 pkg/core/blockchain_test.go      | 15 ++++++++++-----
 pkg/core/native_contract_test.go |  3 ++-
 pkg/core/native_policy_test.go   | 30 ++++++++++++++++++----------
 4 files changed, 50 insertions(+), 30 deletions(-)

diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index 11d0d78de..3014f87fb 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -394,7 +394,7 @@ func (bc *Blockchain) Run() {
     persistTimer := time.NewTimer(persistInterval)
     defer func() {
         persistTimer.Stop()
-        if err := bc.persist(); err != nil {
+        if _, err := bc.persist(); err != nil {
             bc.log.Warn("failed to persist", zap.Error(err))
         }
         if err := bc.dao.Store.Close(); err != nil {
@@ -408,13 +408,15 @@
         case <-bc.stopCh:
             return
         case <-persistTimer.C:
-            go func() {
-                err := bc.persist()
-                if err != nil {
-                    bc.log.Warn("failed to persist blockchain", zap.Error(err))
-                }
-                persistTimer.Reset(persistInterval)
-            }()
+            dur, err := bc.persist()
+            if err != nil {
+                bc.log.Warn("failed to persist blockchain", zap.Error(err))
+            }
+            interval := persistInterval - dur
+            if interval <= 0 {
+                interval = time.Microsecond // Reset doesn't work with zero value
+            }
+            persistTimer.Reset(interval)
         }
     }
 }
@@ -1175,41 +1177,43 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
 }

 // persist flushes current in-memory Store contents to the persistent storage.
-func (bc *Blockchain) persist() error {
+func (bc *Blockchain) persist() (time.Duration, error) {
     var (
         start     = time.Now()
+        duration  time.Duration
         persisted int
         err       error
     )

     persisted, err = bc.dao.Persist()
     if err != nil {
-        return err
+        return 0, err
     }
     if persisted > 0 {
         bHeight, err := bc.dao.GetCurrentBlockHeight()
         if err != nil {
-            return err
+            return 0, err
         }

         oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight)
         diff := bHeight - oldHeight

         storedHeaderHeight, _, err := bc.dao.GetCurrentHeaderHeight()
         if err != nil {
-            return err
+            return 0, err
         }
+        duration = time.Since(start)
         bc.log.Info("persisted to disk",
             zap.Uint32("blocks", diff),
             zap.Int("keys", persisted),
             zap.Uint32("headerHeight", storedHeaderHeight),
             zap.Uint32("blockHeight", bHeight),
-            zap.Duration("took", time.Since(start)))
+            zap.Duration("took", duration))

         // update monitoring metrics.
         updatePersistedHeightMetric(bHeight)
     }

-    return nil
+    return duration, nil
 }

 // GetTransaction returns a TX and its height by the given hash. The height is MaxUint32 if tx is in the mempool.
diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go
index d3a2b7ead..8f0d00d51 100644
--- a/pkg/core/blockchain_test.go
+++ b/pkg/core/blockchain_test.go
@@ -118,7 +118,8 @@ func TestAddBlock(t *testing.T) {
     assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())

     // This one tests persisting blocks, so it does need to persist()
-    require.NoError(t, bc.persist())
+    _, err = bc.persist()
+    require.NoError(t, err)

     for _, block := range blocks {
         key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE())
@@ -236,7 +237,8 @@ func TestGetHeader(t *testing.T) {
         b2 := bc.newBlock()
         _, err = bc.GetHeader(b2.Hash())
         assert.Error(t, err)
-        assert.NoError(t, bc.persist())
+        _, err = bc.persist()
+        assert.NoError(t, err)
     }
 }

@@ -253,7 +255,8 @@ func TestGetBlock(t *testing.T) {
             assert.Equal(t, blocks[i].Index, block.Index)
             assert.Equal(t, blocks[i].Hash(), block.Hash())
         }
-        assert.NoError(t, bc.persist())
+        _, err = bc.persist()
+        assert.NoError(t, err)
     }

     t.Run("store only header", func(t *testing.T) {
@@ -1305,7 +1308,8 @@ func TestHasBlock(t *testing.T) {
         }
         newBlock := bc.newBlock()
         assert.False(t, bc.HasBlock(newBlock.Hash()))
-        assert.NoError(t, bc.persist())
+        _, err = bc.persist()
+        assert.NoError(t, err)
     }
 }

@@ -1340,7 +1344,8 @@ func TestGetTransaction(t *testing.T) {
         assert.Equal(t, block.Transactions[0], tx)
         assert.Equal(t, 1, io.GetVarSize(tx.Attributes))
         assert.Equal(t, 1, io.GetVarSize(tx.Scripts))
-        assert.NoError(t, bc.persist())
+        _, err = bc.persist()
+        assert.NoError(t, err)
     }
 }

diff --git a/pkg/core/native_contract_test.go b/pkg/core/native_contract_test.go
index 9c7b81fcb..eb2403b4b 100644
--- a/pkg/core/native_contract_test.go
+++ b/pkg/core/native_contract_test.go
@@ -196,7 +196,8 @@ func TestNativeContract_Invoke(t *testing.T) {
     res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28))
     require.NoError(t, err)
     checkResult(t, res, stackitem.Make(42))
-    require.NoError(t, chain.persist())
+    _, err = chain.persist()
+    require.NoError(t, err)

     select {
     case index := <-tn.blocks:
diff --git a/pkg/core/native_policy_test.go b/pkg/core/native_policy_test.go
index 3b315b28c..631a1718f 100644
--- a/pkg/core/native_policy_test.go
+++ b/pkg/core/native_policy_test.go
@@ -36,7 +36,8 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
         res, err := invokeContractMethod(chain, 100000000, hash, getName)
         require.NoError(t, err)
         checkResult(t, res, stackitem.Make(defaultValue))
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)
     })

     t.Run("set, too small value", func(t *testing.T) {
@@ -68,13 +69,15 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
         if name != "GasPerBlock" { // GasPerBlock is set on the next block
             checkResult(t, aers[1], stackitem.Make(defaultValue+1))
         }
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)

         // Get in the next block.
         res, err := invokeContractMethod(chain, 100000000, hash, getName)
         require.NoError(t, err)
         checkResult(t, res, stackitem.Make(defaultValue+1))
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)
     })
 }

@@ -128,7 +131,8 @@ func TestBlockedAccounts(t *testing.T) {
         res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160())
         require.NoError(t, err)
         checkResult(t, res, stackitem.NewBool(false))
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)
     })

     t.Run("block-unblock account", func(t *testing.T) {
@@ -138,7 +142,8 @@

         isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
         require.Equal(t, isBlocked, true)
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)

         res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
         require.NoError(t, err)
@@ -146,7 +151,8 @@

         isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
         require.Equal(t, false, isBlocked)
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)
     })

     t.Run("double-block", func(t *testing.T) {
@@ -154,25 +160,29 @@
         res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
         require.NoError(t, err)
         checkResult(t, res, stackitem.NewBool(true))
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)

         // double-block should fail
         res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
         require.NoError(t, err)
         checkResult(t, res, stackitem.NewBool(false))
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)

         // unblock
         res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
         require.NoError(t, err)
         checkResult(t, res, stackitem.NewBool(true))
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)

         // unblock the same account should fail as we don't have it blocked
         res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
         require.NoError(t, err)
         checkResult(t, res, stackitem.NewBool(false))
-        require.NoError(t, chain.persist())
+        _, err = chain.persist()
+        require.NoError(t, err)
     })

     t.Run("not signed by committee", func(t *testing.T) {

From f8174ca64c58b8435479087d12532f4125bc3a5c Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Mon, 2 Aug 2021 16:03:53 +0300
Subject: [PATCH 3/3] core: ensure data logged is from persistent store

Using bc.dao here is wrong, since it can contain unpersisted data.
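A tiny standalone illustration of why this matters (diskStore and simpleDAO are
made-up stand-ins for storage.Store and dao.Simple, not the real types): a
write-cached DAO can report a height that hasn't reached disk yet, while a
second DAO over the same store that is never written to only sees persisted
data.

package main

import "fmt"

// diskStore stands in for the shared persistent storage.Store.
type diskStore struct {
    height uint32
}

// simpleDAO stands in for dao.Simple: a write-back cache over a store.
type simpleDAO struct {
    ps     *diskStore
    cached *uint32 // unpersisted height, if any
}

func (d *simpleDAO) PutCurrentBlockHeight(h uint32) { d.cached = &h }

func (d *simpleDAO) GetCurrentBlockHeight() uint32 {
    if d.cached != nil {
        return *d.cached // a write-cached DAO may return unpersisted data
    }
    return d.ps.height
}

func (d *simpleDAO) Persist() {
    if d.cached != nil {
        d.ps.height = *d.cached
        d.cached = nil
    }
}

func main() {
    s := &diskStore{}
    dao := &simpleDAO{ps: s}        // write-cached, used by block processing
    persistent := &simpleDAO{ps: s} // never written to, reads hit the store

    dao.PutCurrentBlockHeight(42) // block stored, not yet flushed
    fmt.Println("dao sees:", dao.GetCurrentBlockHeight())               // 42
    fmt.Println("persistent sees:", persistent.GetCurrentBlockHeight()) // 0

    dao.Persist()
    fmt.Println("persistent sees:", persistent.GetCurrentBlockHeight()) // 42
}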
---
 pkg/core/blockchain.go | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index 3014f87fb..00a7f58be 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -98,9 +98,13 @@ type Blockchain struct {
     // with the only writer being the block addition logic.
     lock sync.RWMutex

-    // Data access object for CRUD operations around storage.
+    // Data access object for CRUD operations around storage. It's write-cached.
     dao *dao.Simple

+    // persistent is the same DB as dao, but we never write to it, so all reads
+    // are directly from underlying persistent store.
+    persistent *dao.Simple
+
     // Current index/height of the highest block.
     // Read access should always be called by BlockHeight().
     // Write access should only happen in storeBlock().
@@ -215,6 +219,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
     bc := &Blockchain{
         config:      cfg,
         dao:         dao.NewSimple(s, cfg.StateRootInHeader),
+        persistent:  dao.NewSimple(s, cfg.StateRootInHeader),
         stopCh:      make(chan struct{}),
         runToExitCh: make(chan struct{}),
         memPool:     mempool.New(cfg.MemPoolSize, 0, false),
@@ -1190,14 +1195,14 @@ func (bc *Blockchain) persist() (time.Duration, error) {
         return 0, err
     }
     if persisted > 0 {
-        bHeight, err := bc.dao.GetCurrentBlockHeight()
+        bHeight, err := bc.persistent.GetCurrentBlockHeight()
         if err != nil {
             return 0, err
         }

         oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight)
         diff := bHeight - oldHeight

-        storedHeaderHeight, _, err := bc.dao.GetCurrentHeaderHeight()
+        storedHeaderHeight, _, err := bc.persistent.GetCurrentHeaderHeight()
         if err != nil {
             return 0, err
         }
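Finally, a small usage sketch of the end result of the series as a whole. It is
a demonstration only: it assumes the usual github.com/nspcc-dev/neo-go module
path and the package API visible in the diffs above (Put/Get/Persist), and the
concurrency benefit is only really observable with a slow backing store.

package main

import (
    "fmt"
    "sync"

    "github.com/nspcc-dev/neo-go/pkg/core/storage"
)

func main() {
    ts := storage.NewMemCachedStore(storage.NewMemoryStore())
    if err := ts.Put([]byte("key"), []byte("value")); err != nil {
        panic(err)
    }

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        // With patch 1 applied, this Get is only blocked for the short
        // map swap inside Persist, not for the whole flush.
        if v, err := ts.Get([]byte("key")); err == nil {
            fmt.Println("read while persisting:", string(v))
        }
    }()

    n, err := ts.Persist()
    wg.Wait()
    fmt.Println("persisted keys:", n, "err:", err)
}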