statesync: copy state by swapping prefix

Signed-off-by: Evgeniy Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgeniy Stratonikov 2021-10-20 18:20:31 +03:00
parent 6c5a7d9b29
commit 856e9cf67b
3 changed files with 29 additions and 35 deletions

View file

@ -36,7 +36,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"go.uber.org/zap" "go.uber.org/zap"
@ -57,11 +56,6 @@ const (
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
HeaderVerificationGasLimit = 3_00000000 // 3 GAS HeaderVerificationGasLimit = 3_00000000 // 3 GAS
defaultStateSyncInterval = 40000 defaultStateSyncInterval = 40000
// maxStorageBatchSize is the number of elements in storage batch expected to fit into the
// storage without delays and problems. Estimated size of batch in case of given number of
// elements does not exceed 1Mb.
maxStorageBatchSize = 10000
) )
// stateJumpStage denotes the stage of state jump process. // stateJumpStage denotes the stage of state jump process.
@ -504,34 +498,18 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
} }
fallthrough fallthrough
case oldStorageItemsRemoved: case oldStorageItemsRemoved:
// Then change STTempStorage prefix to STStorage. Each replace operation is atomic. newPrefix := statesync.TemporaryPrefix(bc.dao.StoragePrefix)
for { v, err := bc.dao.GetVersion()
count := 0
b := bc.dao.Store.Batch()
currPrefix := byte(bc.dao.StoragePrefix)
syncPrefix := byte(statesync.TemporaryPrefix(bc.dao.StoragePrefix))
bc.dao.Store.Seek([]byte{syncPrefix}, func(k, v []byte) {
if count >= maxStorageBatchSize {
return
}
// #1468, but don't need to copy here, because it is done by Store.
b.Delete(k)
key := make([]byte, len(k))
key[0] = currPrefix
copy(key[1:], k[1:])
b.Put(key, slice.Copy(v))
count += 2
})
if count > 0 {
err := bc.dao.Store.PutBatch(b)
if err != nil { if err != nil {
return fmt.Errorf("failed to replace outdated contract storage items with the fresh ones: %w", err) return fmt.Errorf("failed to get dao.Version: %w", err)
} }
} else { v.Prefix = newPrefix
break if err := bc.dao.PutVersion(v); err != nil {
return fmt.Errorf("failed to update dao.Version: %w", err)
} }
} bc.persistent.StoragePrefix = newPrefix
err := bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)})
err = bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)})
if err != nil { if err != nil {
return fmt.Errorf("failed to store state jump stage: %w", err) return fmt.Errorf("failed to store state jump stage: %w", err)
} }

View file

@ -66,6 +66,9 @@ func (b *Billet) RestoreHashNode(path []byte, node Node) error {
// If it's a leaf, then put into temporary contract storage. // If it's a leaf, then put into temporary contract storage.
if leaf, ok := node.(*LeafNode); ok { if leaf, ok := node.(*LeafNode); ok {
if b.TempStoragePrefix == 0 {
panic("invalid storage prefix")
}
k := append([]byte{byte(b.TempStoragePrefix)}, fromNibbles(path)...) k := append([]byte{byte(b.TempStoragePrefix)}, fromNibbles(path)...)
_ = b.Store.Put(k, leaf.value) _ = b.Store.Put(k, leaf.value)
} }

View file

@ -300,7 +300,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
c.ProtocolConfiguration.KeepOnlyLatestState = true c.ProtocolConfiguration.KeepOnlyLatestState = true
c.ProtocolConfiguration.RemoveUntraceableBlocks = true c.ProtocolConfiguration.RemoveUntraceableBlocks = true
} }
bcBolt := newTestChainWithCustomCfg(t, boltCfg) bcBoltStore := memoryStore{storage.NewMemoryStore()}
bcBolt := initTestChain(t, bcBoltStore, boltCfg)
go bcBolt.Run()
module := bcBolt.GetStateSyncModule() module := bcBolt.GetStateSyncModule()
t.Run("error: add headers before initialisation", func(t *testing.T) { t.Run("error: add headers before initialisation", func(t *testing.T) {
@ -424,6 +426,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
bc.dao.Store.Seek(bc.dao.StoragePrefix.Bytes(), func(k, v []byte) { bc.dao.Store.Seek(bc.dao.StoragePrefix.Bytes(), func(k, v []byte) {
key := slice.Copy(k) key := slice.Copy(k)
value := slice.Copy(v) value := slice.Copy(v)
if key[0] == byte(storage.STTempStorage) {
key[0] = byte(storage.STStorage)
}
kv = append(kv, storage.KeyValue{ kv = append(kv, storage.KeyValue{
Key: key, Key: key,
Value: value, Value: value,
@ -436,7 +441,15 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
require.ElementsMatch(t, expected, actual) require.ElementsMatch(t, expected, actual)
// no temp items should be left // no temp items should be left
bcBolt.dao.Store.Seek(storage.STTempStorage.Bytes(), func(k, v []byte) { bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) {
t.Fatal("temp storage items are found") t.Fatal("temp storage items are found")
}) })
bcBolt.Close()
// Check restoring with new prefix.
bcBolt = initTestChain(t, bcBoltStore, boltCfg)
go bcBolt.Run()
defer bcBolt.Close()
require.Equal(t, storage.STTempStorage, bcBolt.dao.StoragePrefix)
require.Equal(t, storage.STTempStorage, bcBolt.persistent.StoragePrefix)
} }