From 856385b1067b492b6bea3221ae977769037aec15 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Mon, 27 Sep 2021 16:35:25 +0300 Subject: [PATCH] dao: use custom storage prefix We use 2 prefixes for storing items because of state synchronization. This commit allows to parametrize dao with the default prefix. Signed-off-by: Evgeniy Stratonikov --- cli/server/dump.go | 5 +++-- pkg/core/blockchain.go | 8 +++++--- pkg/core/blockchain_test.go | 8 ++++++-- pkg/core/dao/dao.go | 27 +++++++++++++++++---------- pkg/core/dao/dao_test.go | 9 +++++++-- pkg/core/mpt/billet.go | 14 ++++++++------ pkg/core/mpt/billet_test.go | 16 ++++++++-------- pkg/core/mpt/trie.go | 2 +- pkg/core/statesync/module.go | 18 ++++++++++++++++-- pkg/core/statesync/module_test.go | 3 ++- pkg/core/statesync_test.go | 2 +- 11 files changed, 74 insertions(+), 38 deletions(-) diff --git a/cli/server/dump.go b/cli/server/dump.go index a35f2cad4..ff662f2a8 100644 --- a/cli/server/dump.go +++ b/cli/server/dump.go @@ -32,7 +32,7 @@ func batchToMap(index uint32, batch *storage.MemBatch) blockDump { ops := make([]storageOp, 0, size) for i := range batch.Put { key := batch.Put[i].Key - if len(key) == 0 || key[0] != byte(storage.STStorage) { + if len(key) == 0 || key[0] != byte(storage.STStorage) && key[0] != byte(storage.STTempStorage) { continue } @@ -50,7 +50,8 @@ func batchToMap(index uint32, batch *storage.MemBatch) blockDump { for i := range batch.Deleted { key := batch.Deleted[i].Key - if len(key) == 0 || key[0] != byte(storage.STStorage) || !batch.Deleted[i].Exists { + if len(key) == 0 || !batch.Deleted[i].Exists || + key[0] != byte(storage.STStorage) && key[0] != byte(storage.STTempStorage) { continue } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 5666389e1..1a6d97f4c 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -490,7 +490,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error // Replace old storage items by new ones, it should be done step-by step. // Firstly, remove all old genesis-related items. b := bc.dao.Store.Batch() - bc.dao.Store.Seek([]byte{byte(storage.STStorage)}, func(k, _ []byte) { + bc.dao.Store.Seek([]byte{byte(bc.dao.StoragePrefix)}, func(k, _ []byte) { // #1468, but don't need to copy here, because it is done by Store. b.Delete(k) }) @@ -505,14 +505,16 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error for { count := 0 b := bc.dao.Store.Batch() - bc.dao.Store.Seek([]byte{byte(storage.STTempStorage)}, func(k, v []byte) { + currPrefix := byte(bc.dao.StoragePrefix) + syncPrefix := byte(statesync.TemporaryPrefix(bc.dao.StoragePrefix)) + bc.dao.Store.Seek([]byte{syncPrefix}, func(k, v []byte) { if count >= maxStorageBatchSize { return } // #1468, but don't need to copy here, because it is done by Store. b.Delete(k) key := make([]byte, len(k)) - key[0] = byte(storage.STStorage) + key[0] = currPrefix copy(key[1:], k[1:]) b.Put(key, slice.Copy(v)) count += 2 diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 4870be1d6..c930b46b3 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1765,9 +1765,13 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { // put storage items with STTemp prefix batch := bcSpout.dao.Store.Batch() - bcSpout.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { + tempPrefix := storage.STTempStorage + if bcSpout.dao.StoragePrefix == tempPrefix { + tempPrefix = storage.STStorage + } + bcSpout.dao.Store.Seek(bcSpout.dao.StoragePrefix.Bytes(), func(k, v []byte) { key := slice.Copy(k) - key[0] = storage.STTempStorage.Bytes()[0] + key[0] = byte(tempPrefix) value := slice.Copy(v) batch.Put(key, value) }) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 9cf17c306..e374de6b9 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -74,7 +74,8 @@ type DAO interface { // Simple is memCached wrapper around DB, simple DAO implementation. type Simple struct { - Store *storage.MemCachedStore + StoragePrefix storage.KeyPrefix + Store *storage.MemCachedStore // stateRootInHeader specifies if block header contains state root. stateRootInHeader bool // p2pSigExtensions denotes whether P2PSignatureExtensions are enabled. @@ -84,7 +85,12 @@ type Simple struct { // NewSimple creates new simple dao using provided backend store. func NewSimple(backend storage.Store, stateRootInHeader bool, p2pSigExtensions bool) *Simple { st := storage.NewMemCachedStore(backend) - return &Simple{Store: st, stateRootInHeader: stateRootInHeader, p2pSigExtensions: p2pSigExtensions} + return &Simple{ + StoragePrefix: storage.STStorage, + Store: st, + stateRootInHeader: stateRootInHeader, + p2pSigExtensions: p2pSigExtensions, + } } // GetBatch returns currently accumulated DB changeset. @@ -96,6 +102,7 @@ func (dao *Simple) GetBatch() *storage.MemBatch { // MemCachedStore around the current DAO Store. func (dao *Simple) GetWrapped() DAO { d := NewSimple(dao.Store, dao.stateRootInHeader, dao.p2pSigExtensions) + d.StoragePrefix = dao.StoragePrefix return d } @@ -277,7 +284,7 @@ func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWrit // GetStorageItem returns StorageItem if it exists in the given store. func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { - b, err := dao.Store.Get(makeStorageItemKey(id, key)) + b, err := dao.Store.Get(makeStorageItemKey(dao.StoragePrefix, id, key)) if err != nil { return nil } @@ -287,14 +294,14 @@ func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { // PutStorageItem puts given StorageItem for given id with given // key into the given store. func (dao *Simple) PutStorageItem(id int32, key []byte, si state.StorageItem) error { - stKey := makeStorageItemKey(id, key) + stKey := makeStorageItemKey(dao.StoragePrefix, id, key) return dao.Store.Put(stKey, si) } // DeleteStorageItem drops storage item for the given id with the // given key from the store. func (dao *Simple) DeleteStorageItem(id int32, key []byte) error { - stKey := makeStorageItemKey(id, key) + stKey := makeStorageItemKey(dao.StoragePrefix, id, key) return dao.Store.Delete(stKey) } @@ -323,7 +330,7 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S // Seek executes f for all items with a given prefix. // If key is to be used outside of f, they may not be copied. func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { - lookupKey := makeStorageItemKey(id, nil) + lookupKey := makeStorageItemKey(dao.StoragePrefix, id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } @@ -335,7 +342,7 @@ func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { // SeekAsync sends all storage items matching given prefix to a channel and returns // the channel. Resulting keys and values may not be copied. func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue { - lookupKey := makeStorageItemKey(id, nil) + lookupKey := makeStorageItemKey(dao.StoragePrefix, id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } @@ -343,10 +350,10 @@ func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan } // makeStorageItemKey returns a key used to store StorageItem in the DB. -func makeStorageItemKey(id int32, key []byte) []byte { +func makeStorageItemKey(prefix storage.KeyPrefix, id int32, key []byte) []byte { // 1 for prefix + 4 for Uint32 + len(key) for key buf := make([]byte, 5+len(key)) - buf[0] = byte(storage.STStorage) + buf[0] = byte(prefix) binary.LittleEndian.PutUint32(buf[1:], uint32(id)) copy(buf[5:], key) return buf @@ -667,7 +674,7 @@ func (dao *Simple) PersistSync() (int, error) { // GetMPTBatch storage changes to be applied to MPT. func (dao *Simple) GetMPTBatch() mpt.Batch { var b mpt.Batch - dao.Store.MemoryStore.SeekAll([]byte{byte(storage.STStorage)}, func(k, v []byte) { + dao.Store.MemoryStore.SeekAll([]byte{byte(dao.StoragePrefix)}, func(k, v []byte) { b.Add(k[1:], v) }) return b diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 00b6ee1fd..4ec8cc0d0 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -222,11 +222,16 @@ func TestMakeStorageItemKey(t *testing.T) { expected := []byte{byte(storage.STStorage), 0, 0, 0, 0, 1, 2, 3} binary.LittleEndian.PutUint32(expected[1:5], uint32(id)) - actual := makeStorageItemKey(id, []byte{1, 2, 3}) + actual := makeStorageItemKey(storage.STStorage, id, []byte{1, 2, 3}) require.Equal(t, expected, actual) expected = expected[0:5] - actual = makeStorageItemKey(id, nil) + actual = makeStorageItemKey(storage.STStorage, id, nil) + require.Equal(t, expected, actual) + + expected = []byte{byte(storage.STTempStorage), 0, 0, 0, 0, 1, 2, 3} + binary.LittleEndian.PutUint32(expected[1:5], uint32(id)) + actual = makeStorageItemKey(storage.STTempStorage, id, []byte{1, 2, 3}) require.Equal(t, expected, actual) } diff --git a/pkg/core/mpt/billet.go b/pkg/core/mpt/billet.go index 05212aafa..413b8507e 100644 --- a/pkg/core/mpt/billet.go +++ b/pkg/core/mpt/billet.go @@ -28,7 +28,8 @@ var ( // 3. Pair (node, path) must be restored only once. It's a duty of MPT pool to manage // MPT paths in order to provide this assumption. type Billet struct { - Store *storage.MemCachedStore + TempStoragePrefix storage.KeyPrefix + Store *storage.MemCachedStore root Node refcountEnabled bool @@ -38,11 +39,12 @@ type Billet struct { // to decouple storage errors from logic errors so that all storage errors are // processed during `store.Persist()` at the caller. This also has the benefit, // that every `Put` can be considered an atomic operation. -func NewBillet(rootHash util.Uint256, enableRefCount bool, store *storage.MemCachedStore) *Billet { +func NewBillet(rootHash util.Uint256, enableRefCount bool, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet { return &Billet{ - Store: store, - root: NewHashNode(rootHash), - refcountEnabled: enableRefCount, + TempStoragePrefix: prefix, + Store: store, + root: NewHashNode(rootHash), + refcountEnabled: enableRefCount, } } @@ -64,7 +66,7 @@ func (b *Billet) RestoreHashNode(path []byte, node Node) error { // If it's a leaf, then put into temporary contract storage. if leaf, ok := node.(*LeafNode); ok { - k := append([]byte{byte(storage.STTempStorage)}, fromNibbles(path)...) + k := append([]byte{byte(b.TempStoragePrefix)}, fromNibbles(path)...) _ = b.Store.Put(k, leaf.value) } return nil diff --git a/pkg/core/mpt/billet_test.go b/pkg/core/mpt/billet_test.go index 7850b129e..4893d0d41 100644 --- a/pkg/core/mpt/billet_test.go +++ b/pkg/core/mpt/billet_test.go @@ -32,7 +32,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b.Children[5] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0xAB, 0xDE})) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, NewHashNode(b.Hash())) - tr := NewBillet(e.Hash(), true, newTestStore()) + tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = e // OK @@ -61,7 +61,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { l := NewLeafNode([]byte{0xAB, 0xCD}) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, NewHashNode(l.Hash())) - tr := NewBillet(e.Hash(), true, newTestStore()) + tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = e // OK @@ -87,7 +87,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { h := NewHashNode(util.Uint256{1, 2, 3}) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, h) - tr := NewBillet(e.Hash(), true, newTestStore()) + tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = e // no-op @@ -99,7 +99,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { t.Run("parent is Leaf", func(t *testing.T) { l := NewLeafNode([]byte{0xAB, 0xCD}) path := []byte{} - tr := NewBillet(l.Hash(), true, newTestStore()) + tr := NewBillet(l.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = l // Already restored => panic expected @@ -121,7 +121,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[5] = NewHashNode(l1.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -152,7 +152,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[5] = NewHashNode(l1.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -179,7 +179,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { // two same hashnodes => leaf's refcount expected to be 2 in the end. b.Children[3] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -202,7 +202,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[3] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) // Should fail, because if it's a hash node with non-empty path, then the node // has already been collapsed. diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index ac54fed65..da466096d 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -566,7 +566,7 @@ func (t *Trie) Find(prefix, from []byte, max int) ([]storage.KeyValue, error) { res []storage.KeyValue count int ) - b := NewBillet(t.root.Hash(), false, t.Store) + b := NewBillet(t.root.Hash(), false, 0, t.Store) process := func(pathToNode []byte, node Node, _ []byte) bool { if leaf, ok := node.(*LeafNode); ok { if from == nil || !bytes.Equal(pathToNode, from) { // (*Billet).traverse includes `from` path into result if so. Need to filter out manually. diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index adc6a389a..a852f7f4d 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -165,6 +165,19 @@ func (s *Module) Init(currChainHeight uint32) error { return s.defineSyncStage() } +// TemporaryPrefix accepts current storage prefix and returns prefix +// to use for storing intermediate items during synchronization. +func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix { + switch currPrefix { + case storage.STStorage: + return storage.STTempStorage + case storage.STTempStorage: + return storage.STStorage + default: + panic(fmt.Sprintf("invalid storage prefix: %x", currPrefix)) + } +} + // defineSyncStage sequentially checks and sets sync state process stage after Module // initialization. It also performs initialization of MPT Billet if necessary. func (s *Module) defineSyncStage() error { @@ -194,7 +207,8 @@ func (s *Module) defineSyncStage() error { if err != nil { return fmt.Errorf("failed to get header to initialize MPT billet: %w", err) } - s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState, s.dao.Store) + s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState, + TemporaryPrefix(s.dao.StoragePrefix), s.dao.Store) s.log.Info("MPT billet initialized", zap.Uint32("height", s.syncPoint), zap.String("state root", header.PrevStateRoot.StringBE())) @@ -466,7 +480,7 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt s.lock.RLock() defer s.lock.RUnlock() - b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, storage.NewMemCachedStore(s.dao.Store)) + b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, 0, storage.NewMemCachedStore(s.dao.Store)) return b.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool { return process(node, nodeBytes) }, false) diff --git a/pkg/core/statesync/module_test.go b/pkg/core/statesync/module_test.go index c1bf6b117..1af841129 100644 --- a/pkg/core/statesync/module_test.go +++ b/pkg/core/statesync/module_test.go @@ -56,7 +56,8 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) { dao: dao.NewSimple(actualStorage, true, false), mptpool: NewPool(), } - stateSync.billet = mpt.NewBillet(sr, true, actualStorage) + stateSync.billet = mpt.NewBillet(sr, true, + TemporaryPrefix(stateSync.dao.StoragePrefix), actualStorage) stateSync.mptpool.Add(sr, []byte{}) // The test itself: we'll ask state sync module to restore each node exactly once. diff --git a/pkg/core/statesync_test.go b/pkg/core/statesync_test.go index c701c6700..81896fddc 100644 --- a/pkg/core/statesync_test.go +++ b/pkg/core/statesync_test.go @@ -421,7 +421,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { // compare storage states fetchStorage := func(bc *Blockchain) []storage.KeyValue { var kv []storage.KeyValue - bc.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { + bc.dao.Store.Seek(bc.dao.StoragePrefix.Bytes(), func(k, v []byte) { key := slice.Copy(k) value := slice.Copy(v) kv = append(kv, storage.KeyValue{