diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 9d0cb3a0f..25de99481 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -532,23 +532,17 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error fallthrough case newStorageItemsAdded: - b := bc.dao.Store.Batch() + cache := bc.dao.GetWrapped().(*dao.Simple) prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool { // #1468, but don't need to copy here, because it is done by Store. - b.Delete(k) + _ = cache.Store.Delete(k) return true }) - err := bc.dao.Store.PutBatch(b) - if err != nil { - return fmt.Errorf("failed to remove old storage items: %w", err) - } - // After current state is updated, we need to remove outdated state-related data if so. // The only outdated data we might have is genesis-related data, so check it. if p-bc.config.MaxTraceableBlocks > 0 { - cache := bc.dao.GetWrapped().(*dao.Simple) writeBuf.Reset() err := cache.DeleteBlock(bc.headerHashes[0], writeBuf) if err != nil { @@ -561,14 +555,11 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error return true }) } - _, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to drop genesis block state: %w", err) - } } - err = bc.dao.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}) + _ = cache.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}) + _, err := cache.Persist() if err != nil { - return fmt.Errorf("failed to store state jump stage: %w", err) + return fmt.Errorf("failed to persist old items removal: %w", err) } case genesisStateRemoved: // there's nothing to do after that, so just continue with common operations @@ -933,7 +924,7 @@ func (bc *Blockchain) AddHeaders(headers ...*block.Header) error { func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { var ( start = time.Now() - batch = bc.dao.Store.Batch() + batch = bc.dao.GetWrapped().(*dao.Simple) err error ) @@ -982,7 +973,7 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { } key := storage.AppendPrefix(storage.DataExecutable, h.Hash().BytesBE()) - batch.Put(key, buf.Bytes()) + _ = batch.Store.Put(key, buf.Bytes()) buf.Reset() lastHeader = h } @@ -995,13 +986,13 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { } key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount)) - batch.Put(key, buf.Bytes()) + _ = batch.Store.Put(key, buf.Bytes()) bc.storedHeaderCount += headerBatchCount } - batch.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(lastHeader.Hash(), lastHeader.Index)) + _ = batch.Store.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(lastHeader.Hash(), lastHeader.Index)) updateHeaderHeightMetric(len(bc.headerHashes) - 1) - if err = bc.dao.Store.PutBatch(batch); err != nil { + if _, err = batch.Persist(); err != nil { return err } bc.log.Debug("done processing headers", diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index db3d56e7d..9c95b5e21 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1846,7 +1846,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { require.NoError(t, bcSpout.AddHeaders(&b.Header)) // put storage items with STTemp prefix - batch := bcSpout.dao.Store.Batch() + batch := storage.NewMemCachedStore(bcSpout.dao.Store) tempPrefix := storage.STTempStorage if bcSpout.dao.Version.StoragePrefix == tempPrefix { tempPrefix = storage.STStorage @@ -1855,10 +1855,11 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { key := slice.Copy(k) key[0] = byte(tempPrefix) value := slice.Copy(v) - batch.Put(key, value) + _ = batch.Put(key, value) return true }) - require.NoError(t, bcSpout.dao.Store.PutBatch(batch)) + _, err := batch.Persist() + require.NoError(t, err) checkNewBlockchainErr := func(t *testing.T, cfg func(c *config.Config), store storage.Store, shouldFail bool) { unitTestNetCfg, err := config.Load("../../config", testchain.Network()) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index aff495107..0573e1265 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -665,9 +665,9 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a return dao.Store.Put(key, buf.Bytes()) } -// DeleteBlock removes block from dao. +// DeleteBlock removes block from dao. It's not atomic, so make sure you're +// using private MemCached instance here. func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { - batch := dao.Store.Batch() key := make([]byte, util.Uint256Size+1) key[0] = byte(storage.DataExecutable) copy(key[1:], h.BytesBE()) @@ -694,21 +694,21 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { if w.Err != nil { return w.Err } - batch.Put(key, w.Bytes()) + _ = dao.Store.Put(key, w.Bytes()) for _, tx := range b.Transactions { copy(key[1:], tx.Hash().BytesBE()) - batch.Delete(key) + _ = dao.Store.Delete(key) if dao.Version.P2PSigExtensions { for _, attr := range tx.GetAttributes(transaction.ConflictsT) { hash := attr.Value.(*transaction.Conflicts).Hash copy(key[1:], hash.BytesBE()) - batch.Delete(key) + _ = dao.Store.Delete(key) } } } - return dao.Store.PutBatch(batch) + return nil } // StoreAsCurrentBlock stores a hash of the given block with prefix diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index d729e0b80..30c6fe310 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -146,13 +146,13 @@ func (s *Module) CleanStorage() error { if s.localHeight.Load() != 0 { return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load()) } - b := s.Store.Batch() + b := storage.NewMemCachedStore(s.Store) s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) bool { // #1468, but don't need to copy here, because it is done by Store. - b.Delete(k) + _ = b.Delete(k) return true }) - err := s.Store.PutBatch(b) + _, err := b.Persist() if err != nil { return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err) } diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index d7d524435..8bbad9f72 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -263,18 +263,10 @@ func newMemCachedStoreForTesting(t testing.TB) Store { return NewMemCachedStore(NewMemoryStore()) } -type BadBatch struct{} - -func (b BadBatch) Delete(k []byte) {} -func (b BadBatch) Put(k, v []byte) {} - type BadStore struct { onPutBatch func() } -func (b *BadStore) Batch() Batch { - return BadBatch{} -} func (b *BadStore) Delete(k []byte) error { return nil } @@ -284,9 +276,6 @@ func (b *BadStore) Get([]byte) ([]byte, error) { func (b *BadStore) Put(k, v []byte) error { return nil } -func (b *BadStore) PutBatch(Batch) error { - return nil -} func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string][]byte) error { b.onPutBatch() return ErrKeyNotFound diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index fa31a1b0f..64c47a287 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -17,21 +17,6 @@ type MemoryStore struct { stor map[string][]byte } -// MemoryBatch is an in-memory batch compatible with MemoryStore. -type MemoryBatch struct { - MemoryStore -} - -// Put implements the Batch interface. -func (b *MemoryBatch) Put(k, v []byte) { - put(b.MemoryStore.chooseMap(k), string(k), slice.Copy(v)) -} - -// Delete implements Batch interface. -func (b *MemoryBatch) Delete(k []byte) { - drop(b.MemoryStore.chooseMap(k), string(k)) -} - // NewMemoryStore creates a new MemoryStore object. func NewMemoryStore() *MemoryStore { return &MemoryStore{ @@ -91,12 +76,6 @@ func (s *MemoryStore) Delete(key []byte) error { return nil } -// PutBatch implements the Store interface. Never returns an error. -func (s *MemoryStore) PutBatch(batch Batch) error { - b := batch.(*MemoryBatch) - return s.PutChangeSet(b.mem, b.stor) -} - // PutChangeSet implements the Store interface. Never returns an error. func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { s.mut.Lock() @@ -187,16 +166,6 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) { } } -// Batch implements the Batch interface and returns a compatible Batch. -func (s *MemoryStore) Batch() Batch { - return newMemoryBatch() -} - -// newMemoryBatch returns new memory batch. -func newMemoryBatch() *MemoryBatch { - return &MemoryBatch{MemoryStore: *NewMemoryStore()} -} - // Close implements Store interface and clears up memory. Never returns an // error. func (s *MemoryStore) Close() error { diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index a99c5cc62..0776466cc 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -103,15 +103,6 @@ type ( Close() error } - // Batch represents an abstraction on top of batch operations. - // Each Store implementation is responsible of casting a Batch - // to its appropriate type. Batches can only be used in a single - // thread. - Batch interface { - Delete(k []byte) - Put(k, v []byte) - } - // KeyPrefix is a constant byte added as a prefix for each key // stored. KeyPrefix uint8