From 856385b1067b492b6bea3221ae977769037aec15 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Mon, 27 Sep 2021 16:35:25 +0300 Subject: [PATCH 1/7] dao: use custom storage prefix We use 2 prefixes for storing items because of state synchronization. This commit allows to parametrize dao with the default prefix. Signed-off-by: Evgeniy Stratonikov --- cli/server/dump.go | 5 +++-- pkg/core/blockchain.go | 8 +++++--- pkg/core/blockchain_test.go | 8 ++++++-- pkg/core/dao/dao.go | 27 +++++++++++++++++---------- pkg/core/dao/dao_test.go | 9 +++++++-- pkg/core/mpt/billet.go | 14 ++++++++------ pkg/core/mpt/billet_test.go | 16 ++++++++-------- pkg/core/mpt/trie.go | 2 +- pkg/core/statesync/module.go | 18 ++++++++++++++++-- pkg/core/statesync/module_test.go | 3 ++- pkg/core/statesync_test.go | 2 +- 11 files changed, 74 insertions(+), 38 deletions(-) diff --git a/cli/server/dump.go b/cli/server/dump.go index a35f2cad4..ff662f2a8 100644 --- a/cli/server/dump.go +++ b/cli/server/dump.go @@ -32,7 +32,7 @@ func batchToMap(index uint32, batch *storage.MemBatch) blockDump { ops := make([]storageOp, 0, size) for i := range batch.Put { key := batch.Put[i].Key - if len(key) == 0 || key[0] != byte(storage.STStorage) { + if len(key) == 0 || key[0] != byte(storage.STStorage) && key[0] != byte(storage.STTempStorage) { continue } @@ -50,7 +50,8 @@ func batchToMap(index uint32, batch *storage.MemBatch) blockDump { for i := range batch.Deleted { key := batch.Deleted[i].Key - if len(key) == 0 || key[0] != byte(storage.STStorage) || !batch.Deleted[i].Exists { + if len(key) == 0 || !batch.Deleted[i].Exists || + key[0] != byte(storage.STStorage) && key[0] != byte(storage.STTempStorage) { continue } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 5666389e1..1a6d97f4c 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -490,7 +490,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error // Replace old storage items by new ones, it should be done step-by step. // Firstly, remove all old genesis-related items. b := bc.dao.Store.Batch() - bc.dao.Store.Seek([]byte{byte(storage.STStorage)}, func(k, _ []byte) { + bc.dao.Store.Seek([]byte{byte(bc.dao.StoragePrefix)}, func(k, _ []byte) { // #1468, but don't need to copy here, because it is done by Store. b.Delete(k) }) @@ -505,14 +505,16 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error for { count := 0 b := bc.dao.Store.Batch() - bc.dao.Store.Seek([]byte{byte(storage.STTempStorage)}, func(k, v []byte) { + currPrefix := byte(bc.dao.StoragePrefix) + syncPrefix := byte(statesync.TemporaryPrefix(bc.dao.StoragePrefix)) + bc.dao.Store.Seek([]byte{syncPrefix}, func(k, v []byte) { if count >= maxStorageBatchSize { return } // #1468, but don't need to copy here, because it is done by Store. b.Delete(k) key := make([]byte, len(k)) - key[0] = byte(storage.STStorage) + key[0] = currPrefix copy(key[1:], k[1:]) b.Put(key, slice.Copy(v)) count += 2 diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 4870be1d6..c930b46b3 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1765,9 +1765,13 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { // put storage items with STTemp prefix batch := bcSpout.dao.Store.Batch() - bcSpout.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { + tempPrefix := storage.STTempStorage + if bcSpout.dao.StoragePrefix == tempPrefix { + tempPrefix = storage.STStorage + } + bcSpout.dao.Store.Seek(bcSpout.dao.StoragePrefix.Bytes(), func(k, v []byte) { key := slice.Copy(k) - key[0] = storage.STTempStorage.Bytes()[0] + key[0] = byte(tempPrefix) value := slice.Copy(v) batch.Put(key, value) }) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 9cf17c306..e374de6b9 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -74,7 +74,8 @@ type DAO interface { // Simple is memCached wrapper around DB, simple DAO implementation. type Simple struct { - Store *storage.MemCachedStore + StoragePrefix storage.KeyPrefix + Store *storage.MemCachedStore // stateRootInHeader specifies if block header contains state root. stateRootInHeader bool // p2pSigExtensions denotes whether P2PSignatureExtensions are enabled. @@ -84,7 +85,12 @@ type Simple struct { // NewSimple creates new simple dao using provided backend store. func NewSimple(backend storage.Store, stateRootInHeader bool, p2pSigExtensions bool) *Simple { st := storage.NewMemCachedStore(backend) - return &Simple{Store: st, stateRootInHeader: stateRootInHeader, p2pSigExtensions: p2pSigExtensions} + return &Simple{ + StoragePrefix: storage.STStorage, + Store: st, + stateRootInHeader: stateRootInHeader, + p2pSigExtensions: p2pSigExtensions, + } } // GetBatch returns currently accumulated DB changeset. @@ -96,6 +102,7 @@ func (dao *Simple) GetBatch() *storage.MemBatch { // MemCachedStore around the current DAO Store. func (dao *Simple) GetWrapped() DAO { d := NewSimple(dao.Store, dao.stateRootInHeader, dao.p2pSigExtensions) + d.StoragePrefix = dao.StoragePrefix return d } @@ -277,7 +284,7 @@ func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWrit // GetStorageItem returns StorageItem if it exists in the given store. func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { - b, err := dao.Store.Get(makeStorageItemKey(id, key)) + b, err := dao.Store.Get(makeStorageItemKey(dao.StoragePrefix, id, key)) if err != nil { return nil } @@ -287,14 +294,14 @@ func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { // PutStorageItem puts given StorageItem for given id with given // key into the given store. func (dao *Simple) PutStorageItem(id int32, key []byte, si state.StorageItem) error { - stKey := makeStorageItemKey(id, key) + stKey := makeStorageItemKey(dao.StoragePrefix, id, key) return dao.Store.Put(stKey, si) } // DeleteStorageItem drops storage item for the given id with the // given key from the store. func (dao *Simple) DeleteStorageItem(id int32, key []byte) error { - stKey := makeStorageItemKey(id, key) + stKey := makeStorageItemKey(dao.StoragePrefix, id, key) return dao.Store.Delete(stKey) } @@ -323,7 +330,7 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S // Seek executes f for all items with a given prefix. // If key is to be used outside of f, they may not be copied. func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { - lookupKey := makeStorageItemKey(id, nil) + lookupKey := makeStorageItemKey(dao.StoragePrefix, id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } @@ -335,7 +342,7 @@ func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { // SeekAsync sends all storage items matching given prefix to a channel and returns // the channel. Resulting keys and values may not be copied. func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue { - lookupKey := makeStorageItemKey(id, nil) + lookupKey := makeStorageItemKey(dao.StoragePrefix, id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } @@ -343,10 +350,10 @@ func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan } // makeStorageItemKey returns a key used to store StorageItem in the DB. -func makeStorageItemKey(id int32, key []byte) []byte { +func makeStorageItemKey(prefix storage.KeyPrefix, id int32, key []byte) []byte { // 1 for prefix + 4 for Uint32 + len(key) for key buf := make([]byte, 5+len(key)) - buf[0] = byte(storage.STStorage) + buf[0] = byte(prefix) binary.LittleEndian.PutUint32(buf[1:], uint32(id)) copy(buf[5:], key) return buf @@ -667,7 +674,7 @@ func (dao *Simple) PersistSync() (int, error) { // GetMPTBatch storage changes to be applied to MPT. func (dao *Simple) GetMPTBatch() mpt.Batch { var b mpt.Batch - dao.Store.MemoryStore.SeekAll([]byte{byte(storage.STStorage)}, func(k, v []byte) { + dao.Store.MemoryStore.SeekAll([]byte{byte(dao.StoragePrefix)}, func(k, v []byte) { b.Add(k[1:], v) }) return b diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 00b6ee1fd..4ec8cc0d0 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -222,11 +222,16 @@ func TestMakeStorageItemKey(t *testing.T) { expected := []byte{byte(storage.STStorage), 0, 0, 0, 0, 1, 2, 3} binary.LittleEndian.PutUint32(expected[1:5], uint32(id)) - actual := makeStorageItemKey(id, []byte{1, 2, 3}) + actual := makeStorageItemKey(storage.STStorage, id, []byte{1, 2, 3}) require.Equal(t, expected, actual) expected = expected[0:5] - actual = makeStorageItemKey(id, nil) + actual = makeStorageItemKey(storage.STStorage, id, nil) + require.Equal(t, expected, actual) + + expected = []byte{byte(storage.STTempStorage), 0, 0, 0, 0, 1, 2, 3} + binary.LittleEndian.PutUint32(expected[1:5], uint32(id)) + actual = makeStorageItemKey(storage.STTempStorage, id, []byte{1, 2, 3}) require.Equal(t, expected, actual) } diff --git a/pkg/core/mpt/billet.go b/pkg/core/mpt/billet.go index 05212aafa..413b8507e 100644 --- a/pkg/core/mpt/billet.go +++ b/pkg/core/mpt/billet.go @@ -28,7 +28,8 @@ var ( // 3. Pair (node, path) must be restored only once. It's a duty of MPT pool to manage // MPT paths in order to provide this assumption. type Billet struct { - Store *storage.MemCachedStore + TempStoragePrefix storage.KeyPrefix + Store *storage.MemCachedStore root Node refcountEnabled bool @@ -38,11 +39,12 @@ type Billet struct { // to decouple storage errors from logic errors so that all storage errors are // processed during `store.Persist()` at the caller. This also has the benefit, // that every `Put` can be considered an atomic operation. -func NewBillet(rootHash util.Uint256, enableRefCount bool, store *storage.MemCachedStore) *Billet { +func NewBillet(rootHash util.Uint256, enableRefCount bool, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet { return &Billet{ - Store: store, - root: NewHashNode(rootHash), - refcountEnabled: enableRefCount, + TempStoragePrefix: prefix, + Store: store, + root: NewHashNode(rootHash), + refcountEnabled: enableRefCount, } } @@ -64,7 +66,7 @@ func (b *Billet) RestoreHashNode(path []byte, node Node) error { // If it's a leaf, then put into temporary contract storage. if leaf, ok := node.(*LeafNode); ok { - k := append([]byte{byte(storage.STTempStorage)}, fromNibbles(path)...) + k := append([]byte{byte(b.TempStoragePrefix)}, fromNibbles(path)...) _ = b.Store.Put(k, leaf.value) } return nil diff --git a/pkg/core/mpt/billet_test.go b/pkg/core/mpt/billet_test.go index 7850b129e..4893d0d41 100644 --- a/pkg/core/mpt/billet_test.go +++ b/pkg/core/mpt/billet_test.go @@ -32,7 +32,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b.Children[5] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0xAB, 0xDE})) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, NewHashNode(b.Hash())) - tr := NewBillet(e.Hash(), true, newTestStore()) + tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = e // OK @@ -61,7 +61,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { l := NewLeafNode([]byte{0xAB, 0xCD}) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, NewHashNode(l.Hash())) - tr := NewBillet(e.Hash(), true, newTestStore()) + tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = e // OK @@ -87,7 +87,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { h := NewHashNode(util.Uint256{1, 2, 3}) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, h) - tr := NewBillet(e.Hash(), true, newTestStore()) + tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = e // no-op @@ -99,7 +99,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { t.Run("parent is Leaf", func(t *testing.T) { l := NewLeafNode([]byte{0xAB, 0xCD}) path := []byte{} - tr := NewBillet(l.Hash(), true, newTestStore()) + tr := NewBillet(l.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = l // Already restored => panic expected @@ -121,7 +121,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[5] = NewHashNode(l1.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -152,7 +152,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[5] = NewHashNode(l1.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -179,7 +179,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { // two same hashnodes => leaf's refcount expected to be 2 in the end. b.Children[3] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -202,7 +202,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[3] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash()) - tr := NewBillet(b.Hash(), true, newTestStore()) + tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) // Should fail, because if it's a hash node with non-empty path, then the node // has already been collapsed. diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index ac54fed65..da466096d 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -566,7 +566,7 @@ func (t *Trie) Find(prefix, from []byte, max int) ([]storage.KeyValue, error) { res []storage.KeyValue count int ) - b := NewBillet(t.root.Hash(), false, t.Store) + b := NewBillet(t.root.Hash(), false, 0, t.Store) process := func(pathToNode []byte, node Node, _ []byte) bool { if leaf, ok := node.(*LeafNode); ok { if from == nil || !bytes.Equal(pathToNode, from) { // (*Billet).traverse includes `from` path into result if so. Need to filter out manually. diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index adc6a389a..a852f7f4d 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -165,6 +165,19 @@ func (s *Module) Init(currChainHeight uint32) error { return s.defineSyncStage() } +// TemporaryPrefix accepts current storage prefix and returns prefix +// to use for storing intermediate items during synchronization. +func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix { + switch currPrefix { + case storage.STStorage: + return storage.STTempStorage + case storage.STTempStorage: + return storage.STStorage + default: + panic(fmt.Sprintf("invalid storage prefix: %x", currPrefix)) + } +} + // defineSyncStage sequentially checks and sets sync state process stage after Module // initialization. It also performs initialization of MPT Billet if necessary. func (s *Module) defineSyncStage() error { @@ -194,7 +207,8 @@ func (s *Module) defineSyncStage() error { if err != nil { return fmt.Errorf("failed to get header to initialize MPT billet: %w", err) } - s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState, s.dao.Store) + s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState, + TemporaryPrefix(s.dao.StoragePrefix), s.dao.Store) s.log.Info("MPT billet initialized", zap.Uint32("height", s.syncPoint), zap.String("state root", header.PrevStateRoot.StringBE())) @@ -466,7 +480,7 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt s.lock.RLock() defer s.lock.RUnlock() - b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, storage.NewMemCachedStore(s.dao.Store)) + b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, 0, storage.NewMemCachedStore(s.dao.Store)) return b.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool { return process(node, nodeBytes) }, false) diff --git a/pkg/core/statesync/module_test.go b/pkg/core/statesync/module_test.go index c1bf6b117..1af841129 100644 --- a/pkg/core/statesync/module_test.go +++ b/pkg/core/statesync/module_test.go @@ -56,7 +56,8 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) { dao: dao.NewSimple(actualStorage, true, false), mptpool: NewPool(), } - stateSync.billet = mpt.NewBillet(sr, true, actualStorage) + stateSync.billet = mpt.NewBillet(sr, true, + TemporaryPrefix(stateSync.dao.StoragePrefix), actualStorage) stateSync.mptpool.Add(sr, []byte{}) // The test itself: we'll ask state sync module to restore each node exactly once. diff --git a/pkg/core/statesync_test.go b/pkg/core/statesync_test.go index c701c6700..81896fddc 100644 --- a/pkg/core/statesync_test.go +++ b/pkg/core/statesync_test.go @@ -421,7 +421,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { // compare storage states fetchStorage := func(bc *Blockchain) []storage.KeyValue { var kv []storage.KeyValue - bc.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { + bc.dao.Store.Seek(bc.dao.StoragePrefix.Bytes(), func(k, v []byte) { key := slice.Copy(k) value := slice.Copy(v) kv = append(kv, storage.KeyValue{ From 6c5a7d9b29c7b1153227d51a6deaa483be446a41 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Wed, 20 Oct 2021 17:19:16 +0300 Subject: [PATCH 2/7] dao: include settings in `Version` Signed-off-by: Evgeniy Stratonikov --- pkg/core/blockchain.go | 11 +++++---- pkg/core/dao/dao.go | 50 ++++++++++++++++++++++++++++++++++------ pkg/core/dao/dao_test.go | 16 ++++++++++--- 3 files changed, 63 insertions(+), 14 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 1a6d97f4c..fb15b82e0 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -45,7 +45,7 @@ import ( // Tuning parameters. const ( headerBatchCount = 2000 - version = "0.1.5" + version = "0.2.0" defaultInitialGAS = 52000000_00000000 defaultMemPoolSize = 50000 @@ -311,7 +311,8 @@ func (bc *Blockchain) init() error { ver, err := bc.dao.GetVersion() if err != nil { bc.log.Info("no storage version found! creating genesis block") - if err = bc.dao.PutVersion(version); err != nil { + v := dao.Version{Prefix: storage.STStorage, Value: version} + if err = bc.dao.PutVersion(v); err != nil { return err } genesisBlock, err := createGenesisBlock(bc.config) @@ -328,9 +329,11 @@ func (bc *Blockchain) init() error { } return bc.storeBlock(genesisBlock, nil) } - if ver != version { - return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver) + if ver.Value != version { + return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver.Value) } + bc.dao.StoragePrefix = ver.Prefix + bc.persistent.StoragePrefix = ver.Prefix // not strictly needed but we better be consistent here // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index e374de6b9..fd2817c7a 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -50,7 +50,7 @@ type DAO interface { GetStorageItems(id int32) ([]state.StorageItemWithKey, error) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) - GetVersion() (string, error) + GetVersion() (Version, error) GetWrapped() DAO HasTransaction(hash util.Uint256) error Persist() (int, error) @@ -63,7 +63,7 @@ type DAO interface { PutStateSyncPoint(p uint32) error PutStateSyncCurrentBlockHeight(h uint32) error PutStorageItem(id int32, key []byte, si state.StorageItem) error - PutVersion(v string) error + PutVersion(v Version) error Seek(id int32, prefix []byte, f func(k, v []byte)) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error @@ -378,11 +378,46 @@ func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) { return block, nil } +// Version represents current dao version. +type Version struct { + Prefix storage.KeyPrefix + Value string +} + +// FromBytes decodes v from a byte-slice. +func (v *Version) FromBytes(data []byte) error { + if len(data) == 0 { + return errors.New("missing version") + } + i := 0 + for ; i < len(data) && data[i] != '\x00'; i++ { + } + + if i == len(data) { + v.Value = string(data) + return nil + } + + v.Value = string(data[:i]) + v.Prefix = storage.KeyPrefix(data[i+1]) + return nil +} + +// Bytes encodes v to a byte-slice. +func (v *Version) Bytes() []byte { + return append([]byte(v.Value), '\x00', byte(v.Prefix)) +} + // GetVersion attempts to get the current version stored in the // underlying store. -func (dao *Simple) GetVersion() (string, error) { - version, err := dao.Store.Get(storage.SYSVersion.Bytes()) - return string(version), err +func (dao *Simple) GetVersion() (Version, error) { + var version Version + + data, err := dao.Store.Get(storage.SYSVersion.Bytes()) + if err == nil { + err = version.FromBytes(data) + } + return version, err } // GetCurrentBlockHeight returns the current block height found in the @@ -485,8 +520,9 @@ func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, } // PutVersion stores the given version in the underlying store. -func (dao *Simple) PutVersion(v string) error { - return dao.Store.Put(storage.SYSVersion.Bytes(), []byte(v)) +func (dao *Simple) PutVersion(v Version) error { + dao.StoragePrefix = v.Prefix + return dao.Store.Put(storage.SYSVersion.Bytes(), v.Bytes()) } // PutCurrentHeader stores current header. diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 4ec8cc0d0..7d1c3d9c2 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -115,16 +115,26 @@ func TestGetVersion_NoVersion(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false, false) version, err := dao.GetVersion() require.Error(t, err) - require.Equal(t, "", version) + require.Equal(t, "", version.Value) } func TestGetVersion(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false, false) - err := dao.PutVersion("testVersion") + err := dao.PutVersion(Version{Prefix: 0x42, Value: "testVersion"}) require.NoError(t, err) version, err := dao.GetVersion() require.NoError(t, err) - require.NotNil(t, version) + require.EqualValues(t, 0x42, version.Prefix) + require.Equal(t, "testVersion", version.Value) + + t.Run("old format", func(t *testing.T) { + dao := NewSimple(storage.NewMemoryStore(), false, false) + require.NoError(t, dao.Store.Put(storage.SYSVersion.Bytes(), []byte("0.1.2"))) + + version, err := dao.GetVersion() + require.NoError(t, err) + require.Equal(t, "0.1.2", version.Value) + }) } func TestGetCurrentHeaderHeight_NoHeader(t *testing.T) { From 856e9cf67b2d8f77579393f602a47494bdf92492 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Wed, 20 Oct 2021 18:20:31 +0300 Subject: [PATCH 3/7] statesync: copy state by swapping prefix Signed-off-by: Evgeniy Stratonikov --- pkg/core/blockchain.go | 44 ++++++++++---------------------------- pkg/core/mpt/billet.go | 3 +++ pkg/core/statesync_test.go | 17 +++++++++++++-- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index fb15b82e0..61547711b 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -36,7 +36,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "go.uber.org/zap" @@ -57,11 +56,6 @@ const ( // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. HeaderVerificationGasLimit = 3_00000000 // 3 GAS defaultStateSyncInterval = 40000 - - // maxStorageBatchSize is the number of elements in storage batch expected to fit into the - // storage without delays and problems. Estimated size of batch in case of given number of - // elements does not exceed 1Mb. - maxStorageBatchSize = 10000 ) // stateJumpStage denotes the stage of state jump process. @@ -504,34 +498,18 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error } fallthrough case oldStorageItemsRemoved: - // Then change STTempStorage prefix to STStorage. Each replace operation is atomic. - for { - count := 0 - b := bc.dao.Store.Batch() - currPrefix := byte(bc.dao.StoragePrefix) - syncPrefix := byte(statesync.TemporaryPrefix(bc.dao.StoragePrefix)) - bc.dao.Store.Seek([]byte{syncPrefix}, func(k, v []byte) { - if count >= maxStorageBatchSize { - return - } - // #1468, but don't need to copy here, because it is done by Store. - b.Delete(k) - key := make([]byte, len(k)) - key[0] = currPrefix - copy(key[1:], k[1:]) - b.Put(key, slice.Copy(v)) - count += 2 - }) - if count > 0 { - err := bc.dao.Store.PutBatch(b) - if err != nil { - return fmt.Errorf("failed to replace outdated contract storage items with the fresh ones: %w", err) - } - } else { - break - } + newPrefix := statesync.TemporaryPrefix(bc.dao.StoragePrefix) + v, err := bc.dao.GetVersion() + if err != nil { + return fmt.Errorf("failed to get dao.Version: %w", err) } - err := bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)}) + v.Prefix = newPrefix + if err := bc.dao.PutVersion(v); err != nil { + return fmt.Errorf("failed to update dao.Version: %w", err) + } + bc.persistent.StoragePrefix = newPrefix + + err = bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)}) if err != nil { return fmt.Errorf("failed to store state jump stage: %w", err) } diff --git a/pkg/core/mpt/billet.go b/pkg/core/mpt/billet.go index 413b8507e..39c4770fe 100644 --- a/pkg/core/mpt/billet.go +++ b/pkg/core/mpt/billet.go @@ -66,6 +66,9 @@ func (b *Billet) RestoreHashNode(path []byte, node Node) error { // If it's a leaf, then put into temporary contract storage. if leaf, ok := node.(*LeafNode); ok { + if b.TempStoragePrefix == 0 { + panic("invalid storage prefix") + } k := append([]byte{byte(b.TempStoragePrefix)}, fromNibbles(path)...) _ = b.Store.Put(k, leaf.value) } diff --git a/pkg/core/statesync_test.go b/pkg/core/statesync_test.go index 81896fddc..bd493005d 100644 --- a/pkg/core/statesync_test.go +++ b/pkg/core/statesync_test.go @@ -300,7 +300,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { c.ProtocolConfiguration.KeepOnlyLatestState = true c.ProtocolConfiguration.RemoveUntraceableBlocks = true } - bcBolt := newTestChainWithCustomCfg(t, boltCfg) + bcBoltStore := memoryStore{storage.NewMemoryStore()} + bcBolt := initTestChain(t, bcBoltStore, boltCfg) + go bcBolt.Run() module := bcBolt.GetStateSyncModule() t.Run("error: add headers before initialisation", func(t *testing.T) { @@ -424,6 +426,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { bc.dao.Store.Seek(bc.dao.StoragePrefix.Bytes(), func(k, v []byte) { key := slice.Copy(k) value := slice.Copy(v) + if key[0] == byte(storage.STTempStorage) { + key[0] = byte(storage.STStorage) + } kv = append(kv, storage.KeyValue{ Key: key, Value: value, @@ -436,7 +441,15 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { require.ElementsMatch(t, expected, actual) // no temp items should be left - bcBolt.dao.Store.Seek(storage.STTempStorage.Bytes(), func(k, v []byte) { + bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { t.Fatal("temp storage items are found") }) + bcBolt.Close() + + // Check restoring with new prefix. + bcBolt = initTestChain(t, bcBoltStore, boltCfg) + go bcBolt.Run() + defer bcBolt.Close() + require.Equal(t, storage.STTempStorage, bcBolt.dao.StoragePrefix) + require.Equal(t, storage.STTempStorage, bcBolt.persistent.StoragePrefix) } From f7e2d3d71734ee7f485972a539e67de6f6db2017 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Fri, 22 Oct 2021 10:58:53 +0300 Subject: [PATCH 4/7] dao: add stateroot-related settings to `Version` Signed-off-by: Evgeniy Stratonikov --- pkg/core/blockchain.go | 31 +++++++++---- pkg/core/blockchain_test.go | 4 +- pkg/core/dao/dao.go | 72 +++++++++++++++++++------------ pkg/core/dao/dao_test.go | 20 +++++++-- pkg/core/statesync/module.go | 2 +- pkg/core/statesync/module_test.go | 2 +- pkg/core/statesync_test.go | 6 +-- 7 files changed, 91 insertions(+), 46 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 61547711b..1e819d35b 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -305,10 +305,17 @@ func (bc *Blockchain) init() error { ver, err := bc.dao.GetVersion() if err != nil { bc.log.Info("no storage version found! creating genesis block") - v := dao.Version{Prefix: storage.STStorage, Value: version} - if err = bc.dao.PutVersion(v); err != nil { + ver = dao.Version{ + StoragePrefix: storage.STStorage, + StateRootInHeader: bc.config.StateRootInHeader, + P2PSigExtensions: bc.config.P2PSigExtensions, + Value: version, + } + if err = bc.dao.PutVersion(ver); err != nil { return err } + bc.dao.Version = ver + bc.persistent.Version = ver genesisBlock, err := createGenesisBlock(bc.config) if err != nil { return err @@ -326,8 +333,16 @@ func (bc *Blockchain) init() error { if ver.Value != version { return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver.Value) } - bc.dao.StoragePrefix = ver.Prefix - bc.persistent.StoragePrefix = ver.Prefix // not strictly needed but we better be consistent here + if ver.StateRootInHeader != bc.config.StateRootInHeader { + return fmt.Errorf("StateRootInHeader setting mismatch (config=%t, db=%t)", + ver.StateRootInHeader, bc.config.StateRootInHeader) + } + if ver.P2PSigExtensions != bc.config.P2PSigExtensions { + return fmt.Errorf("P2PSigExtensions setting mismatch (old=%t, new=%t", + ver.P2PSigExtensions, bc.config.P2PSigExtensions) + } + bc.dao.Version = ver + bc.persistent.Version = ver // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified @@ -487,7 +502,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error // Replace old storage items by new ones, it should be done step-by step. // Firstly, remove all old genesis-related items. b := bc.dao.Store.Batch() - bc.dao.Store.Seek([]byte{byte(bc.dao.StoragePrefix)}, func(k, _ []byte) { + bc.dao.Store.Seek([]byte{byte(bc.dao.Version.StoragePrefix)}, func(k, _ []byte) { // #1468, but don't need to copy here, because it is done by Store. b.Delete(k) }) @@ -498,16 +513,16 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error } fallthrough case oldStorageItemsRemoved: - newPrefix := statesync.TemporaryPrefix(bc.dao.StoragePrefix) + newPrefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) v, err := bc.dao.GetVersion() if err != nil { return fmt.Errorf("failed to get dao.Version: %w", err) } - v.Prefix = newPrefix + v.StoragePrefix = newPrefix if err := bc.dao.PutVersion(v); err != nil { return fmt.Errorf("failed to update dao.Version: %w", err) } - bc.persistent.StoragePrefix = newPrefix + bc.persistent.Version = v err = bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)}) if err != nil { diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index c930b46b3..5c2eff93b 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1766,10 +1766,10 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { // put storage items with STTemp prefix batch := bcSpout.dao.Store.Batch() tempPrefix := storage.STTempStorage - if bcSpout.dao.StoragePrefix == tempPrefix { + if bcSpout.dao.Version.StoragePrefix == tempPrefix { tempPrefix = storage.STStorage } - bcSpout.dao.Store.Seek(bcSpout.dao.StoragePrefix.Bytes(), func(k, v []byte) { + bcSpout.dao.Store.Seek(bcSpout.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) { key := slice.Copy(k) key[0] = byte(tempPrefix) value := slice.Copy(v) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index fd2817c7a..a82519fb5 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -74,22 +74,20 @@ type DAO interface { // Simple is memCached wrapper around DB, simple DAO implementation. type Simple struct { - StoragePrefix storage.KeyPrefix - Store *storage.MemCachedStore - // stateRootInHeader specifies if block header contains state root. - stateRootInHeader bool - // p2pSigExtensions denotes whether P2PSignatureExtensions are enabled. - p2pSigExtensions bool + Version Version + Store *storage.MemCachedStore } // NewSimple creates new simple dao using provided backend store. func NewSimple(backend storage.Store, stateRootInHeader bool, p2pSigExtensions bool) *Simple { st := storage.NewMemCachedStore(backend) return &Simple{ - StoragePrefix: storage.STStorage, - Store: st, - stateRootInHeader: stateRootInHeader, - p2pSigExtensions: p2pSigExtensions, + Version: Version{ + StoragePrefix: storage.STStorage, + StateRootInHeader: stateRootInHeader, + P2PSigExtensions: p2pSigExtensions, + }, + Store: st, } } @@ -101,8 +99,8 @@ func (dao *Simple) GetBatch() *storage.MemBatch { // GetWrapped returns new DAO instance with another layer of wrapped // MemCachedStore around the current DAO Store. func (dao *Simple) GetWrapped() DAO { - d := NewSimple(dao.Store, dao.stateRootInHeader, dao.p2pSigExtensions) - d.StoragePrefix = dao.StoragePrefix + d := NewSimple(dao.Store, dao.Version.StateRootInHeader, dao.Version.P2PSigExtensions) + d.Version = dao.Version return d } @@ -284,7 +282,7 @@ func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWrit // GetStorageItem returns StorageItem if it exists in the given store. func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { - b, err := dao.Store.Get(makeStorageItemKey(dao.StoragePrefix, id, key)) + b, err := dao.Store.Get(makeStorageItemKey(dao.Version.StoragePrefix, id, key)) if err != nil { return nil } @@ -294,14 +292,14 @@ func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { // PutStorageItem puts given StorageItem for given id with given // key into the given store. func (dao *Simple) PutStorageItem(id int32, key []byte, si state.StorageItem) error { - stKey := makeStorageItemKey(dao.StoragePrefix, id, key) + stKey := makeStorageItemKey(dao.Version.StoragePrefix, id, key) return dao.Store.Put(stKey, si) } // DeleteStorageItem drops storage item for the given id with the // given key from the store. func (dao *Simple) DeleteStorageItem(id int32, key []byte) error { - stKey := makeStorageItemKey(dao.StoragePrefix, id, key) + stKey := makeStorageItemKey(dao.Version.StoragePrefix, id, key) return dao.Store.Delete(stKey) } @@ -330,7 +328,7 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S // Seek executes f for all items with a given prefix. // If key is to be used outside of f, they may not be copied. func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { - lookupKey := makeStorageItemKey(dao.StoragePrefix, id, nil) + lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } @@ -342,7 +340,7 @@ func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { // SeekAsync sends all storage items matching given prefix to a channel and returns // the channel. Resulting keys and values may not be copied. func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue { - lookupKey := makeStorageItemKey(dao.StoragePrefix, id, nil) + lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil) if prefix != nil { lookupKey = append(lookupKey, prefix...) } @@ -371,7 +369,7 @@ func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) { return nil, err } - block, err := block.NewBlockFromTrimmedBytes(dao.stateRootInHeader, b) + block, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, b) if err != nil { return nil, err } @@ -380,10 +378,17 @@ func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) { // Version represents current dao version. type Version struct { - Prefix storage.KeyPrefix - Value string + StoragePrefix storage.KeyPrefix + StateRootInHeader bool + P2PSigExtensions bool + Value string } +const ( + stateRootInHeaderBit = 1 << iota + p2pSigExtensionsBit +) + // FromBytes decodes v from a byte-slice. func (v *Version) FromBytes(data []byte) error { if len(data) == 0 { @@ -398,14 +403,27 @@ func (v *Version) FromBytes(data []byte) error { return nil } + if len(data) != i+3 { + return errors.New("version is invalid") + } + v.Value = string(data[:i]) - v.Prefix = storage.KeyPrefix(data[i+1]) + v.StoragePrefix = storage.KeyPrefix(data[i+1]) + v.StateRootInHeader = data[i+2]&stateRootInHeaderBit != 0 + v.P2PSigExtensions = data[i+2]&p2pSigExtensionsBit != 0 return nil } // Bytes encodes v to a byte-slice. func (v *Version) Bytes() []byte { - return append([]byte(v.Value), '\x00', byte(v.Prefix)) + var mask byte + if v.StateRootInHeader { + mask |= stateRootInHeaderBit + } + if v.P2PSigExtensions { + mask |= p2pSigExtensionsBit + } + return append([]byte(v.Value), '\x00', byte(v.StoragePrefix), mask) } // GetVersion attempts to get the current version stored in the @@ -521,7 +539,7 @@ func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, // PutVersion stores the given version in the underlying store. func (dao *Simple) PutVersion(v Version) error { - dao.StoragePrefix = v.Prefix + dao.Version = v return dao.Store.Put(storage.SYSVersion.Bytes(), v.Bytes()) } @@ -607,7 +625,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { return err } - b, err := block.NewBlockFromTrimmedBytes(dao.stateRootInHeader, bs) + b, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, bs) if err != nil { return err } @@ -626,7 +644,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { for _, tx := range b.Transactions { copy(key[1:], tx.Hash().BytesBE()) batch.Delete(key) - if dao.p2pSigExtensions { + if dao.Version.P2PSigExtensions { for _, attr := range tx.GetAttributes(transaction.ConflictsT) { hash := attr.Value.(*transaction.Conflicts).Hash copy(key[1:], hash.BytesBE()) @@ -674,7 +692,7 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, if err != nil { return err } - if dao.p2pSigExtensions { + if dao.Version.P2PSigExtensions { var value []byte for _, attr := range tx.GetAttributes(transaction.ConflictsT) { hash := attr.Value.(*transaction.Conflicts).Hash @@ -710,7 +728,7 @@ func (dao *Simple) PersistSync() (int, error) { // GetMPTBatch storage changes to be applied to MPT. func (dao *Simple) GetMPTBatch() mpt.Batch { var b mpt.Batch - dao.Store.MemoryStore.SeekAll([]byte{byte(dao.StoragePrefix)}, func(k, v []byte) { + dao.Store.MemoryStore.SeekAll([]byte{byte(dao.Version.StoragePrefix)}, func(k, v []byte) { b.Add(k[1:], v) }) return b diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 7d1c3d9c2..29c6de4b8 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -120,13 +120,25 @@ func TestGetVersion_NoVersion(t *testing.T) { func TestGetVersion(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false, false) - err := dao.PutVersion(Version{Prefix: 0x42, Value: "testVersion"}) + expected := Version{ + StoragePrefix: 0x42, + P2PSigExtensions: true, + StateRootInHeader: true, + Value: "testVersion", + } + err := dao.PutVersion(expected) require.NoError(t, err) - version, err := dao.GetVersion() + actual, err := dao.GetVersion() require.NoError(t, err) - require.EqualValues(t, 0x42, version.Prefix) - require.Equal(t, "testVersion", version.Value) + require.Equal(t, expected, actual) + t.Run("invalid", func(t *testing.T) { + dao := NewSimple(storage.NewMemoryStore(), false, false) + require.NoError(t, dao.Store.Put(storage.SYSVersion.Bytes(), []byte("0.1.2\x00x"))) + + _, err := dao.GetVersion() + require.Error(t, err) + }) t.Run("old format", func(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false, false) require.NoError(t, dao.Store.Put(storage.SYSVersion.Bytes(), []byte("0.1.2"))) diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index a852f7f4d..9bdd1a775 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -208,7 +208,7 @@ func (s *Module) defineSyncStage() error { return fmt.Errorf("failed to get header to initialize MPT billet: %w", err) } s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState, - TemporaryPrefix(s.dao.StoragePrefix), s.dao.Store) + TemporaryPrefix(s.dao.Version.StoragePrefix), s.dao.Store) s.log.Info("MPT billet initialized", zap.Uint32("height", s.syncPoint), zap.String("state root", header.PrevStateRoot.StringBE())) diff --git a/pkg/core/statesync/module_test.go b/pkg/core/statesync/module_test.go index 1af841129..538a273eb 100644 --- a/pkg/core/statesync/module_test.go +++ b/pkg/core/statesync/module_test.go @@ -57,7 +57,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) { mptpool: NewPool(), } stateSync.billet = mpt.NewBillet(sr, true, - TemporaryPrefix(stateSync.dao.StoragePrefix), actualStorage) + TemporaryPrefix(stateSync.dao.Version.StoragePrefix), actualStorage) stateSync.mptpool.Add(sr, []byte{}) // The test itself: we'll ask state sync module to restore each node exactly once. diff --git a/pkg/core/statesync_test.go b/pkg/core/statesync_test.go index bd493005d..7f7803d89 100644 --- a/pkg/core/statesync_test.go +++ b/pkg/core/statesync_test.go @@ -423,7 +423,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { // compare storage states fetchStorage := func(bc *Blockchain) []storage.KeyValue { var kv []storage.KeyValue - bc.dao.Store.Seek(bc.dao.StoragePrefix.Bytes(), func(k, v []byte) { + bc.dao.Store.Seek(bc.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) { key := slice.Copy(k) value := slice.Copy(v) if key[0] == byte(storage.STTempStorage) { @@ -450,6 +450,6 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { bcBolt = initTestChain(t, bcBoltStore, boltCfg) go bcBolt.Run() defer bcBolt.Close() - require.Equal(t, storage.STTempStorage, bcBolt.dao.StoragePrefix) - require.Equal(t, storage.STTempStorage, bcBolt.persistent.StoragePrefix) + require.Equal(t, storage.STTempStorage, bcBolt.dao.Version.StoragePrefix) + require.Equal(t, storage.STTempStorage, bcBolt.persistent.Version.StoragePrefix) } From f1767f361db9eee97e8331fdc47d7e772f48d56d Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Fri, 22 Oct 2021 11:09:47 +0300 Subject: [PATCH 5/7] dao: add `KeepOnlyLatestState` setting to `dao.Version` Signed-off-by: Evgeniy Stratonikov --- pkg/core/blockchain.go | 13 +++++++++---- pkg/core/blockchain_test.go | 2 +- pkg/core/dao/dao.go | 14 ++++++++++---- pkg/core/stateroot/module.go | 26 ++------------------------ pkg/core/stateroot/store.go | 1 - 5 files changed, 22 insertions(+), 34 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 1e819d35b..2b4f6d556 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -306,10 +306,11 @@ func (bc *Blockchain) init() error { if err != nil { bc.log.Info("no storage version found! creating genesis block") ver = dao.Version{ - StoragePrefix: storage.STStorage, - StateRootInHeader: bc.config.StateRootInHeader, - P2PSigExtensions: bc.config.P2PSigExtensions, - Value: version, + StoragePrefix: storage.STStorage, + StateRootInHeader: bc.config.StateRootInHeader, + P2PSigExtensions: bc.config.P2PSigExtensions, + KeepOnlyLatestState: bc.config.KeepOnlyLatestState, + Value: version, } if err = bc.dao.PutVersion(ver); err != nil { return err @@ -341,6 +342,10 @@ func (bc *Blockchain) init() error { return fmt.Errorf("P2PSigExtensions setting mismatch (old=%t, new=%t", ver.P2PSigExtensions, bc.config.P2PSigExtensions) } + if ver.KeepOnlyLatestState != bc.config.KeepOnlyLatestState { + return fmt.Errorf("KeepOnlyLatestState setting mismatch: old=%v, new=%v", + ver.KeepOnlyLatestState, bc.config.KeepOnlyLatestState) + } bc.dao.Version = ver bc.persistent.Version = ver diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 5c2eff93b..a1c20be2a 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1823,7 +1823,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint)) require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point)) shouldFail := stage == 0x03 // unknown stage - checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, shouldFail) + checkNewBlockchainErr(t, spountCfg, bcSpout.dao.Store, shouldFail) }) } } diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index a82519fb5..5387e4752 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -378,15 +378,17 @@ func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) { // Version represents current dao version. type Version struct { - StoragePrefix storage.KeyPrefix - StateRootInHeader bool - P2PSigExtensions bool - Value string + StoragePrefix storage.KeyPrefix + StateRootInHeader bool + P2PSigExtensions bool + KeepOnlyLatestState bool + Value string } const ( stateRootInHeaderBit = 1 << iota p2pSigExtensionsBit + keepOnlyLatestStateBit ) // FromBytes decodes v from a byte-slice. @@ -411,6 +413,7 @@ func (v *Version) FromBytes(data []byte) error { v.StoragePrefix = storage.KeyPrefix(data[i+1]) v.StateRootInHeader = data[i+2]&stateRootInHeaderBit != 0 v.P2PSigExtensions = data[i+2]&p2pSigExtensionsBit != 0 + v.KeepOnlyLatestState = data[i+2]&keepOnlyLatestStateBit != 0 return nil } @@ -423,6 +426,9 @@ func (v *Version) Bytes() []byte { if v.P2PSigExtensions { mask |= p2pSigExtensionsBit } + if v.KeepOnlyLatestState { + mask |= keepOnlyLatestStateBit + } return append([]byte(v.Value), '\x00', byte(v.StoragePrefix), mask) } diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index e5e7a5439..322e23c8e 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -103,22 +103,10 @@ func (s *Module) Init(height uint32, enableRefCount bool) error { s.validatedHeight.Store(binary.LittleEndian.Uint32(data)) } - var gcKey = []byte{byte(storage.DataMPT), prefixGC} if height == 0 { s.mpt = mpt.NewTrie(nil, enableRefCount, s.Store) - var val byte - if enableRefCount { - val = 1 - } s.currentLocal.Store(util.Uint256{}) - return s.Store.Put(gcKey, []byte{val}) - } - var hasRefCount bool - if v, err := s.Store.Get(gcKey); err == nil { - hasRefCount = v[0] != 0 - } - if hasRefCount != enableRefCount { - return fmt.Errorf("KeepOnlyLatestState setting mismatch: old=%v, new=%v", hasRefCount, enableRefCount) + return nil } r, err := s.getStateRoot(makeStateRootKey(height)) if err != nil { @@ -138,25 +126,15 @@ func (s *Module) CleanStorage() error { if s.localHeight.Load() != 0 { return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load()) } - gcKey := []byte{byte(storage.DataMPT), prefixGC} - gcVal, err := s.Store.Get(gcKey) - if err != nil { - return fmt.Errorf("failed to get GC flag: %w", err) - } - // b := s.Store.Batch() s.Store.Seek([]byte{byte(storage.DataMPT)}, func(k, _ []byte) { // #1468, but don't need to copy here, because it is done by Store. b.Delete(k) }) - err = s.Store.PutBatch(b) + err := s.Store.PutBatch(b) if err != nil { return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err) } - err = s.Store.Put(gcKey, gcVal) - if err != nil { - return fmt.Errorf("failed to store GC flag: %w", err) - } currentLocal := s.currentLocal.Load().(util.Uint256) if !currentLocal.Equals(util.Uint256{}) { err := s.addLocalStateRoot(s.Store, &state.MPTRoot{ diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go index 9efcf7c5f..ee5a52f78 100644 --- a/pkg/core/stateroot/store.go +++ b/pkg/core/stateroot/store.go @@ -17,7 +17,6 @@ var ( ) const ( - prefixGC = 0x01 prefixLocal = 0x02 prefixValidated = 0x03 ) From 582d489c902c0eba5b0d6f8f0effe744940e7b98 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Wed, 3 Nov 2021 12:55:33 +0300 Subject: [PATCH 6/7] dao: add `P2PStateExchangeExtensions` setting to `dao.Version` Signed-off-by: Evgeniy Stratonikov --- pkg/core/blockchain.go | 15 ++++++++++----- pkg/core/dao/dao.go | 16 +++++++++++----- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 2b4f6d556..a08375b81 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -306,11 +306,12 @@ func (bc *Blockchain) init() error { if err != nil { bc.log.Info("no storage version found! creating genesis block") ver = dao.Version{ - StoragePrefix: storage.STStorage, - StateRootInHeader: bc.config.StateRootInHeader, - P2PSigExtensions: bc.config.P2PSigExtensions, - KeepOnlyLatestState: bc.config.KeepOnlyLatestState, - Value: version, + StoragePrefix: storage.STStorage, + StateRootInHeader: bc.config.StateRootInHeader, + P2PSigExtensions: bc.config.P2PSigExtensions, + P2PStateExchangeExtensions: bc.config.P2PStateExchangeExtensions, + KeepOnlyLatestState: bc.config.KeepOnlyLatestState, + Value: version, } if err = bc.dao.PutVersion(ver); err != nil { return err @@ -342,6 +343,10 @@ func (bc *Blockchain) init() error { return fmt.Errorf("P2PSigExtensions setting mismatch (old=%t, new=%t", ver.P2PSigExtensions, bc.config.P2PSigExtensions) } + if ver.P2PStateExchangeExtensions != bc.config.P2PStateExchangeExtensions { + return fmt.Errorf("P2PStateExchangeExtensions setting mismatch (old=%t, new=%t", + ver.P2PStateExchangeExtensions, bc.config.P2PStateExchangeExtensions) + } if ver.KeepOnlyLatestState != bc.config.KeepOnlyLatestState { return fmt.Errorf("KeepOnlyLatestState setting mismatch: old=%v, new=%v", ver.KeepOnlyLatestState, bc.config.KeepOnlyLatestState) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 5387e4752..e0e9bad9b 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -378,16 +378,18 @@ func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) { // Version represents current dao version. type Version struct { - StoragePrefix storage.KeyPrefix - StateRootInHeader bool - P2PSigExtensions bool - KeepOnlyLatestState bool - Value string + StoragePrefix storage.KeyPrefix + StateRootInHeader bool + P2PSigExtensions bool + P2PStateExchangeExtensions bool + KeepOnlyLatestState bool + Value string } const ( stateRootInHeaderBit = 1 << iota p2pSigExtensionsBit + p2pStateExchangeExtensionsBit keepOnlyLatestStateBit ) @@ -413,6 +415,7 @@ func (v *Version) FromBytes(data []byte) error { v.StoragePrefix = storage.KeyPrefix(data[i+1]) v.StateRootInHeader = data[i+2]&stateRootInHeaderBit != 0 v.P2PSigExtensions = data[i+2]&p2pSigExtensionsBit != 0 + v.P2PStateExchangeExtensions = data[i+2]&p2pStateExchangeExtensionsBit != 0 v.KeepOnlyLatestState = data[i+2]&keepOnlyLatestStateBit != 0 return nil } @@ -426,6 +429,9 @@ func (v *Version) Bytes() []byte { if v.P2PSigExtensions { mask |= p2pSigExtensionsBit } + if v.P2PStateExchangeExtensions { + mask |= p2pStateExchangeExtensionsBit + } if v.KeepOnlyLatestState { mask |= keepOnlyLatestStateBit } From fac595bbdfc8a0c97b05efa2d6c2cb06446cfd79 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Tue, 9 Nov 2021 15:13:53 +0300 Subject: [PATCH 7/7] core: remove old storage items asynchronously Signed-off-by: Evgeniy Stratonikov --- pkg/core/blockchain.go | 48 ++++++++++++++++++++++++------------- pkg/core/blockchain_test.go | 2 +- pkg/core/statesync_test.go | 11 ++++++--- pkg/core/storage/store.go | 1 + 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index a08375b81..041888e3a 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -67,9 +67,6 @@ const ( // stateJumpStarted means that state jump was just initiated, but outdated storage items // were not yet removed. stateJumpStarted - // oldStorageItemsRemoved means that outdated contract storage items were removed, but - // new storage items were not yet saved. - oldStorageItemsRemoved // newStorageItemsAdded means that contract storage items are up-to-date with the current // state. newStorageItemsAdded @@ -354,6 +351,9 @@ func (bc *Blockchain) init() error { bc.dao.Version = ver bc.persistent.Version = ver + // Always try to remove garbage. If there is nothing to do, it will exit quickly. + go bc.removeOldStorageItems() + // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified // and the genesis block as first block. @@ -478,6 +478,26 @@ func (bc *Blockchain) init() error { return bc.updateExtensibleWhitelist(bHeight) } +func (bc *Blockchain) removeOldStorageItems() { + _, err := bc.dao.Store.Get(storage.SYSCleanStorage.Bytes()) + if err != nil { + return + } + + b := bc.dao.Store.Batch() + prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) + bc.dao.Store.Seek([]byte{byte(prefix)}, func(k, _ []byte) { + // #1468, but don't need to copy here, because it is done by Store. + b.Delete(k) + }) + b.Delete(storage.SYSCleanStorage.Bytes()) + + err = bc.dao.Store.PutBatch(b) + if err != nil { + bc.log.Error("failed to remove old storage items", zap.Error(err)) + } +} + // jumpToState is an atomic operation that changes Blockchain state to the one // specified by the state sync point p. All the data needed for the jump must be // collected by the state sync module. @@ -509,20 +529,6 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error } fallthrough case stateJumpStarted: - // Replace old storage items by new ones, it should be done step-by step. - // Firstly, remove all old genesis-related items. - b := bc.dao.Store.Batch() - bc.dao.Store.Seek([]byte{byte(bc.dao.Version.StoragePrefix)}, func(k, _ []byte) { - // #1468, but don't need to copy here, because it is done by Store. - b.Delete(k) - }) - b.Put(jumpStageKey, []byte{byte(oldStorageItemsRemoved)}) - err := bc.dao.Store.PutBatch(b) - if err != nil { - return fmt.Errorf("failed to store state jump stage: %w", err) - } - fallthrough - case oldStorageItemsRemoved: newPrefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) v, err := bc.dao.GetVersion() if err != nil { @@ -538,6 +544,14 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error if err != nil { return fmt.Errorf("failed to store state jump stage: %w", err) } + + err = bc.dao.Store.Put(storage.SYSCleanStorage.Bytes(), []byte{}) + if err != nil { + return fmt.Errorf("failed to store clean storage flag: %w", err) + } + + go bc.removeOldStorageItems() + fallthrough case newStorageItemsAdded: // After current state is updated, we need to remove outdated state-related data if so. diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index a1c20be2a..2c69760e5 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1816,7 +1816,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point)) checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true) }) - for _, stage := range []stateJumpStage{stateJumpStarted, oldStorageItemsRemoved, newStorageItemsAdded, genesisStateRemoved, 0x03} { + for _, stage := range []stateJumpStage{stateJumpStarted, newStorageItemsAdded, genesisStateRemoved, 0x03} { t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) { require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stage)})) point := make([]byte, 4) diff --git a/pkg/core/statesync_test.go b/pkg/core/statesync_test.go index 7f7803d89..3b802b757 100644 --- a/pkg/core/statesync_test.go +++ b/pkg/core/statesync_test.go @@ -2,6 +2,7 @@ package core import ( "testing" + "time" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" @@ -441,9 +442,13 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { require.ElementsMatch(t, expected, actual) // no temp items should be left - bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { - t.Fatal("temp storage items are found") - }) + require.Eventually(t, func() bool { + var haveItems bool + bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(_, _ []byte) { + haveItems = true + }) + return !haveItems + }, time.Second*5, time.Millisecond*100) bcBolt.Close() // Check restoring with new prefix. diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index ff9abd3c3..20c9be321 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -29,6 +29,7 @@ const ( SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2 SYSStateSyncPoint KeyPrefix = 0xc3 SYSStateJumpStage KeyPrefix = 0xc4 + SYSCleanStorage KeyPrefix = 0xc5 SYSVersion KeyPrefix = 0xf0 )