From 5fbe838fd4a9c25fa1c8f9b773be1399b4a83e9f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 22 Nov 2021 10:41:40 +0300 Subject: [PATCH] core: add and use synchronous persist to avoid OOM b9be892bf9f658652e2d1f074f366914dc62e830 has made Persist asynchronous which is very effective in allowing the system to continue processing blocks/transactions while flushing things to disk. At the same time it is very dangerous in that if the disk is slow and it takes much time to flush the KV set (more than the persisting interval), there might be an even bigger new KV set in MemCachedStore by the time it finishes. Even if the system immediately starts to flush this new data set it (being bigger) can take more time than the previous one. And while doing so a new data set will appear in memory, potentially again bigger than this one. So we can easily end up with the system going out of control, consuming more and more memory and taking more and more time to persist a single set of data. To avoid this we need to detect such a condition and just wait for Persist to really finish its job and release the resources. 
--- pkg/core/blockchain.go | 14 ++++++++++---- pkg/core/blockchain_test.go | 10 +++++----- pkg/core/dao/dao.go | 10 +++++++++- pkg/core/native_contract_test.go | 2 +- pkg/core/native_policy_test.go | 20 ++++++++++---------- pkg/core/storage/memcached_store.go | 22 +++++++++++++++++++--- 6 files changed, 54 insertions(+), 24 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 1f92796d2..5666389e1 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -611,7 +611,7 @@ func (bc *Blockchain) Run() { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() - if _, err := bc.persist(); err != nil { + if _, err := bc.persist(true); err != nil { bc.log.Warn("failed to persist", zap.Error(err)) } if err := bc.dao.Store.Close(); err != nil { @@ -620,15 +620,17 @@ func (bc *Blockchain) Run() { close(bc.runToExitCh) }() go bc.notificationDispatcher() + var nextSync bool for { select { case <-bc.stopCh: return case <-persistTimer.C: - dur, err := bc.persist() + dur, err := bc.persist(nextSync) if err != nil { bc.log.Warn("failed to persist blockchain", zap.Error(err)) } + nextSync = dur > persistInterval*2 interval := persistInterval - dur if interval <= 0 { interval = time.Microsecond // Reset doesn't work with zero value @@ -1520,7 +1522,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch { } // persist flushes current in-memory Store contents to the persistent storage. 
-func (bc *Blockchain) persist() (time.Duration, error) { +func (bc *Blockchain) persist(isSync bool) (time.Duration, error) { var ( start = time.Now() duration time.Duration @@ -1528,7 +1530,11 @@ func (bc *Blockchain) persist() (time.Duration, error) { err error ) - persisted, err = bc.dao.Persist() + if isSync { + persisted, err = bc.dao.PersistSync() + } else { + persisted, err = bc.dao.Persist() + } if err != nil { return 0, err } diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 36b45f85a..4870be1d6 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -122,7 +122,7 @@ func TestAddBlock(t *testing.T) { assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) // This one tests persisting blocks, so it does need to persist() - _, err = bc.persist() + _, err = bc.persist(false) require.NoError(t, err) for _, block := range blocks { @@ -241,7 +241,7 @@ func TestGetHeader(t *testing.T) { b2 := bc.newBlock() _, err = bc.GetHeader(b2.Hash()) assert.Error(t, err) - _, err = bc.persist() + _, err = bc.persist(false) assert.NoError(t, err) } } @@ -259,7 +259,7 @@ func TestGetBlock(t *testing.T) { assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Hash(), block.Hash()) } - _, err = bc.persist() + _, err = bc.persist(false) assert.NoError(t, err) } @@ -1319,7 +1319,7 @@ func TestHasBlock(t *testing.T) { } newBlock := bc.newBlock() assert.False(t, bc.HasBlock(newBlock.Hash())) - _, err = bc.persist() + _, err = bc.persist(true) assert.NoError(t, err) } } @@ -1355,7 +1355,7 @@ func TestGetTransaction(t *testing.T) { assert.Equal(t, block.Transactions[0], tx) assert.Equal(t, 1, io.GetVarSize(tx.Attributes)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) - _, err = bc.persist() + _, err = bc.persist(true) assert.NoError(t, err) } } diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 52c30ec62..9cf17c306 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -54,6 +54,7 @@ type DAO 
interface { GetWrapped() DAO HasTransaction(hash util.Uint256) error Persist() (int, error) + PersistSync() (int, error) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error PutContractID(id int32, hash util.Uint160) error PutCurrentHeader(hashAndIndex []byte) error @@ -651,11 +652,18 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, } // Persist flushes all the changes made into the (supposedly) persistent -// underlying store. +// underlying store. It doesn't block accesses to DAO from other threads. func (dao *Simple) Persist() (int, error) { return dao.Store.Persist() } +// PersistSync flushes all the changes made into the (supposedly) persistent +// underlying store. It's a synchronous version of Persist that doesn't allow +// other threads to work with DAO while flushing the Store. +func (dao *Simple) PersistSync() (int, error) { + return dao.Store.PersistSync() +} + // GetMPTBatch storage changes to be applied to MPT. func (dao *Simple) GetMPTBatch() mpt.Batch { var b mpt.Batch diff --git a/pkg/core/native_contract_test.go b/pkg/core/native_contract_test.go index aea370ef4..b9b652b63 100644 --- a/pkg/core/native_contract_test.go +++ b/pkg/core/native_contract_test.go @@ -196,7 +196,7 @@ func TestNativeContract_Invoke(t *testing.T) { res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28)) require.NoError(t, err) checkResult(t, res, stackitem.Make(42)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) select { diff --git a/pkg/core/native_policy_test.go b/pkg/core/native_policy_test.go index 5a041121c..8dd05fc3c 100644 --- a/pkg/core/native_policy_test.go +++ b/pkg/core/native_policy_test.go @@ -36,7 +36,7 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue)) - _, err = 
chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) @@ -69,14 +69,14 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, if name != "GasPerBlock" { // GasPerBlock is set on the next block checkResult(t, aers[1], stackitem.Make(defaultValue+1)) } - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // Get in the next block. res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue+1)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) } @@ -131,7 +131,7 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) @@ -142,7 +142,7 @@ func TestBlockedAccounts(t *testing.T) { isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, isBlocked, true) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) @@ -151,7 +151,7 @@ func TestBlockedAccounts(t *testing.T) { isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, false, isBlocked) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) @@ -160,28 +160,28 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // double-block should fail res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, 
"blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // unblock res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // unblock the same account should fail as we don't have it blocked res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index afe32481b..065c7303f 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -218,8 +218,20 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f } // Persist flushes all the MemoryStore contents into the (supposedly) persistent -// store ps. +// store ps. MemCachedStore remains accessible for the most part of this action +// (any new changes will be cached in memory). func (s *MemCachedStore) Persist() (int, error) { + return s.persist(false) +} + +// PersistSync flushes all the MemoryStore contents into the (supposedly) persistent +// store ps. It's different from Persist in that it blocks MemCachedStore completely +// while flushing things from memory to persistent store. 
+func (s *MemCachedStore) PersistSync() (int, error) { + return s.persist(true) +} + +func (s *MemCachedStore) persist(isSync bool) (int, error) { var err error var keys, dkeys int @@ -242,11 +254,15 @@ func (s *MemCachedStore) Persist() (int, error) { s.ps = tempstore s.mem = make(map[string][]byte) s.del = make(map[string]bool) - s.mut.Unlock() + if !isSync { + s.mut.Unlock() + } err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del) - s.mut.Lock() + if !isSync { + s.mut.Lock() + } if err == nil { // tempstore.mem and tempstore.del are completely flushed now // to tempstore.ps, so all KV pairs are the same and this