Merge pull request #2102 from nspcc-dev/store4

Improve (*MemCachedStore).Persist
This commit is contained in:
Roman Khimov 2021-08-02 20:10:05 +03:00 committed by GitHub
commit dfc514eda0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 165 additions and 46 deletions

View file

@ -98,9 +98,13 @@ type Blockchain struct {
// with the only writer being the block addition logic. // with the only writer being the block addition logic.
lock sync.RWMutex lock sync.RWMutex
// Data access object for CRUD operations around storage. // Data access object for CRUD operations around storage. It's write-cached.
dao *dao.Simple dao *dao.Simple
// persistent is the same DB as dao, but we never write to it, so all reads
// are directly from underlying persistent store.
persistent *dao.Simple
// Current index/height of the highest block. // Current index/height of the highest block.
// Read access should always be called by BlockHeight(). // Read access should always be called by BlockHeight().
// Write access should only happen in storeBlock(). // Write access should only happen in storeBlock().
@ -215,6 +219,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
bc := &Blockchain{ bc := &Blockchain{
config: cfg, config: cfg,
dao: dao.NewSimple(s, cfg.StateRootInHeader), dao: dao.NewSimple(s, cfg.StateRootInHeader),
persistent: dao.NewSimple(s, cfg.StateRootInHeader),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}), runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize, 0, false), memPool: mempool.New(cfg.MemPoolSize, 0, false),
@ -394,7 +399,7 @@ func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval) persistTimer := time.NewTimer(persistInterval)
defer func() { defer func() {
persistTimer.Stop() persistTimer.Stop()
if err := bc.persist(); err != nil { if _, err := bc.persist(); err != nil {
bc.log.Warn("failed to persist", zap.Error(err)) bc.log.Warn("failed to persist", zap.Error(err))
} }
if err := bc.dao.Store.Close(); err != nil { if err := bc.dao.Store.Close(); err != nil {
@ -408,13 +413,15 @@ func (bc *Blockchain) Run() {
case <-bc.stopCh: case <-bc.stopCh:
return return
case <-persistTimer.C: case <-persistTimer.C:
go func() { dur, err := bc.persist()
err := bc.persist() if err != nil {
if err != nil { bc.log.Warn("failed to persist blockchain", zap.Error(err))
bc.log.Warn("failed to persist blockchain", zap.Error(err)) }
} interval := persistInterval - dur
persistTimer.Reset(persistInterval) if interval <= 0 {
}() interval = time.Microsecond // Reset doesn't work with zero value
}
persistTimer.Reset(interval)
} }
} }
} }
@ -1175,41 +1182,43 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
} }
// persist flushes current in-memory Store contents to the persistent storage. // persist flushes current in-memory Store contents to the persistent storage.
func (bc *Blockchain) persist() error { func (bc *Blockchain) persist() (time.Duration, error) {
var ( var (
start = time.Now() start = time.Now()
duration time.Duration
persisted int persisted int
err error err error
) )
persisted, err = bc.dao.Persist() persisted, err = bc.dao.Persist()
if err != nil { if err != nil {
return err return 0, err
} }
if persisted > 0 { if persisted > 0 {
bHeight, err := bc.dao.GetCurrentBlockHeight() bHeight, err := bc.persistent.GetCurrentBlockHeight()
if err != nil { if err != nil {
return err return 0, err
} }
oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight) oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight)
diff := bHeight - oldHeight diff := bHeight - oldHeight
storedHeaderHeight, _, err := bc.dao.GetCurrentHeaderHeight() storedHeaderHeight, _, err := bc.persistent.GetCurrentHeaderHeight()
if err != nil { if err != nil {
return err return 0, err
} }
duration = time.Since(start)
bc.log.Info("persisted to disk", bc.log.Info("persisted to disk",
zap.Uint32("blocks", diff), zap.Uint32("blocks", diff),
zap.Int("keys", persisted), zap.Int("keys", persisted),
zap.Uint32("headerHeight", storedHeaderHeight), zap.Uint32("headerHeight", storedHeaderHeight),
zap.Uint32("blockHeight", bHeight), zap.Uint32("blockHeight", bHeight),
zap.Duration("took", time.Since(start))) zap.Duration("took", duration))
// update monitoring metrics. // update monitoring metrics.
updatePersistedHeightMetric(bHeight) updatePersistedHeightMetric(bHeight)
} }
return nil return duration, nil
} }
// GetTransaction returns a TX and its height by the given hash. The height is MaxUint32 if tx is in the mempool. // GetTransaction returns a TX and its height by the given hash. The height is MaxUint32 if tx is in the mempool.

View file

@ -118,7 +118,8 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
// This one tests persisting blocks, so it does need to persist() // This one tests persisting blocks, so it does need to persist()
require.NoError(t, bc.persist()) _, err = bc.persist()
require.NoError(t, err)
for _, block := range blocks { for _, block := range blocks {
key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE()) key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE())
@ -236,7 +237,8 @@ func TestGetHeader(t *testing.T) {
b2 := bc.newBlock() b2 := bc.newBlock()
_, err = bc.GetHeader(b2.Hash()) _, err = bc.GetHeader(b2.Hash())
assert.Error(t, err) assert.Error(t, err)
assert.NoError(t, bc.persist()) _, err = bc.persist()
assert.NoError(t, err)
} }
} }
@ -253,7 +255,8 @@ func TestGetBlock(t *testing.T) {
assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Index, block.Index)
assert.Equal(t, blocks[i].Hash(), block.Hash()) assert.Equal(t, blocks[i].Hash(), block.Hash())
} }
assert.NoError(t, bc.persist()) _, err = bc.persist()
assert.NoError(t, err)
} }
t.Run("store only header", func(t *testing.T) { t.Run("store only header", func(t *testing.T) {
@ -1305,7 +1308,8 @@ func TestHasBlock(t *testing.T) {
} }
newBlock := bc.newBlock() newBlock := bc.newBlock()
assert.False(t, bc.HasBlock(newBlock.Hash())) assert.False(t, bc.HasBlock(newBlock.Hash()))
assert.NoError(t, bc.persist()) _, err = bc.persist()
assert.NoError(t, err)
} }
} }
@ -1340,7 +1344,8 @@ func TestGetTransaction(t *testing.T) {
assert.Equal(t, block.Transactions[0], tx) assert.Equal(t, block.Transactions[0], tx)
assert.Equal(t, 1, io.GetVarSize(tx.Attributes)) assert.Equal(t, 1, io.GetVarSize(tx.Attributes))
assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts))
assert.NoError(t, bc.persist()) _, err = bc.persist()
assert.NoError(t, err)
} }
} }

View file

@ -196,7 +196,8 @@ func TestNativeContract_Invoke(t *testing.T) {
res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28)) res, err := invokeContractMethod(chain, price, tn.Metadata().Hash, "sum", int64(14), int64(28))
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Make(42)) checkResult(t, res, stackitem.Make(42))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
select { select {
case index := <-tn.blocks: case index := <-tn.blocks:

View file

@ -36,7 +36,8 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
res, err := invokeContractMethod(chain, 100000000, hash, getName) res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue)) checkResult(t, res, stackitem.Make(defaultValue))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
}) })
t.Run("set, too small value", func(t *testing.T) { t.Run("set, too small value", func(t *testing.T) {
@ -68,13 +69,15 @@ func testGetSet(t *testing.T, chain *Blockchain, hash util.Uint160, name string,
if name != "GasPerBlock" { // GasPerBlock is set on the next block if name != "GasPerBlock" { // GasPerBlock is set on the next block
checkResult(t, aers[1], stackitem.Make(defaultValue+1)) checkResult(t, aers[1], stackitem.Make(defaultValue+1))
} }
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
// Get in the next block. // Get in the next block.
res, err := invokeContractMethod(chain, 100000000, hash, getName) res, err := invokeContractMethod(chain, 100000000, hash, getName)
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Make(defaultValue+1)) checkResult(t, res, stackitem.Make(defaultValue+1))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
}) })
} }
@ -128,7 +131,8 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160()) res, err := invokeContractMethod(chain, 100000000, policyHash, "isBlocked", random.Uint160())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false)) checkResult(t, res, stackitem.NewBool(false))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
}) })
t.Run("block-unblock account", func(t *testing.T) { t.Run("block-unblock account", func(t *testing.T) {
@ -138,7 +142,8 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account) isBlocked := chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, isBlocked, true) require.Equal(t, isBlocked, true)
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
@ -146,7 +151,8 @@ func TestBlockedAccounts(t *testing.T) {
isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account) isBlocked = chain.contracts.Policy.IsBlockedInternal(chain.dao, account)
require.Equal(t, false, isBlocked) require.Equal(t, false, isBlocked)
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
}) })
t.Run("double-block", func(t *testing.T) { t.Run("double-block", func(t *testing.T) {
@ -154,25 +160,29 @@ func TestBlockedAccounts(t *testing.T) {
res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) res, err := invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true)) checkResult(t, res, stackitem.NewBool(true))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
// double-block should fail // double-block should fail
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "blockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false)) checkResult(t, res, stackitem.NewBool(false))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
// unblock // unblock
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(true)) checkResult(t, res, stackitem.NewBool(true))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
// unblock the same account should fail as we don't have it blocked // unblock the same account should fail as we don't have it blocked
res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE()) res, err = invokeContractMethodGeneric(chain, 100000000, policyHash, "unblockAccount", true, account.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.NewBool(false)) checkResult(t, res, stackitem.NewBool(false))
require.NoError(t, chain.persist()) _, err = chain.persist()
require.NoError(t, err)
}) })
t.Run("not signed by committee", func(t *testing.T) { t.Run("not signed by committee", func(t *testing.T) {

View file

@ -1,10 +1,14 @@
package storage package storage
import "sync"
// MemCachedStore is a wrapper around persistent store that caches all changes // MemCachedStore is a wrapper around persistent store that caches all changes
// being made for them to be later flushed in one batch. // being made for them to be later flushed in one batch.
type MemCachedStore struct { type MemCachedStore struct {
MemoryStore MemoryStore
// plock protects Persist from double entrance.
plock sync.Mutex
// Persistent Store. // Persistent Store.
ps Store ps Store
} }
@ -96,45 +100,73 @@ func (s *MemCachedStore) Persist() (int, error) {
var err error var err error
var keys, dkeys int var keys, dkeys int
s.plock.Lock()
defer s.plock.Unlock()
s.mut.Lock() s.mut.Lock()
defer s.mut.Unlock()
keys = len(s.mem) keys = len(s.mem)
dkeys = len(s.del) dkeys = len(s.del)
if keys == 0 && dkeys == 0 { if keys == 0 && dkeys == 0 {
s.mut.Unlock()
return 0, nil return 0, nil
} }
memStore, ok := s.ps.(*MemoryStore) // tempstore technically copies current s in lower layer while real s
// starts using fresh new maps. This tempstore is only known here and
// nothing ever changes it, therefore accesses to it (reads) can go
// unprotected while writes are handled by s proper.
var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps}
s.ps = tempstore
s.mem = make(map[string][]byte)
s.del = make(map[string]bool)
s.mut.Unlock()
memStore, ok := tempstore.ps.(*MemoryStore)
if !ok { if !ok {
memCachedStore, ok := s.ps.(*MemCachedStore) memCachedStore, ok := tempstore.ps.(*MemCachedStore)
if ok { if ok {
memStore = &memCachedStore.MemoryStore memStore = &memCachedStore.MemoryStore
} }
} }
if memStore != nil { if memStore != nil {
memStore.mut.Lock() memStore.mut.Lock()
for k := range s.mem { for k := range tempstore.mem {
memStore.put(k, s.mem[k]) memStore.put(k, tempstore.mem[k])
} }
for k := range s.del { for k := range tempstore.del {
memStore.drop(k) memStore.drop(k)
} }
memStore.mut.Unlock() memStore.mut.Unlock()
} else { } else {
batch := s.ps.Batch() batch := tempstore.ps.Batch()
for k := range s.mem { for k := range tempstore.mem {
batch.Put([]byte(k), s.mem[k]) batch.Put([]byte(k), tempstore.mem[k])
} }
for k := range s.del { for k := range tempstore.del {
batch.Delete([]byte(k)) batch.Delete([]byte(k))
} }
err = s.ps.PutBatch(batch) err = tempstore.ps.PutBatch(batch)
} }
s.mut.Lock()
if err == nil { if err == nil {
s.mem = make(map[string][]byte) // tempstore.mem and tempstore.del are completely flushed now
s.del = make(map[string]bool) // to tempstore.ps, so all KV pairs are the same and this
// substitution has no visible effects.
s.ps = tempstore.ps
} else {
// We're toast. We'll try to still keep proper state, but OOM
// killer will get to us eventually.
for k := range s.mem {
tempstore.put(k, s.mem[k])
}
for k := range s.del {
tempstore.drop(k)
}
s.ps = tempstore.ps
s.mem = tempstore.mem
s.del = tempstore.del
} }
s.mut.Unlock()
return keys, err return keys, err
} }

View file

@ -177,3 +177,65 @@ func TestCachedSeek(t *testing.T) {
func newMemCachedStoreForTesting(t *testing.T) Store { func newMemCachedStoreForTesting(t *testing.T) Store {
return NewMemCachedStore(NewMemoryStore()) return NewMemCachedStore(NewMemoryStore())
} }
type BadBatch struct{}
func (b BadBatch) Delete(k []byte) {}
func (b BadBatch) Put(k, v []byte) {}
type BadStore struct {
onPutBatch func()
}
func (b *BadStore) Batch() Batch {
return BadBatch{}
}
func (b *BadStore) Delete(k []byte) error {
return nil
}
func (b *BadStore) Get([]byte) ([]byte, error) {
return nil, ErrKeyNotFound
}
func (b *BadStore) Put(k, v []byte) error {
return nil
}
func (b *BadStore) PutBatch(Batch) error {
b.onPutBatch()
return ErrKeyNotFound
}
func (b *BadStore) Seek(k []byte, f func(k, v []byte)) {
}
func (b *BadStore) Close() error {
return nil
}
func TestMemCachedPersistFailing(t *testing.T) {
var (
bs BadStore
t1 = []byte("t1")
t2 = []byte("t2")
b1 = []byte("b1")
)
// cached Store
ts := NewMemCachedStore(&bs)
// Set a pair of keys.
require.NoError(t, ts.Put(t1, t1))
require.NoError(t, ts.Put(t2, t2))
// This will be called during Persist().
bs.onPutBatch = func() {
// Drop one, add one.
require.NoError(t, ts.Put(b1, b1))
require.NoError(t, ts.Delete(t1))
}
_, err := ts.Persist()
require.Error(t, err)
// PutBatch() failed in Persist, but we still should have proper state.
_, err = ts.Get(t1)
require.Error(t, err)
res, err := ts.Get(t2)
require.NoError(t, err)
require.Equal(t, t2, res)
res, err = ts.Get(b1)
require.NoError(t, err)
require.Equal(t, b1, res)
}