Merge pull request #2271 from nspcc-dev/fix-sync-oom

core: add and use synchronous persist to avoid OOM
This commit is contained in:
Roman Khimov 2021-11-23 12:37:48 +03:00 committed by GitHub
commit 019af7de46
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 54 additions and 24 deletions

View file

@ -611,7 +611,7 @@ func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval)
defer func() {
persistTimer.Stop()
if _, err := bc.persist(); err != nil {
if _, err := bc.persist(true); err != nil {
bc.log.Warn("failed to persist", zap.Error(err))
}
if err := bc.dao.Store.Close(); err != nil {
@ -620,15 +620,17 @@ func (bc *Blockchain) Run() {
close(bc.runToExitCh)
}()
go bc.notificationDispatcher()
var nextSync bool
for {
select {
case <-bc.stopCh:
return
case <-persistTimer.C:
dur, err := bc.persist()
dur, err := bc.persist(nextSync)
if err != nil {
bc.log.Warn("failed to persist blockchain", zap.Error(err))
}
nextSync = dur > persistInterval*2
interval := persistInterval - dur
if interval <= 0 {
interval = time.Microsecond // Reset doesn't work with zero value
@ -1520,7 +1522,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
}
// persist flushes current in-memory Store contents to the persistent storage.
func (bc *Blockchain) persist() (time.Duration, error) {
func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
var (
start = time.Now()
duration time.Duration
@ -1528,7 +1530,11 @@ func (bc *Blockchain) persist() (time.Duration, error) {
err error
)
persisted, err = bc.dao.Persist()
if isSync {
persisted, err = bc.dao.PersistSync()
} else {
persisted, err = bc.dao.Persist()
}
if err != nil {
return 0, err
}

View file

@ -122,7 +122,7 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
// This one tests persisting blocks, so it does need to persist()
_, err = bc.persist()
_, err = bc.persist(false)
require.NoError(t, err)
for _, block := range blocks {
@ -241,7 +241,7 @@ func TestGetHeader(t *testing.T) {
b2 := bc.newBlock()
_, err = bc.GetHeader(b2.Hash())
assert.Error(t, err)
_, err = bc.persist()
_, err = bc.persist(false)
assert.NoError(t, err)
}
}
@ -259,7 +259,7 @@ func TestGetBlock(t *testing.T) {
assert.Equal(t, blocks[i].Index, block.Index)
assert.Equal(t, blocks[i].Hash(), block.Hash())
}
_, err = bc.persist()
_, err = bc.persist(false)
assert.NoError(t, err)
}
@ -1319,7 +1319,7 @@ func TestHasBlock(t *testing.T) {
}
newBlock := bc.newBlock()
assert.False(t, bc.HasBlock(newBlock.Hash()))
_, err = bc.persist()
_, err = bc.persist(true)
assert.NoError(t, err)
}
}
@ -1355,7 +1355,7 @@ func TestGetTransaction(t *testing.T) {
assert.Equal(t, block.Transactions[0], tx)
assert.Equal(t, 1, io.GetVarSize(tx.Attributes))
assert.Equal(t, 1, io.GetVarSize(tx.Scripts))
_, err = bc.persist()
_, err = bc.persist(true)
assert.NoError(t, err)
}
}

View file

@ -54,6 +54,7 @@ type DAO interface {
GetWrapped() DAO
HasTransaction(hash util.Uint256) error
Persist() (int, error)
PersistSync() (int, error)
PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error
PutContractID(id int32, hash util.Uint160) error
PutCurrentHeader(hashAndIndex []byte) error
@ -651,11 +652,18 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
}
// Persist flushes all the changes made into the (supposedly) persistent
// underlying store.
// underlying store. It doesn't block accesses to DAO from other threads.
func (dao *Simple) Persist() (int, error) {
return dao.Store.Persist()
}
// PersistSync flushes all the changes made into the (supposedly) persistent
// underlying store. It's a synchronous version of Persist that doesn't allow
// other threads to work with DAO while flushing the Store.
func (dao *Simple) PersistSync() (int, error) {
return dao.Store.PersistSync()
}
// GetMPTBatch storage changes to be applied to MPT.
func (dao *Simple) GetMPTBatch() mpt.Batch {
var b mpt.Batch

View file

@ -196,7 +196,7 @@ func TestNativeContract_Invoke(t *testing.T) {
res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28))
require.NoError(t, err)
checkResult(t, res, stackitem.Make(42))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
select {

View file

@ -36,7 +36,7 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
})
@ -69,14 +69,14 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
if name != "GasPerBlock" { // GasPerBlock is set on the next block
checkResult(t, aers[1], stackitem.Make(defaultValue+1))
}
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
// Get in the next block.
res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue+1))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
})
}
@ -131,7 +131,7 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
})
@ -142,7 +142,7 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, isBlocked, true)
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
@ -151,7 +151,7 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, false, isBlocked)
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
})
@ -160,28 +160,28 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
// double-block should fail
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
// unblock
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
// unblock the same account should fail as we don't have it blocked
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false))
_, err = chain.persist()
_, err = chain.persist(false)
require.NoError(t, err)
})

View file

@ -218,8 +218,20 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
}
// Persist flushes all the MemoryStore contents into the (supposedly) persistent
// store ps.
// store ps. MemCachedStore remains accessible for the most part of this action
// (any new changes will be cached in memory).
func (s *MemCachedStore) Persist() (int, error) {
return s.persist(false)
}
// PersistSync flushes all the MemoryStore contents into the (supposedly) persistent
// store ps. It's different from Persist in that it blocks MemCachedStore completely
// while flushing things from memory to persistent store.
func (s *MemCachedStore) PersistSync() (int, error) {
return s.persist(true)
}
func (s *MemCachedStore) persist(isSync bool) (int, error) {
var err error
var keys, dkeys int
@ -242,11 +254,15 @@ func (s *MemCachedStore) Persist() (int, error) {
s.ps = tempstore
s.mem = make(map[string][]byte)
s.del = make(map[string]bool)
s.mut.Unlock()
if !isSync {
s.mut.Unlock()
}
err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del)
s.mut.Lock()
if !isSync {
s.mut.Lock()
}
if err == nil {
// tempstore.mem and tempstore.del are completely flushed now
// to tempstore.ps, so all KV pairs are the same and this