core: add and use synchronous persist to avoid OOM

b9be892bf9 has made Persist asynchronous which
is very effective in allowing the system to continue processing
blocks/transactions while flushing things to disk. It at the same time is very
dangerous in that if the disk is slow and it takes much time to flush KV set
(more than persisting interval), there might be even bigger new KV set in
MemCachedStore by the time it finishes. Even if the system immediately starts
to flush this new data set it (being bigger) can take more time than the
previous one. And while doing so a new data set will appear in memory,
potentially again bigger than this.

So we can easily end up with the system going out of control, consuming more
and more memory and taking more and more time to persist a single set of
data. To avoid this we need to detect such condition and just wait for Persist
to really finish its job and release the resources.
This commit is contained in:
Roman Khimov 2021-11-22 10:41:40 +03:00
parent aa06770b3d
commit 5fbe838fd4
6 changed files with 54 additions and 24 deletions

View file

@ -611,7 +611,7 @@ func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval) persistTimer := time.NewTimer(persistInterval)
defer func() { defer func() {
persistTimer.Stop() persistTimer.Stop()
if _, err := bc.persist(); err != nil { if _, err := bc.persist(true); err != nil {
bc.log.Warn("failed to persist", zap.Error(err)) bc.log.Warn("failed to persist", zap.Error(err))
} }
if err := bc.dao.Store.Close(); err != nil { if err := bc.dao.Store.Close(); err != nil {
@ -620,15 +620,17 @@ func (bc *Blockchain) Run() {
close(bc.runToExitCh) close(bc.runToExitCh)
}() }()
go bc.notificationDispatcher() go bc.notificationDispatcher()
var nextSync bool
for { for {
select { select {
case <-bc.stopCh: case <-bc.stopCh:
return return
case <-persistTimer.C: case <-persistTimer.C:
dur, err := bc.persist() dur, err := bc.persist(nextSync)
if err != nil { if err != nil {
bc.log.Warn("failed to persist blockchain", zap.Error(err)) bc.log.Warn("failed to persist blockchain", zap.Error(err))
} }
nextSync = dur > persistInterval*2
interval := persistInterval - dur interval := persistInterval - dur
if interval <= 0 { if interval <= 0 {
interval = time.Microsecond // Reset doesn't work with zero value interval = time.Microsecond // Reset doesn't work with zero value
@ -1520,7 +1522,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
} }
// persist flushes current in-memory Store contents to the persistent storage. // persist flushes current in-memory Store contents to the persistent storage.
func (bc *Blockchain) persist() (time.Duration, error) { func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
var ( var (
start = time.Now() start = time.Now()
duration time.Duration duration time.Duration
@ -1528,7 +1530,11 @@ func (bc *Blockchain) persist() (time.Duration, error) {
err error err error
) )
if isSync {
persisted, err = bc.dao.PersistSync()
} else {
persisted, err = bc.dao.Persist() persisted, err = bc.dao.Persist()
}
if err != nil { if err != nil {
return 0, err return 0, err
} }

View file

@ -122,7 +122,7 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
// This one tests persisting blocks, so it does need to persist() // This one tests persisting blocks, so it does need to persist()
_, err = bc.persist() _, err = bc.persist(false)
require.NoError(t, err) require.NoError(t, err)
for _, block := range blocks { for _, block := range blocks {
@ -241,7 +241,7 @@ func TestGetHeader(t *testing.T) {
b2 := bc.newBlock() b2 := bc.newBlock()
_, err = bc.GetHeader(b2.Hash()) _, err = bc.GetHeader(b2.Hash())
assert.Error(t, err) assert.Error(t, err)
_, err = bc.persist() _, err = bc.persist(false)
assert.NoError(t, err) assert.NoError(t, err)
} }
} }
@ -259,7 +259,7 @@ func TestGetBlock(t *testing.T) {
assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Index, block.Index)
assert.Equal(t, blocks[i].Hash(), block.Hash()) assert.Equal(t, blocks[i].Hash(), block.Hash())
} }
_, err = bc.persist() _, err = bc.persist(false)
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -1319,7 +1319,7 @@ func TestHasBlock(t *testing.T) {
} }
newBlock := bc.newBlock() newBlock := bc.newBlock()
assert.False(t, bc.HasBlock(newBlock.Hash())) assert.False(t, bc.HasBlock(newBlock.Hash()))
_, err = bc.persist() _, err = bc.persist(true)
assert.NoError(t, err) assert.NoError(t, err)
} }
} }
@ -1355,7 +1355,7 @@ func TestGetTransaction(t *testing.T) {
assert.Equal(t, block.Transactions[0], tx) assert.Equal(t, block.Transactions[0], tx)
assert.Equal(t, 1, io.GetVarSize(tx.Attributes)) assert.Equal(t, 1, io.GetVarSize(tx.Attributes))
assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts))
_, err = bc.persist() _, err = bc.persist(true)
assert.NoError(t, err) assert.NoError(t, err)
} }
} }

View file

@ -54,6 +54,7 @@ type DAO interface {
GetWrapped() DAO GetWrapped() DAO
HasTransaction(hash util.Uint256) error HasTransaction(hash util.Uint256) error
Persist() (int, error) Persist() (int, error)
PersistSync() (int, error)
PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error
PutContractID(id int32, hash util.Uint160) error PutContractID(id int32, hash util.Uint160) error
PutCurrentHeader(hashAndIndex []byte) error PutCurrentHeader(hashAndIndex []byte) error
@ -651,11 +652,18 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
} }
// Persist flushes all the changes made into the (supposedly) persistent // Persist flushes all the changes made into the (supposedly) persistent
// underlying store. // underlying store. It doesn't block accesses to DAO from other threads.
func (dao *Simple) Persist() (int, error) { func (dao *Simple) Persist() (int, error) {
return dao.Store.Persist() return dao.Store.Persist()
} }
// PersistSync flushes all the changes made into the (supposedly) persistent
// underlying store. It's a synchronous version of Persist that doesn't allow
// other threads to work with DAO while flushing the Store.
func (dao *Simple) PersistSync() (int, error) {
return dao.Store.PersistSync()
}
// GetMPTBatch storage changes to be applied to MPT. // GetMPTBatch storage changes to be applied to MPT.
func (dao *Simple) GetMPTBatch() mpt.Batch { func (dao *Simple) GetMPTBatch() mpt.Batch {
var b mpt.Batch var b mpt.Batch

View file

@ -196,7 +196,7 @@ func TestNativeContract_Invoke(t *testing.T) {
res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28)) res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28))
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Make(42)) checkResult(t, res, stackitem.Make(42))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
select { select {

View file

@ -36,7 +36,7 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
res, err := invokeContractMethod(chain, 100000000, hash, getName) res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue)) checkResult(t, res, stackitem.Make(defaultValue))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
}) })
@ -69,14 +69,14 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
if name != "GasPerBlock" { // GasPerBlock is set on the next block if name != "GasPerBlock" { // GasPerBlock is set on the next block
checkResult(t, aers[1], stackitem.Make(defaultValue+1)) checkResult(t, aers[1], stackitem.Make(defaultValue+1))
} }
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
// Get in the next block. // Get in the next block.
res, err := invokeContractMethod(chain, 100000000, hash, getName) res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue+1)) checkResult(t, res, stackitem.Make(defaultValue+1))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
}) })
} }
@ -131,7 +131,7 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160()) res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false)) checkResult(t, res, stackitem.NewBool(false))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
}) })
@ -142,7 +142,7 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account) isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, isBlocked, true) require.Equal(t, isBlocked, true)
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
@ -151,7 +151,7 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account) isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, false, isBlocked) require.Equal(t, false, isBlocked)
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
}) })
@ -160,28 +160,28 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true)) checkResult(t, res, stackitem.NewBool(true))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
// double-block should fail // double-block should fail
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false)) checkResult(t, res, stackitem.NewBool(false))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
// unblock // unblock
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true)) checkResult(t, res, stackitem.NewBool(true))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
// unblock the same account should fail as we don't have it blocked // unblock the same account should fail as we don't have it blocked
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false)) checkResult(t, res, stackitem.NewBool(false))
_, err = chain.persist() _, err = chain.persist(false)
require.NoError(t, err) require.NoError(t, err)
}) })

View file

@ -218,8 +218,20 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
} }
// Persist flushes all the MemoryStore contents into the (supposedly) persistent // Persist flushes all the MemoryStore contents into the (supposedly) persistent
// store ps. // store ps. MemCachedStore remains accessible for the most part of this action
// (any new changes will be cached in memory).
func (s *MemCachedStore) Persist() (int, error) { func (s *MemCachedStore) Persist() (int, error) {
return s.persist(false)
}
// PersistSync flushes all the MemoryStore contents into the (supposedly) persistent
// store ps. It's different from Persist in that it blocks MemCachedStore completely
// while flushing things from memory to persistent store.
func (s *MemCachedStore) PersistSync() (int, error) {
return s.persist(true)
}
func (s *MemCachedStore) persist(isSync bool) (int, error) {
var err error var err error
var keys, dkeys int var keys, dkeys int
@ -242,11 +254,15 @@ func (s *MemCachedStore) Persist() (int, error) {
s.ps = tempstore s.ps = tempstore
s.mem = make(map[string][]byte) s.mem = make(map[string][]byte)
s.del = make(map[string]bool) s.del = make(map[string]bool)
if !isSync {
s.mut.Unlock() s.mut.Unlock()
}
err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del) err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del)
if !isSync {
s.mut.Lock() s.mut.Lock()
}
if err == nil { if err == nil {
// tempstore.mem and tempstore.del are completely flushed now // tempstore.mem and tempstore.del are completely flushed now
// to tempstore.ps, so all KV pairs are the same and this // to tempstore.ps, so all KV pairs are the same and this