diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 11d0d78de..00a7f58be 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -98,9 +98,13 @@ type Blockchain struct { // with the only writer being the block addition logic. lock sync.RWMutex - // Data access object for CRUD operations around storage. + // Data access object for CRUD operations around storage. It's write-cached. dao *dao.Simple + // persistent is the same DB as dao, but we never write to it, so all reads + // are directly from underlying persistent store. + persistent *dao.Simple + // Current index/height of the highest block. // Read access should always be called by BlockHeight(). // Write access should only happen in storeBlock(). @@ -215,6 +219,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L bc := &Blockchain{ config: cfg, dao: dao.NewSimple(s, cfg.StateRootInHeader), + persistent: dao.NewSimple(s, cfg.StateRootInHeader), stopCh: make(chan struct{}), runToExitCh: make(chan struct{}), memPool: mempool.New(cfg.MemPoolSize, 0, false), @@ -394,7 +399,7 @@ func (bc *Blockchain) Run() { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() - if err := bc.persist(); err != nil { + if _, err := bc.persist(); err != nil { bc.log.Warn("failed to persist", zap.Error(err)) } if err := bc.dao.Store.Close(); err != nil { @@ -408,13 +413,15 @@ func (bc *Blockchain) Run() { case <-bc.stopCh: return case <-persistTimer.C: - go func() { - err := bc.persist() - if err != nil { - bc.log.Warn("failed to persist blockchain", zap.Error(err)) - } - persistTimer.Reset(persistInterval) - }() + dur, err := bc.persist() + if err != nil { + bc.log.Warn("failed to persist blockchain", zap.Error(err)) + } + interval := persistInterval - dur + if interval <= 0 { + interval = time.Microsecond // Reset doesn't work with zero value + } + persistTimer.Reset(interval) } } } @@ -1175,41 +1182,43 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch { } // persist flushes current in-memory Store contents to the persistent storage. -func (bc *Blockchain) persist() error { +func (bc *Blockchain) persist() (time.Duration, error) { var ( start = time.Now() + duration time.Duration persisted int err error ) persisted, err = bc.dao.Persist() if err != nil { - return err + return 0, err } if persisted > 0 { - bHeight, err := bc.dao.GetCurrentBlockHeight() + bHeight, err := bc.persistent.GetCurrentBlockHeight() if err != nil { - return err + return 0, err } oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight) diff := bHeight - oldHeight - storedHeaderHeight, _, err := bc.dao.GetCurrentHeaderHeight() + storedHeaderHeight, _, err := bc.persistent.GetCurrentHeaderHeight() if err != nil { - return err + return 0, err } + duration = time.Since(start) bc.log.Info("persisted to disk", zap.Uint32("blocks", diff), zap.Int("keys", persisted), zap.Uint32("headerHeight", storedHeaderHeight), zap.Uint32("blockHeight", bHeight), - zap.Duration("took", time.Since(start))) + zap.Duration("took", duration)) // update monitoring metrics. updatePersistedHeightMetric(bHeight) } - return nil + return duration, nil } // GetTransaction returns a TX and its height by the given hash. The height is MaxUint32 if tx is in the mempool. diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index d3a2b7ead..8f0d00d51 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -118,7 +118,8 @@ func TestAddBlock(t *testing.T) { assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) // This one tests persisting blocks, so it does need to persist() - require.NoError(t, bc.persist()) + _, err = bc.persist() + require.NoError(t, err) for _, block := range blocks { key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE()) @@ -236,7 +237,8 @@ func TestGetHeader(t *testing.T) { b2 := bc.newBlock() _, err = bc.GetHeader(b2.Hash()) assert.Error(t, err) - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } } @@ -253,7 +255,8 @@ func TestGetBlock(t *testing.T) { assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Hash(), block.Hash()) } - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } t.Run("store only header", func(t *testing.T) { @@ -1305,7 +1308,8 @@ func TestHasBlock(t *testing.T) { } newBlock := bc.newBlock() assert.False(t, bc.HasBlock(newBlock.Hash())) - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } } @@ -1340,7 +1344,8 @@ func TestGetTransaction(t *testing.T) { assert.Equal(t, block.Transactions[0], tx) assert.Equal(t, 1, io.GetVarSize(tx.Attributes)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) - assert.NoError(t, bc.persist()) + _, err = bc.persist() + assert.NoError(t, err) } } diff --git a/pkg/core/native_contract_test.go b/pkg/core/native_contract_test.go index 9c7b81fcb..eb2403b4b 100644 --- a/pkg/core/native_contract_test.go +++ b/pkg/core/native_contract_test.go @@ -196,7 +196,8 @@ func TestNativeContract_Invoke(t *testing.T) { res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28)) require.NoError(t, err) checkResult(t, res, stackitem.Make(42)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) select { case index := <-tn.blocks: diff --git a/pkg/core/native_policy_test.go b/pkg/core/native_policy_test.go index 3b315b28c..631a1718f 100644 --- a/pkg/core/native_policy_test.go +++ b/pkg/core/native_policy_test.go @@ -36,7 +36,8 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("set, too small value", func(t *testing.T) { @@ -68,13 +69,15 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string, if name != "GasPerBlock" { // GasPerBlock is set on the next block checkResult(t, aers[1], stackitem.Make(defaultValue+1)) } - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // Get in the next block. res, err := invokeContractMethod(chain, 100000000, hash, getName) require.NoError(t, err) checkResult(t, res, stackitem.Make(defaultValue+1)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) } @@ -128,7 +131,8 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("block-unblock account", func(t *testing.T) { @@ -138,7 +142,8 @@ func TestBlockedAccounts(t *testing.T) { isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, isBlocked, true) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) @@ -146,7 +151,8 @@ func TestBlockedAccounts(t *testing.T) { isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account) require.Equal(t, false, isBlocked) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("double-block", func(t *testing.T) { @@ -154,25 +160,29 @@ func TestBlockedAccounts(t *testing.T) { res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // double-block should fail res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // unblock res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(true)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) // unblock the same account should fail as we don't have it blocked res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.NewBool(false)) - require.NoError(t, chain.persist()) + _, err = chain.persist() + require.NoError(t, err) }) t.Run("not signed by committee", func(t *testing.T) { diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 6c2cddf67..4be37f44c 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -1,10 +1,14 @@ package storage +import "sync" + // MemCachedStore is a wrapper around persistent store that caches all changes // being made for them to be later flushed in one batch. type MemCachedStore struct { MemoryStore + // plock protects Persist from double entrance. + plock sync.Mutex // Persistent Store. ps Store } @@ -96,45 +100,73 @@ func (s *MemCachedStore) Persist() (int, error) { var err error var keys, dkeys int + s.plock.Lock() + defer s.plock.Unlock() s.mut.Lock() - defer s.mut.Unlock() keys = len(s.mem) dkeys = len(s.del) if keys == 0 && dkeys == 0 { + s.mut.Unlock() return 0, nil } - memStore, ok := s.ps.(*MemoryStore) + // tempstore technically copies current s in lower layer while real s + // starts using fresh new maps. This tempstore is only known here and + // nothing ever changes it, therefore accesses to it (reads) can go + // unprotected while writes are handled by s proper. + var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps} + s.ps = tempstore + s.mem = make(map[string][]byte) + s.del = make(map[string]bool) + s.mut.Unlock() + + memStore, ok := tempstore.ps.(*MemoryStore) if !ok { - memCachedStore, ok := s.ps.(*MemCachedStore) + memCachedStore, ok := tempstore.ps.(*MemCachedStore) if ok { memStore = &memCachedStore.MemoryStore } } if memStore != nil { memStore.mut.Lock() - for k := range s.mem { - memStore.put(k, s.mem[k]) + for k := range tempstore.mem { + memStore.put(k, tempstore.mem[k]) } - for k := range s.del { + for k := range tempstore.del { memStore.drop(k) } memStore.mut.Unlock() } else { - batch := s.ps.Batch() - for k := range s.mem { - batch.Put([]byte(k), s.mem[k]) + batch := tempstore.ps.Batch() + for k := range tempstore.mem { + batch.Put([]byte(k), tempstore.mem[k]) } - for k := range s.del { + for k := range tempstore.del { batch.Delete([]byte(k)) } - err = s.ps.PutBatch(batch) + err = tempstore.ps.PutBatch(batch) } + s.mut.Lock() if err == nil { - s.mem = make(map[string][]byte) - s.del = make(map[string]bool) + // tempstore.mem and tempstore.del are completely flushed now + // to tempstore.ps, so all KV pairs are the same and this + // substitution has no visible effects. + s.ps = tempstore.ps + } else { + // We're toast. We'll try to still keep proper state, but OOM + // killer will get to us eventually. + for k := range s.mem { + tempstore.put(k, s.mem[k]) + } + for k := range s.del { + tempstore.drop(k) + } + s.ps = tempstore.ps + s.mem = tempstore.mem + s.del = tempstore.del } + s.mut.Unlock() return keys, err } diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index 8addd4088..7dfee3027 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -177,3 +177,65 @@ func TestCachedSeek(t *testing.T) { func newMemCachedStoreForTesting(t *testing.T) Store { return NewMemCachedStore(NewMemoryStore()) } + +type BadBatch struct{} + +func (b BadBatch) Delete(k []byte) {} +func (b BadBatch) Put(k, v []byte) {} + +type BadStore struct { + onPutBatch func() +} + +func (b *BadStore) Batch() Batch { + return BadBatch{} +} +func (b *BadStore) Delete(k []byte) error { + return nil +} +func (b *BadStore) Get([]byte) ([]byte, error) { + return nil, ErrKeyNotFound +} +func (b *BadStore) Put(k, v []byte) error { + return nil +} +func (b *BadStore) PutBatch(Batch) error { + b.onPutBatch() + return ErrKeyNotFound +} +func (b *BadStore) Seek(k []byte, f func(k, v []byte)) { +} +func (b *BadStore) Close() error { + return nil +} + +func TestMemCachedPersistFailing(t *testing.T) { + var ( + bs BadStore + t1 = []byte("t1") + t2 = []byte("t2") + b1 = []byte("b1") + ) + // cached Store + ts := NewMemCachedStore(&bs) + // Set a pair of keys. + require.NoError(t, ts.Put(t1, t1)) + require.NoError(t, ts.Put(t2, t2)) + // This will be called during Persist(). + bs.onPutBatch = func() { + // Drop one, add one. + require.NoError(t, ts.Put(b1, b1)) + require.NoError(t, ts.Delete(t1)) + } + _, err := ts.Persist() + require.Error(t, err) + // PutBatch() failed in Persist, but we still should have proper state. + _, err = ts.Get(t1) + require.Error(t, err) + res, err := ts.Get(t2) + require.NoError(t, err) + require.Equal(t, t2, res) + res, err = ts.Get(b1) + require.NoError(t, err) + require.Equal(t, b1, res) +}