storage: completely remove MemoryBatch

If you need something like that, just wrap another MemCachedStore layer around
it.
This commit is contained in:
Roman Khimov 2022-02-16 16:13:12 +03:00
parent 17a43b19e0
commit 017795c9c1
7 changed files with 23 additions and 82 deletions

View file

@ -532,23 +532,17 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
fallthrough fallthrough
case newStorageItemsAdded: case newStorageItemsAdded:
b := bc.dao.Store.Batch() cache := bc.dao.GetWrapped().(*dao.Simple)
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool { bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool {
// #1468, but don't need to copy here, because it is done by Store. // #1468, but don't need to copy here, because it is done by Store.
b.Delete(k) _ = cache.Store.Delete(k)
return true return true
}) })
err := bc.dao.Store.PutBatch(b)
if err != nil {
return fmt.Errorf("failed to remove old storage items: %w", err)
}
// After current state is updated, we need to remove outdated state-related data if so. // After current state is updated, we need to remove outdated state-related data if so.
// The only outdated data we might have is genesis-related data, so check it. // The only outdated data we might have is genesis-related data, so check it.
if p-bc.config.MaxTraceableBlocks > 0 { if p-bc.config.MaxTraceableBlocks > 0 {
cache := bc.dao.GetWrapped().(*dao.Simple)
writeBuf.Reset() writeBuf.Reset()
err := cache.DeleteBlock(bc.headerHashes[0], writeBuf) err := cache.DeleteBlock(bc.headerHashes[0], writeBuf)
if err != nil { if err != nil {
@ -561,14 +555,11 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
return true return true
}) })
} }
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to drop genesis block state: %w", err)
} }
} _ = cache.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)})
err = bc.dao.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}) _, err := cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to store state jump stage: %w", err) return fmt.Errorf("failed to persist old items removal: %w", err)
} }
case genesisStateRemoved: case genesisStateRemoved:
// there's nothing to do after that, so just continue with common operations // there's nothing to do after that, so just continue with common operations
@ -933,7 +924,7 @@ func (bc *Blockchain) AddHeaders(headers ...*block.Header) error {
func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
var ( var (
start = time.Now() start = time.Now()
batch = bc.dao.Store.Batch() batch = bc.dao.GetWrapped().(*dao.Simple)
err error err error
) )
@ -982,7 +973,7 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
} }
key := storage.AppendPrefix(storage.DataExecutable, h.Hash().BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, h.Hash().BytesBE())
batch.Put(key, buf.Bytes()) _ = batch.Store.Put(key, buf.Bytes())
buf.Reset() buf.Reset()
lastHeader = h lastHeader = h
} }
@ -995,13 +986,13 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
} }
key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount)) key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount))
batch.Put(key, buf.Bytes()) _ = batch.Store.Put(key, buf.Bytes())
bc.storedHeaderCount += headerBatchCount bc.storedHeaderCount += headerBatchCount
} }
batch.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(lastHeader.Hash(), lastHeader.Index)) _ = batch.Store.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(lastHeader.Hash(), lastHeader.Index))
updateHeaderHeightMetric(len(bc.headerHashes) - 1) updateHeaderHeightMetric(len(bc.headerHashes) - 1)
if err = bc.dao.Store.PutBatch(batch); err != nil { if _, err = batch.Persist(); err != nil {
return err return err
} }
bc.log.Debug("done processing headers", bc.log.Debug("done processing headers",

View file

@ -1846,7 +1846,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
require.NoError(t, bcSpout.AddHeaders(&b.Header)) require.NoError(t, bcSpout.AddHeaders(&b.Header))
// put storage items with STTemp prefix // put storage items with STTemp prefix
batch := bcSpout.dao.Store.Batch() batch := storage.NewMemCachedStore(bcSpout.dao.Store)
tempPrefix := storage.STTempStorage tempPrefix := storage.STTempStorage
if bcSpout.dao.Version.StoragePrefix == tempPrefix { if bcSpout.dao.Version.StoragePrefix == tempPrefix {
tempPrefix = storage.STStorage tempPrefix = storage.STStorage
@ -1855,10 +1855,11 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
key := slice.Copy(k) key := slice.Copy(k)
key[0] = byte(tempPrefix) key[0] = byte(tempPrefix)
value := slice.Copy(v) value := slice.Copy(v)
batch.Put(key, value) _ = batch.Put(key, value)
return true return true
}) })
require.NoError(t, bcSpout.dao.Store.PutBatch(batch)) _, err := batch.Persist()
require.NoError(t, err)
checkNewBlockchainErr := func(t *testing.T, cfg func(c *config.Config), store storage.Store, shouldFail bool) { checkNewBlockchainErr := func(t *testing.T, cfg func(c *config.Config), store storage.Store, shouldFail bool) {
unitTestNetCfg, err := config.Load("../../config", testchain.Network()) unitTestNetCfg, err := config.Load("../../config", testchain.Network())

View file

@ -665,9 +665,9 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a
return dao.Store.Put(key, buf.Bytes()) return dao.Store.Put(key, buf.Bytes())
} }
// DeleteBlock removes block from dao. // DeleteBlock removes block from dao. It's not atomic, so make sure you're
// using private MemCached instance here.
func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
batch := dao.Store.Batch()
key := make([]byte, util.Uint256Size+1) key := make([]byte, util.Uint256Size+1)
key[0] = byte(storage.DataExecutable) key[0] = byte(storage.DataExecutable)
copy(key[1:], h.BytesBE()) copy(key[1:], h.BytesBE())
@ -694,21 +694,21 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
if w.Err != nil { if w.Err != nil {
return w.Err return w.Err
} }
batch.Put(key, w.Bytes()) _ = dao.Store.Put(key, w.Bytes())
for _, tx := range b.Transactions { for _, tx := range b.Transactions {
copy(key[1:], tx.Hash().BytesBE()) copy(key[1:], tx.Hash().BytesBE())
batch.Delete(key) _ = dao.Store.Delete(key)
if dao.Version.P2PSigExtensions { if dao.Version.P2PSigExtensions {
for _, attr := range tx.GetAttributes(transaction.ConflictsT) { for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash hash := attr.Value.(*transaction.Conflicts).Hash
copy(key[1:], hash.BytesBE()) copy(key[1:], hash.BytesBE())
batch.Delete(key) _ = dao.Store.Delete(key)
} }
} }
} }
return dao.Store.PutBatch(batch) return nil
} }
// StoreAsCurrentBlock stores a hash of the given block with prefix // StoreAsCurrentBlock stores a hash of the given block with prefix

View file

@ -146,13 +146,13 @@ func (s *Module) CleanStorage() error {
if s.localHeight.Load() != 0 { if s.localHeight.Load() != 0 {
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load()) return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
} }
b := s.Store.Batch() b := storage.NewMemCachedStore(s.Store)
s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) bool { s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) bool {
// #1468, but don't need to copy here, because it is done by Store. // #1468, but don't need to copy here, because it is done by Store.
b.Delete(k) _ = b.Delete(k)
return true return true
}) })
err := s.Store.PutBatch(b) _, err := b.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err) return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err)
} }

View file

@ -263,18 +263,10 @@ func newMemCachedStoreForTesting(t testing.TB) Store {
return NewMemCachedStore(NewMemoryStore()) return NewMemCachedStore(NewMemoryStore())
} }
type BadBatch struct{}
func (b BadBatch) Delete(k []byte) {}
func (b BadBatch) Put(k, v []byte) {}
type BadStore struct { type BadStore struct {
onPutBatch func() onPutBatch func()
} }
func (b *BadStore) Batch() Batch {
return BadBatch{}
}
func (b *BadStore) Delete(k []byte) error { func (b *BadStore) Delete(k []byte) error {
return nil return nil
} }
@ -284,9 +276,6 @@ func (b *BadStore) Get([]byte) ([]byte, error) {
func (b *BadStore) Put(k, v []byte) error { func (b *BadStore) Put(k, v []byte) error {
return nil return nil
} }
func (b *BadStore) PutBatch(Batch) error {
return nil
}
func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string][]byte) error { func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string][]byte) error {
b.onPutBatch() b.onPutBatch()
return ErrKeyNotFound return ErrKeyNotFound

View file

@ -17,21 +17,6 @@ type MemoryStore struct {
stor map[string][]byte stor map[string][]byte
} }
// MemoryBatch is an in-memory batch compatible with MemoryStore.
type MemoryBatch struct {
MemoryStore
}
// Put implements the Batch interface.
func (b *MemoryBatch) Put(k, v []byte) {
put(b.MemoryStore.chooseMap(k), string(k), slice.Copy(v))
}
// Delete implements Batch interface.
func (b *MemoryBatch) Delete(k []byte) {
drop(b.MemoryStore.chooseMap(k), string(k))
}
// NewMemoryStore creates a new MemoryStore object. // NewMemoryStore creates a new MemoryStore object.
func NewMemoryStore() *MemoryStore { func NewMemoryStore() *MemoryStore {
return &MemoryStore{ return &MemoryStore{
@ -91,12 +76,6 @@ func (s *MemoryStore) Delete(key []byte) error {
return nil return nil
} }
// PutBatch implements the Store interface. Never returns an error.
func (s *MemoryStore) PutBatch(batch Batch) error {
b := batch.(*MemoryBatch)
return s.PutChangeSet(b.mem, b.stor)
}
// PutChangeSet implements the Store interface. Never returns an error. // PutChangeSet implements the Store interface. Never returns an error.
func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error {
s.mut.Lock() s.mut.Lock()
@ -187,16 +166,6 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) {
} }
} }
// Batch implements the Batch interface and returns a compatible Batch.
func (s *MemoryStore) Batch() Batch {
return newMemoryBatch()
}
// newMemoryBatch returns new memory batch.
func newMemoryBatch() *MemoryBatch {
return &MemoryBatch{MemoryStore: *NewMemoryStore()}
}
// Close implements Store interface and clears up memory. Never returns an // Close implements Store interface and clears up memory. Never returns an
// error. // error.
func (s *MemoryStore) Close() error { func (s *MemoryStore) Close() error {

View file

@ -103,15 +103,6 @@ type (
Close() error Close() error
} }
// Batch represents an abstraction on top of batch operations.
// Each Store implementation is responsible of casting a Batch
// to its appropriate type. Batches can only be used in a single
// thread.
Batch interface {
Delete(k []byte)
Put(k, v []byte)
}
// KeyPrefix is a constant byte added as a prefix for each key // KeyPrefix is a constant byte added as a prefix for each key
// stored. // stored.
KeyPrefix uint8 KeyPrefix uint8