diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 1f92796d2..5666389e1 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -611,7 +611,7 @@ func (bc *Blockchain) Run() { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() - if _, err := bc.persist(); err != nil { + if _, err := bc.persist(true); err != nil { bc.log.Warn("failed to persist", zap.Error(err)) } if err := bc.dao.Store.Close(); err != nil { @@ -620,15 +620,17 @@ func (bc *Blockchain) Run() { close(bc.runToExitCh) }() go bc.notificationDispatcher() + var nextSync bool for { select { case <-bc.stopCh: return case <-persistTimer.C: - dur, err := bc.persist() + dur, err := bc.persist(nextSync) if err != nil { bc.log.Warn("failed to persist blockchain", zap.Error(err)) } + nextSync = dur > persistInterval*2 interval := persistInterval - dur if interval <= 0 { interval = time.Microsecond // Reset doesn't work with zero value @@ -1520,7 +1522,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch { } // persist flushes current in-memory Store contents to the persistent storage. -func (bc *Blockchain) persist() (time.Duration, error) { +func (bc *Blockchain) persist(isSync bool) (time.Duration, error) { var ( start = time.Now() duration time.Duration @@ -1528,7 +1530,11 @@ func (bc *Blockchain) persist() (time.Duration, error) { err error ) - persisted, err = bc.dao.Persist() + if isSync { + persisted, err = bc.dao.PersistSync() + } else { + persisted, err = bc.dao.Persist() + } if err != nil { return 0, err } diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 36b45f85a..4870be1d6 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -122,7 +122,7 @@ func TestAddBlock(t *testing.T) { assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) // This one tests persisting blocks, so it does need to persist() - _, err = bc.persist() + _, err = bc.persist(false) require.NoError(t, err) for _, block := range blocks { @@ -241,7 +241,7 @@ func TestGetHeader(t *testing.T) { b2 := bc.newBlock() _, err = bc.GetHeader(b2.Hash()) assert.Error(t, err) - _, err = bc.persist() + _, err = bc.persist(false) assert.NoError(t, err) } } @@ -259,7 +259,7 @@ func TestGetBlock(t *testing.T) { assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Hash(), block.Hash()) } - _, err = bc.persist() + _, err = bc.persist(false) assert.NoError(t, err) } @@ -1319,7 +1319,7 @@ func TestHasBlock(t *testing.T) { } newBlock := bc.newBlock() assert.False(t, bc.HasBlock(newBlock.Hash())) - _, err = bc.persist() + _, err = bc.persist(true) assert.NoError(t, err) } } @@ -1355,7 +1355,7 @@ func TestGetTransaction(t *testing.T) { assert.Equal(t, block.Transactions[0], tx) assert.Equal(t, 1, io.GetVarSize(tx.Attributes)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) - _, err = bc.persist() + _, err = bc.persist(true) assert.NoError(t, err) } } diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 52c30ec62..9cf17c306 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -54,6 +54,7 @@ type DAO interface { GetWrapped() DAO HasTransaction(hash util.Uint256) error Persist() (int, error) + PersistSync() (int, error) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error PutContractID(id int32, hash util.Uint160) error PutCurrentHeader(hashAndIndex []byte) error @@ -651,11 +652,18 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, } // Persist flushes all the changes made into the (supposedly) persistent -// underlying store. +// underlying store. It doesn't block accesses to DAO from other threads. func (dao *Simple) Persist() (int, error) { return dao.Store.Persist() } +// PersistSync flushes all the changes made into the (supposedly) persistent +// underlying store. It's a synchronous version of Persist that doesn't allow +// other threads to work with DAO while flushing the Store. +func (dao *Simple) PersistSync() (int, error) { + return dao.Store.PersistSync() +} + // GetMPTBatch storage changes to be applied to MPT. func (dao *Simple) GetMPTBatch() mpt.Batch { var b mpt.Batch diff --git a/pkg/core/native_contract_test.go b/pkg/core/native_contract_test.go index aea370ef4..b9b652b63 100644 --- a/pkg/core/native_contract_test.go +++ b/pkg/core/native_contract_test.go @@ -196,7 +196,7 @@ func TestNativeContract_Invoke(t *testing.T) { res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28)) require.NoError(t, err) checkResult(t, res, stackitem.Make(42)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) select { diff --git a/pkg/core/native_policy_test.go b/pkg/core/native_policy_test.go index 5a041121c..8dd05fc3c 100644 --- a/pkg/core/native_policy_test.go +++ b/pkg/core/native_policy_test.go @@ -36,7 +36,7 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) @@ -69,14 +69,14 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, if name != "GasPerBlock" { // GasPerBlock is set on the next block checkResult(t, aers[1], stackitem.Make(defaultValue+1)) } - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // Get in the next block. res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue+1)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) } @@ -131,7 +131,7 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) @@ -142,7 +142,7 @@ func TestBlockedAccounts(t *testing.T) { isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, isBlocked, true) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) @@ -151,7 +151,7 @@ func TestBlockedAccounts(t *testing.T) { isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, false, isBlocked) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) @@ -160,28 +160,28 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // double-block should fail res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // unblock res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) // unblock the same account should fail as we don't have it blocked res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - _, err = chain.persist() + _, err = chain.persist(false) require.NoError(t, err) }) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index afe32481b..065c7303f 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -218,8 +218,20 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f } // Persist flushes all the MemoryStore contents into the (supposedly) persistent -// store ps. +// store ps. MemCachedStore remains accessible for the most part of this action +// (any new changes will be cached in memory). func (s *MemCachedStore) Persist() (int, error) { + return s.persist(false) +} + +// PersistSync flushes all the MemoryStore contents into the (supposedly) persistent +// store ps. It's different from Persist in that it blocks MemCachedStore completely +// while flushing things from memory to persistent store. +func (s *MemCachedStore) PersistSync() (int, error) { + return s.persist(true) +} + +func (s *MemCachedStore) persist(isSync bool) (int, error) { var err error var keys, dkeys int @@ -242,11 +254,15 @@ func (s *MemCachedStore) Persist() (int, error) { s.ps = tempstore s.mem = make(map[string][]byte) s.del = make(map[string]bool) - s.mut.Unlock() + if !isSync { + s.mut.Unlock() + } err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del) - s.mut.Lock() + if !isSync { + s.mut.Lock() + } if err == nil { // tempstore.mem and tempstore.del are completely flushed now // to tempstore.ps, so all KV pairs are the same and this