Merge pull request #2201 from nspcc-dev/mpt-state-switch

Faster state switch
This commit is contained in:
Roman Khimov 2021-11-25 09:51:11 +03:00 committed by GitHub
commit 550372a790
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 280 additions and 137 deletions

View file

@ -32,7 +32,7 @@ func batchToMap(index uint32, batch *storage.MemBatch) blockDump {
ops := make([]storageOp, 0, size) ops := make([]storageOp, 0, size)
for i := range batch.Put { for i := range batch.Put {
key := batch.Put[i].Key key := batch.Put[i].Key
if len(key) == 0 || key[0] != byte(storage.STStorage) { if len(key) == 0 || key[0] != byte(storage.STStorage) && key[0] != byte(storage.STTempStorage) {
continue continue
} }
@ -50,7 +50,8 @@ func batchToMap(index uint32, batch *storage.MemBatch) blockDump {
for i := range batch.Deleted { for i := range batch.Deleted {
key := batch.Deleted[i].Key key := batch.Deleted[i].Key
if len(key) == 0 || key[0] != byte(storage.STStorage) || !batch.Deleted[i].Exists { if len(key) == 0 || !batch.Deleted[i].Exists ||
key[0] != byte(storage.STStorage) && key[0] != byte(storage.STTempStorage) {
continue continue
} }

View file

@ -36,7 +36,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"go.uber.org/zap" "go.uber.org/zap"
@ -45,7 +44,7 @@ import (
// Tuning parameters. // Tuning parameters.
const ( const (
headerBatchCount = 2000 headerBatchCount = 2000
version = "0.1.5" version = "0.2.0"
defaultInitialGAS = 52000000_00000000 defaultInitialGAS = 52000000_00000000
defaultMemPoolSize = 50000 defaultMemPoolSize = 50000
@ -57,11 +56,6 @@ const (
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
HeaderVerificationGasLimit = 3_00000000 // 3 GAS HeaderVerificationGasLimit = 3_00000000 // 3 GAS
defaultStateSyncInterval = 40000 defaultStateSyncInterval = 40000
// maxStorageBatchSize is the number of elements in storage batch expected to fit into the
// storage without delays and problems. Estimated size of batch in case of given number of
// elements does not exceed 1Mb.
maxStorageBatchSize = 10000
) )
// stateJumpStage denotes the stage of state jump process. // stateJumpStage denotes the stage of state jump process.
@ -73,9 +67,6 @@ const (
// stateJumpStarted means that state jump was just initiated, but outdated storage items // stateJumpStarted means that state jump was just initiated, but outdated storage items
// were not yet removed. // were not yet removed.
stateJumpStarted stateJumpStarted
// oldStorageItemsRemoved means that outdated contract storage items were removed, but
// new storage items were not yet saved.
oldStorageItemsRemoved
// newStorageItemsAdded means that contract storage items are up-to-date with the current // newStorageItemsAdded means that contract storage items are up-to-date with the current
// state. // state.
newStorageItemsAdded newStorageItemsAdded
@ -311,9 +302,19 @@ func (bc *Blockchain) init() error {
ver, err := bc.dao.GetVersion() ver, err := bc.dao.GetVersion()
if err != nil { if err != nil {
bc.log.Info("no storage version found! creating genesis block") bc.log.Info("no storage version found! creating genesis block")
if err = bc.dao.PutVersion(version); err != nil { ver = dao.Version{
StoragePrefix: storage.STStorage,
StateRootInHeader: bc.config.StateRootInHeader,
P2PSigExtensions: bc.config.P2PSigExtensions,
P2PStateExchangeExtensions: bc.config.P2PStateExchangeExtensions,
KeepOnlyLatestState: bc.config.KeepOnlyLatestState,
Value: version,
}
if err = bc.dao.PutVersion(ver); err != nil {
return err return err
} }
bc.dao.Version = ver
bc.persistent.Version = ver
genesisBlock, err := createGenesisBlock(bc.config) genesisBlock, err := createGenesisBlock(bc.config)
if err != nil { if err != nil {
return err return err
@ -328,9 +329,30 @@ func (bc *Blockchain) init() error {
} }
return bc.storeBlock(genesisBlock, nil) return bc.storeBlock(genesisBlock, nil)
} }
if ver != version { if ver.Value != version {
return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver) return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver.Value)
} }
if ver.StateRootInHeader != bc.config.StateRootInHeader {
return fmt.Errorf("StateRootInHeader setting mismatch (config=%t, db=%t)",
ver.StateRootInHeader, bc.config.StateRootInHeader)
}
if ver.P2PSigExtensions != bc.config.P2PSigExtensions {
return fmt.Errorf("P2PSigExtensions setting mismatch (old=%t, new=%t",
ver.P2PSigExtensions, bc.config.P2PSigExtensions)
}
if ver.P2PStateExchangeExtensions != bc.config.P2PStateExchangeExtensions {
return fmt.Errorf("P2PStateExchangeExtensions setting mismatch (old=%t, new=%t",
ver.P2PStateExchangeExtensions, bc.config.P2PStateExchangeExtensions)
}
if ver.KeepOnlyLatestState != bc.config.KeepOnlyLatestState {
return fmt.Errorf("KeepOnlyLatestState setting mismatch: old=%v, new=%v",
ver.KeepOnlyLatestState, bc.config.KeepOnlyLatestState)
}
bc.dao.Version = ver
bc.persistent.Version = ver
// Always try to remove garbage. If there is nothing to do, it will exit quickly.
go bc.removeOldStorageItems()
// At this point there was no version found in the storage which // At this point there was no version found in the storage which
// implies a creating fresh storage with the version specified // implies a creating fresh storage with the version specified
@ -456,6 +478,26 @@ func (bc *Blockchain) init() error {
return bc.updateExtensibleWhitelist(bHeight) return bc.updateExtensibleWhitelist(bHeight)
} }
func (bc *Blockchain) removeOldStorageItems() {
_, err := bc.dao.Store.Get(storage.SYSCleanStorage.Bytes())
if err != nil {
return
}
b := bc.dao.Store.Batch()
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
bc.dao.Store.Seek([]byte{byte(prefix)}, func(k, _ []byte) {
// #1468, but don't need to copy here, because it is done by Store.
b.Delete(k)
})
b.Delete(storage.SYSCleanStorage.Bytes())
err = bc.dao.Store.PutBatch(b)
if err != nil {
bc.log.Error("failed to remove old storage items", zap.Error(err))
}
}
// jumpToState is an atomic operation that changes Blockchain state to the one // jumpToState is an atomic operation that changes Blockchain state to the one
// specified by the state sync point p. All the data needed for the jump must be // specified by the state sync point p. All the data needed for the jump must be
// collected by the state sync module. // collected by the state sync module.
@ -487,49 +529,29 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
} }
fallthrough fallthrough
case stateJumpStarted: case stateJumpStarted:
// Replace old storage items by new ones, it should be done step-by step. newPrefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
// Firstly, remove all old genesis-related items. v, err := bc.dao.GetVersion()
b := bc.dao.Store.Batch() if err != nil {
bc.dao.Store.Seek([]byte{byte(storage.STStorage)}, func(k, _ []byte) { return fmt.Errorf("failed to get dao.Version: %w", err)
// #1468, but don't need to copy here, because it is done by Store. }
b.Delete(k) v.StoragePrefix = newPrefix
}) if err := bc.dao.PutVersion(v); err != nil {
b.Put(jumpStageKey, []byte{byte(oldStorageItemsRemoved)}) return fmt.Errorf("failed to update dao.Version: %w", err)
err := bc.dao.Store.PutBatch(b) }
bc.persistent.Version = v
err = bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)})
if err != nil { if err != nil {
return fmt.Errorf("failed to store state jump stage: %w", err) return fmt.Errorf("failed to store state jump stage: %w", err)
} }
fallthrough
case oldStorageItemsRemoved: err = bc.dao.Store.Put(storage.SYSCleanStorage.Bytes(), []byte{})
// Then change STTempStorage prefix to STStorage. Each replace operation is atomic.
for {
count := 0
b := bc.dao.Store.Batch()
bc.dao.Store.Seek([]byte{byte(storage.STTempStorage)}, func(k, v []byte) {
if count >= maxStorageBatchSize {
return
}
// #1468, but don't need to copy here, because it is done by Store.
b.Delete(k)
key := make([]byte, len(k))
key[0] = byte(storage.STStorage)
copy(key[1:], k[1:])
b.Put(key, slice.Copy(v))
count += 2
})
if count > 0 {
err := bc.dao.Store.PutBatch(b)
if err != nil {
return fmt.Errorf("failed to replace outdated contract storage items with the fresh ones: %w", err)
}
} else {
break
}
}
err := bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)})
if err != nil { if err != nil {
return fmt.Errorf("failed to store state jump stage: %w", err) return fmt.Errorf("failed to store clean storage flag: %w", err)
} }
go bc.removeOldStorageItems()
fallthrough fallthrough
case newStorageItemsAdded: case newStorageItemsAdded:
// After current state is updated, we need to remove outdated state-related data if so. // After current state is updated, we need to remove outdated state-related data if so.

View file

@ -1765,9 +1765,13 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
// put storage items with STTemp prefix // put storage items with STTemp prefix
batch := bcSpout.dao.Store.Batch() batch := bcSpout.dao.Store.Batch()
bcSpout.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { tempPrefix := storage.STTempStorage
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
tempPrefix = storage.STStorage
}
bcSpout.dao.Store.Seek(bcSpout.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) {
key := slice.Copy(k) key := slice.Copy(k)
key[0] = storage.STTempStorage.Bytes()[0] key[0] = byte(tempPrefix)
value := slice.Copy(v) value := slice.Copy(v)
batch.Put(key, value) batch.Put(key, value)
}) })
@ -1812,14 +1816,14 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point)) require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point))
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true) checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
}) })
for _, stage := range []stateJumpStage{stateJumpStarted, oldStorageItemsRemoved, newStorageItemsAdded, genesisStateRemoved, 0x03} { for _, stage := range []stateJumpStage{stateJumpStarted, newStorageItemsAdded, genesisStateRemoved, 0x03} {
t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) { t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) {
require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stage)})) require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stage)}))
point := make([]byte, 4) point := make([]byte, 4)
binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint)) binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint))
require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point)) require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point))
shouldFail := stage == 0x03 // unknown stage shouldFail := stage == 0x03 // unknown stage
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, shouldFail) checkNewBlockchainErr(t, spountCfg, bcSpout.dao.Store, shouldFail)
}) })
} }
} }

View file

@ -50,7 +50,7 @@ type DAO interface {
GetStorageItems(id int32) ([]state.StorageItemWithKey, error) GetStorageItems(id int32) ([]state.StorageItemWithKey, error)
GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error)
GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error)
GetVersion() (string, error) GetVersion() (Version, error)
GetWrapped() DAO GetWrapped() DAO
HasTransaction(hash util.Uint256) error HasTransaction(hash util.Uint256) error
Persist() (int, error) Persist() (int, error)
@ -63,7 +63,7 @@ type DAO interface {
PutStateSyncPoint(p uint32) error PutStateSyncPoint(p uint32) error
PutStateSyncCurrentBlockHeight(h uint32) error PutStateSyncCurrentBlockHeight(h uint32) error
PutStorageItem(id int32, key []byte, si state.StorageItem) error PutStorageItem(id int32, key []byte, si state.StorageItem) error
PutVersion(v string) error PutVersion(v Version) error
Seek(id int32, prefix []byte, f func(k, v []byte)) Seek(id int32, prefix []byte, f func(k, v []byte))
SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue
StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error
@ -74,17 +74,21 @@ type DAO interface {
// Simple is memCached wrapper around DB, simple DAO implementation. // Simple is memCached wrapper around DB, simple DAO implementation.
type Simple struct { type Simple struct {
Store *storage.MemCachedStore Version Version
// stateRootInHeader specifies if block header contains state root. Store *storage.MemCachedStore
stateRootInHeader bool
// p2pSigExtensions denotes whether P2PSignatureExtensions are enabled.
p2pSigExtensions bool
} }
// NewSimple creates new simple dao using provided backend store. // NewSimple creates new simple dao using provided backend store.
func NewSimple(backend storage.Store, stateRootInHeader bool, p2pSigExtensions bool) *Simple { func NewSimple(backend storage.Store, stateRootInHeader bool, p2pSigExtensions bool) *Simple {
st := storage.NewMemCachedStore(backend) st := storage.NewMemCachedStore(backend)
return &Simple{Store: st, stateRootInHeader: stateRootInHeader, p2pSigExtensions: p2pSigExtensions} return &Simple{
Version: Version{
StoragePrefix: storage.STStorage,
StateRootInHeader: stateRootInHeader,
P2PSigExtensions: p2pSigExtensions,
},
Store: st,
}
} }
// GetBatch returns currently accumulated DB changeset. // GetBatch returns currently accumulated DB changeset.
@ -95,7 +99,8 @@ func (dao *Simple) GetBatch() *storage.MemBatch {
// GetWrapped returns new DAO instance with another layer of wrapped // GetWrapped returns new DAO instance with another layer of wrapped
// MemCachedStore around the current DAO Store. // MemCachedStore around the current DAO Store.
func (dao *Simple) GetWrapped() DAO { func (dao *Simple) GetWrapped() DAO {
d := NewSimple(dao.Store, dao.stateRootInHeader, dao.p2pSigExtensions) d := NewSimple(dao.Store, dao.Version.StateRootInHeader, dao.Version.P2PSigExtensions)
d.Version = dao.Version
return d return d
} }
@ -277,7 +282,7 @@ func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWrit
// GetStorageItem returns StorageItem if it exists in the given store. // GetStorageItem returns StorageItem if it exists in the given store.
func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem {
b, err := dao.Store.Get(makeStorageItemKey(id, key)) b, err := dao.Store.Get(makeStorageItemKey(dao.Version.StoragePrefix, id, key))
if err != nil { if err != nil {
return nil return nil
} }
@ -287,14 +292,14 @@ func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem {
// PutStorageItem puts given StorageItem for given id with given // PutStorageItem puts given StorageItem for given id with given
// key into the given store. // key into the given store.
func (dao *Simple) PutStorageItem(id int32, key []byte, si state.StorageItem) error { func (dao *Simple) PutStorageItem(id int32, key []byte, si state.StorageItem) error {
stKey := makeStorageItemKey(id, key) stKey := makeStorageItemKey(dao.Version.StoragePrefix, id, key)
return dao.Store.Put(stKey, si) return dao.Store.Put(stKey, si)
} }
// DeleteStorageItem drops storage item for the given id with the // DeleteStorageItem drops storage item for the given id with the
// given key from the store. // given key from the store.
func (dao *Simple) DeleteStorageItem(id int32, key []byte) error { func (dao *Simple) DeleteStorageItem(id int32, key []byte) error {
stKey := makeStorageItemKey(id, key) stKey := makeStorageItemKey(dao.Version.StoragePrefix, id, key)
return dao.Store.Delete(stKey) return dao.Store.Delete(stKey)
} }
@ -323,7 +328,7 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
// Seek executes f for all items with a given prefix. // Seek executes f for all items with a given prefix.
// If key is to be used outside of f, they may not be copied. // If key is to be used outside of f, they may not be copied.
func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) { func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) {
lookupKey := makeStorageItemKey(id, nil) lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil)
if prefix != nil { if prefix != nil {
lookupKey = append(lookupKey, prefix...) lookupKey = append(lookupKey, prefix...)
} }
@ -335,7 +340,7 @@ func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) {
// SeekAsync sends all storage items matching given prefix to a channel and returns // SeekAsync sends all storage items matching given prefix to a channel and returns
// the channel. Resulting keys and values may not be copied. // the channel. Resulting keys and values may not be copied.
func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue { func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue {
lookupKey := makeStorageItemKey(id, nil) lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil)
if prefix != nil { if prefix != nil {
lookupKey = append(lookupKey, prefix...) lookupKey = append(lookupKey, prefix...)
} }
@ -343,10 +348,10 @@ func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan
} }
// makeStorageItemKey returns a key used to store StorageItem in the DB. // makeStorageItemKey returns a key used to store StorageItem in the DB.
func makeStorageItemKey(id int32, key []byte) []byte { func makeStorageItemKey(prefix storage.KeyPrefix, id int32, key []byte) []byte {
// 1 for prefix + 4 for Uint32 + len(key) for key // 1 for prefix + 4 for Uint32 + len(key) for key
buf := make([]byte, 5+len(key)) buf := make([]byte, 5+len(key))
buf[0] = byte(storage.STStorage) buf[0] = byte(prefix)
binary.LittleEndian.PutUint32(buf[1:], uint32(id)) binary.LittleEndian.PutUint32(buf[1:], uint32(id))
copy(buf[5:], key) copy(buf[5:], key)
return buf return buf
@ -364,18 +369,85 @@ func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) {
return nil, err return nil, err
} }
block, err := block.NewBlockFromTrimmedBytes(dao.stateRootInHeader, b) block, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return block, nil return block, nil
} }
// Version represents current dao version.
type Version struct {
StoragePrefix storage.KeyPrefix
StateRootInHeader bool
P2PSigExtensions bool
P2PStateExchangeExtensions bool
KeepOnlyLatestState bool
Value string
}
const (
stateRootInHeaderBit = 1 << iota
p2pSigExtensionsBit
p2pStateExchangeExtensionsBit
keepOnlyLatestStateBit
)
// FromBytes decodes v from a byte-slice.
func (v *Version) FromBytes(data []byte) error {
if len(data) == 0 {
return errors.New("missing version")
}
i := 0
for ; i < len(data) && data[i] != '\x00'; i++ {
}
if i == len(data) {
v.Value = string(data)
return nil
}
if len(data) != i+3 {
return errors.New("version is invalid")
}
v.Value = string(data[:i])
v.StoragePrefix = storage.KeyPrefix(data[i+1])
v.StateRootInHeader = data[i+2]&stateRootInHeaderBit != 0
v.P2PSigExtensions = data[i+2]&p2pSigExtensionsBit != 0
v.P2PStateExchangeExtensions = data[i+2]&p2pStateExchangeExtensionsBit != 0
v.KeepOnlyLatestState = data[i+2]&keepOnlyLatestStateBit != 0
return nil
}
// Bytes encodes v to a byte-slice.
func (v *Version) Bytes() []byte {
var mask byte
if v.StateRootInHeader {
mask |= stateRootInHeaderBit
}
if v.P2PSigExtensions {
mask |= p2pSigExtensionsBit
}
if v.P2PStateExchangeExtensions {
mask |= p2pStateExchangeExtensionsBit
}
if v.KeepOnlyLatestState {
mask |= keepOnlyLatestStateBit
}
return append([]byte(v.Value), '\x00', byte(v.StoragePrefix), mask)
}
// GetVersion attempts to get the current version stored in the // GetVersion attempts to get the current version stored in the
// underlying store. // underlying store.
func (dao *Simple) GetVersion() (string, error) { func (dao *Simple) GetVersion() (Version, error) {
version, err := dao.Store.Get(storage.SYSVersion.Bytes()) var version Version
return string(version), err
data, err := dao.Store.Get(storage.SYSVersion.Bytes())
if err == nil {
err = version.FromBytes(data)
}
return version, err
} }
// GetCurrentBlockHeight returns the current block height found in the // GetCurrentBlockHeight returns the current block height found in the
@ -478,8 +550,9 @@ func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction,
} }
// PutVersion stores the given version in the underlying store. // PutVersion stores the given version in the underlying store.
func (dao *Simple) PutVersion(v string) error { func (dao *Simple) PutVersion(v Version) error {
return dao.Store.Put(storage.SYSVersion.Bytes(), []byte(v)) dao.Version = v
return dao.Store.Put(storage.SYSVersion.Bytes(), v.Bytes())
} }
// PutCurrentHeader stores current header. // PutCurrentHeader stores current header.
@ -564,7 +637,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
return err return err
} }
b, err := block.NewBlockFromTrimmedBytes(dao.stateRootInHeader, bs) b, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, bs)
if err != nil { if err != nil {
return err return err
} }
@ -583,7 +656,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
for _, tx := range b.Transactions { for _, tx := range b.Transactions {
copy(key[1:], tx.Hash().BytesBE()) copy(key[1:], tx.Hash().BytesBE())
batch.Delete(key) batch.Delete(key)
if dao.p2pSigExtensions { if dao.Version.P2PSigExtensions {
for _, attr := range tx.GetAttributes(transaction.ConflictsT) { for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash hash := attr.Value.(*transaction.Conflicts).Hash
copy(key[1:], hash.BytesBE()) copy(key[1:], hash.BytesBE())
@ -631,7 +704,7 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
if err != nil { if err != nil {
return err return err
} }
if dao.p2pSigExtensions { if dao.Version.P2PSigExtensions {
var value []byte var value []byte
for _, attr := range tx.GetAttributes(transaction.ConflictsT) { for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash hash := attr.Value.(*transaction.Conflicts).Hash
@ -667,7 +740,7 @@ func (dao *Simple) PersistSync() (int, error) {
// GetMPTBatch storage changes to be applied to MPT. // GetMPTBatch storage changes to be applied to MPT.
func (dao *Simple) GetMPTBatch() mpt.Batch { func (dao *Simple) GetMPTBatch() mpt.Batch {
var b mpt.Batch var b mpt.Batch
dao.Store.MemoryStore.SeekAll([]byte{byte(storage.STStorage)}, func(k, v []byte) { dao.Store.MemoryStore.SeekAll([]byte{byte(dao.Version.StoragePrefix)}, func(k, v []byte) {
b.Add(k[1:], v) b.Add(k[1:], v)
}) })
return b return b

View file

@ -115,16 +115,38 @@ func TestGetVersion_NoVersion(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, false) dao := NewSimple(storage.NewMemoryStore(), false, false)
version, err := dao.GetVersion() version, err := dao.GetVersion()
require.Error(t, err) require.Error(t, err)
require.Equal(t, "", version) require.Equal(t, "", version.Value)
} }
func TestGetVersion(t *testing.T) { func TestGetVersion(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, false) dao := NewSimple(storage.NewMemoryStore(), false, false)
err := dao.PutVersion("testVersion") expected := Version{
StoragePrefix: 0x42,
P2PSigExtensions: true,
StateRootInHeader: true,
Value: "testVersion",
}
err := dao.PutVersion(expected)
require.NoError(t, err) require.NoError(t, err)
version, err := dao.GetVersion() actual, err := dao.GetVersion()
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, version) require.Equal(t, expected, actual)
t.Run("invalid", func(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, false)
require.NoError(t, dao.Store.Put(storage.SYSVersion.Bytes(), []byte("0.1.2\x00x")))
_, err := dao.GetVersion()
require.Error(t, err)
})
t.Run("old format", func(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, false)
require.NoError(t, dao.Store.Put(storage.SYSVersion.Bytes(), []byte("0.1.2")))
version, err := dao.GetVersion()
require.NoError(t, err)
require.Equal(t, "0.1.2", version.Value)
})
} }
func TestGetCurrentHeaderHeight_NoHeader(t *testing.T) { func TestGetCurrentHeaderHeight_NoHeader(t *testing.T) {
@ -222,11 +244,16 @@ func TestMakeStorageItemKey(t *testing.T) {
expected := []byte{byte(storage.STStorage), 0, 0, 0, 0, 1, 2, 3} expected := []byte{byte(storage.STStorage), 0, 0, 0, 0, 1, 2, 3}
binary.LittleEndian.PutUint32(expected[1:5], uint32(id)) binary.LittleEndian.PutUint32(expected[1:5], uint32(id))
actual := makeStorageItemKey(id, []byte{1, 2, 3}) actual := makeStorageItemKey(storage.STStorage, id, []byte{1, 2, 3})
require.Equal(t, expected, actual) require.Equal(t, expected, actual)
expected = expected[0:5] expected = expected[0:5]
actual = makeStorageItemKey(id, nil) actual = makeStorageItemKey(storage.STStorage, id, nil)
require.Equal(t, expected, actual)
expected = []byte{byte(storage.STTempStorage), 0, 0, 0, 0, 1, 2, 3}
binary.LittleEndian.PutUint32(expected[1:5], uint32(id))
actual = makeStorageItemKey(storage.STTempStorage, id, []byte{1, 2, 3})
require.Equal(t, expected, actual) require.Equal(t, expected, actual)
} }

View file

@ -28,7 +28,8 @@ var (
// 3. Pair (node, path) must be restored only once. It's a duty of MPT pool to manage // 3. Pair (node, path) must be restored only once. It's a duty of MPT pool to manage
// MPT paths in order to provide this assumption. // MPT paths in order to provide this assumption.
type Billet struct { type Billet struct {
Store *storage.MemCachedStore TempStoragePrefix storage.KeyPrefix
Store *storage.MemCachedStore
root Node root Node
refcountEnabled bool refcountEnabled bool
@ -38,11 +39,12 @@ type Billet struct {
// to decouple storage errors from logic errors so that all storage errors are // to decouple storage errors from logic errors so that all storage errors are
// processed during `store.Persist()` at the caller. This also has the benefit, // processed during `store.Persist()` at the caller. This also has the benefit,
// that every `Put` can be considered an atomic operation. // that every `Put` can be considered an atomic operation.
func NewBillet(rootHash util.Uint256, enableRefCount bool, store *storage.MemCachedStore) *Billet { func NewBillet(rootHash util.Uint256, enableRefCount bool, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet {
return &Billet{ return &Billet{
Store: store, TempStoragePrefix: prefix,
root: NewHashNode(rootHash), Store: store,
refcountEnabled: enableRefCount, root: NewHashNode(rootHash),
refcountEnabled: enableRefCount,
} }
} }
@ -64,7 +66,10 @@ func (b *Billet) RestoreHashNode(path []byte, node Node) error {
// If it's a leaf, then put into temporary contract storage. // If it's a leaf, then put into temporary contract storage.
if leaf, ok := node.(*LeafNode); ok { if leaf, ok := node.(*LeafNode); ok {
k := append([]byte{byte(storage.STTempStorage)}, fromNibbles(path)...) if b.TempStoragePrefix == 0 {
panic("invalid storage prefix")
}
k := append([]byte{byte(b.TempStoragePrefix)}, fromNibbles(path)...)
_ = b.Store.Put(k, leaf.value) _ = b.Store.Put(k, leaf.value)
} }
return nil return nil

View file

@ -32,7 +32,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b.Children[5] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0xAB, 0xDE})) b.Children[5] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0xAB, 0xDE}))
path := toNibbles([]byte{0xAC}) path := toNibbles([]byte{0xAC})
e := NewExtensionNode(path, NewHashNode(b.Hash())) e := NewExtensionNode(path, NewHashNode(b.Hash()))
tr := NewBillet(e.Hash(), true, newTestStore()) tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore())
tr.root = e tr.root = e
// OK // OK
@ -61,7 +61,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
l := NewLeafNode([]byte{0xAB, 0xCD}) l := NewLeafNode([]byte{0xAB, 0xCD})
path := toNibbles([]byte{0xAC}) path := toNibbles([]byte{0xAC})
e := NewExtensionNode(path, NewHashNode(l.Hash())) e := NewExtensionNode(path, NewHashNode(l.Hash()))
tr := NewBillet(e.Hash(), true, newTestStore()) tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore())
tr.root = e tr.root = e
// OK // OK
@ -87,7 +87,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
h := NewHashNode(util.Uint256{1, 2, 3}) h := NewHashNode(util.Uint256{1, 2, 3})
path := toNibbles([]byte{0xAC}) path := toNibbles([]byte{0xAC})
e := NewExtensionNode(path, h) e := NewExtensionNode(path, h)
tr := NewBillet(e.Hash(), true, newTestStore()) tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore())
tr.root = e tr.root = e
// no-op // no-op
@ -99,7 +99,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
t.Run("parent is Leaf", func(t *testing.T) { t.Run("parent is Leaf", func(t *testing.T) {
l := NewLeafNode([]byte{0xAB, 0xCD}) l := NewLeafNode([]byte{0xAB, 0xCD})
path := []byte{} path := []byte{}
tr := NewBillet(l.Hash(), true, newTestStore()) tr := NewBillet(l.Hash(), true, storage.STTempStorage, newTestStore())
tr.root = l tr.root = l
// Already restored => panic expected // Already restored => panic expected
@ -121,7 +121,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b := NewBranchNode() b := NewBranchNode()
b.Children[5] = NewHashNode(l1.Hash()) b.Children[5] = NewHashNode(l1.Hash())
b.Children[lastChild] = NewHashNode(l2.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash())
tr := NewBillet(b.Hash(), true, newTestStore()) tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
tr.root = b tr.root = b
// OK // OK
@ -152,7 +152,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b := NewBranchNode() b := NewBranchNode()
b.Children[5] = NewHashNode(l1.Hash()) b.Children[5] = NewHashNode(l1.Hash())
b.Children[lastChild] = NewHashNode(l2.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash())
tr := NewBillet(b.Hash(), true, newTestStore()) tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
tr.root = b tr.root = b
// OK // OK
@ -179,7 +179,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
// two same hashnodes => leaf's refcount expected to be 2 in the end. // two same hashnodes => leaf's refcount expected to be 2 in the end.
b.Children[3] = NewHashNode(l.Hash()) b.Children[3] = NewHashNode(l.Hash())
b.Children[4] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash())
tr := NewBillet(b.Hash(), true, newTestStore()) tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
tr.root = b tr.root = b
// OK // OK
@ -202,7 +202,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b := NewBranchNode() b := NewBranchNode()
b.Children[3] = NewHashNode(l.Hash()) b.Children[3] = NewHashNode(l.Hash())
b.Children[4] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash())
tr := NewBillet(b.Hash(), true, newTestStore()) tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
// Should fail, because if it's a hash node with non-empty path, then the node // Should fail, because if it's a hash node with non-empty path, then the node
// has already been collapsed. // has already been collapsed.

View file

@ -566,7 +566,7 @@ func (t *Trie) Find(prefix, from []byte, max int) ([]storage.KeyValue, error) {
res []storage.KeyValue res []storage.KeyValue
count int count int
) )
b := NewBillet(t.root.Hash(), false, t.Store) b := NewBillet(t.root.Hash(), false, 0, t.Store)
process := func(pathToNode []byte, node Node, _ []byte) bool { process := func(pathToNode []byte, node Node, _ []byte) bool {
if leaf, ok := node.(*LeafNode); ok { if leaf, ok := node.(*LeafNode); ok {
if from == nil || !bytes.Equal(pathToNode, from) { // (*Billet).traverse includes `from` path into result if so. Need to filter out manually. if from == nil || !bytes.Equal(pathToNode, from) { // (*Billet).traverse includes `from` path into result if so. Need to filter out manually.

View file

@ -103,22 +103,10 @@ func (s *Module) Init(height uint32, enableRefCount bool) error {
s.validatedHeight.Store(binary.LittleEndian.Uint32(data)) s.validatedHeight.Store(binary.LittleEndian.Uint32(data))
} }
var gcKey = []byte{byte(storage.DataMPT), prefixGC}
if height == 0 { if height == 0 {
s.mpt = mpt.NewTrie(nil, enableRefCount, s.Store) s.mpt = mpt.NewTrie(nil, enableRefCount, s.Store)
var val byte
if enableRefCount {
val = 1
}
s.currentLocal.Store(util.Uint256{}) s.currentLocal.Store(util.Uint256{})
return s.Store.Put(gcKey, []byte{val}) return nil
}
var hasRefCount bool
if v, err := s.Store.Get(gcKey); err == nil {
hasRefCount = v[0] != 0
}
if hasRefCount != enableRefCount {
return fmt.Errorf("KeepOnlyLatestState setting mismatch: old=%v, new=%v", hasRefCount, enableRefCount)
} }
r, err := s.getStateRoot(makeStateRootKey(height)) r, err := s.getStateRoot(makeStateRootKey(height))
if err != nil { if err != nil {
@ -138,25 +126,15 @@ func (s *Module) CleanStorage() error {
if s.localHeight.Load() != 0 { if s.localHeight.Load() != 0 {
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load()) return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
} }
gcKey := []byte{byte(storage.DataMPT), prefixGC}
gcVal, err := s.Store.Get(gcKey)
if err != nil {
return fmt.Errorf("failed to get GC flag: %w", err)
}
//
b := s.Store.Batch() b := s.Store.Batch()
s.Store.Seek([]byte{byte(storage.DataMPT)}, func(k, _ []byte) { s.Store.Seek([]byte{byte(storage.DataMPT)}, func(k, _ []byte) {
// #1468, but don't need to copy here, because it is done by Store. // #1468, but don't need to copy here, because it is done by Store.
b.Delete(k) b.Delete(k)
}) })
err = s.Store.PutBatch(b) err := s.Store.PutBatch(b)
if err != nil { if err != nil {
return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err) return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err)
} }
err = s.Store.Put(gcKey, gcVal)
if err != nil {
return fmt.Errorf("failed to store GC flag: %w", err)
}
currentLocal := s.currentLocal.Load().(util.Uint256) currentLocal := s.currentLocal.Load().(util.Uint256)
if !currentLocal.Equals(util.Uint256{}) { if !currentLocal.Equals(util.Uint256{}) {
err := s.addLocalStateRoot(s.Store, &state.MPTRoot{ err := s.addLocalStateRoot(s.Store, &state.MPTRoot{

View file

@ -17,7 +17,6 @@ var (
) )
const ( const (
prefixGC = 0x01
prefixLocal = 0x02 prefixLocal = 0x02
prefixValidated = 0x03 prefixValidated = 0x03
) )

View file

@ -165,6 +165,19 @@ func (s *Module) Init(currChainHeight uint32) error {
return s.defineSyncStage() return s.defineSyncStage()
} }
// TemporaryPrefix accepts current storage prefix and returns prefix
// to use for storing intermediate items during synchronization.
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
switch currPrefix {
case storage.STStorage:
return storage.STTempStorage
case storage.STTempStorage:
return storage.STStorage
default:
panic(fmt.Sprintf("invalid storage prefix: %x", currPrefix))
}
}
// defineSyncStage sequentially checks and sets sync state process stage after Module // defineSyncStage sequentially checks and sets sync state process stage after Module
// initialization. It also performs initialization of MPT Billet if necessary. // initialization. It also performs initialization of MPT Billet if necessary.
func (s *Module) defineSyncStage() error { func (s *Module) defineSyncStage() error {
@ -194,7 +207,8 @@ func (s *Module) defineSyncStage() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to get header to initialize MPT billet: %w", err) return fmt.Errorf("failed to get header to initialize MPT billet: %w", err)
} }
s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState, s.dao.Store) s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState,
TemporaryPrefix(s.dao.Version.StoragePrefix), s.dao.Store)
s.log.Info("MPT billet initialized", s.log.Info("MPT billet initialized",
zap.Uint32("height", s.syncPoint), zap.Uint32("height", s.syncPoint),
zap.String("state root", header.PrevStateRoot.StringBE())) zap.String("state root", header.PrevStateRoot.StringBE()))
@ -466,7 +480,7 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt
s.lock.RLock() s.lock.RLock()
defer s.lock.RUnlock() defer s.lock.RUnlock()
b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, storage.NewMemCachedStore(s.dao.Store)) b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, 0, storage.NewMemCachedStore(s.dao.Store))
return b.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool { return b.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
return process(node, nodeBytes) return process(node, nodeBytes)
}, false) }, false)

View file

@ -56,7 +56,8 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
dao: dao.NewSimple(actualStorage, true, false), dao: dao.NewSimple(actualStorage, true, false),
mptpool: NewPool(), mptpool: NewPool(),
} }
stateSync.billet = mpt.NewBillet(sr, true, actualStorage) stateSync.billet = mpt.NewBillet(sr, true,
TemporaryPrefix(stateSync.dao.Version.StoragePrefix), actualStorage)
stateSync.mptpool.Add(sr, []byte{}) stateSync.mptpool.Add(sr, []byte{})
// The test itself: we'll ask state sync module to restore each node exactly once. // The test itself: we'll ask state sync module to restore each node exactly once.

View file

@ -2,6 +2,7 @@ package core
import ( import (
"testing" "testing"
"time"
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
@ -300,7 +301,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
c.ProtocolConfiguration.KeepOnlyLatestState = true c.ProtocolConfiguration.KeepOnlyLatestState = true
c.ProtocolConfiguration.RemoveUntraceableBlocks = true c.ProtocolConfiguration.RemoveUntraceableBlocks = true
} }
bcBolt := newTestChainWithCustomCfg(t, boltCfg) bcBoltStore := memoryStore{storage.NewMemoryStore()}
bcBolt := initTestChain(t, bcBoltStore, boltCfg)
go bcBolt.Run()
module := bcBolt.GetStateSyncModule() module := bcBolt.GetStateSyncModule()
t.Run("error: add headers before initialisation", func(t *testing.T) { t.Run("error: add headers before initialisation", func(t *testing.T) {
@ -421,9 +424,12 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
// compare storage states // compare storage states
fetchStorage := func(bc *Blockchain) []storage.KeyValue { fetchStorage := func(bc *Blockchain) []storage.KeyValue {
var kv []storage.KeyValue var kv []storage.KeyValue
bc.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) { bc.dao.Store.Seek(bc.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) {
key := slice.Copy(k) key := slice.Copy(k)
value := slice.Copy(v) value := slice.Copy(v)
if key[0] == byte(storage.STTempStorage) {
key[0] = byte(storage.STStorage)
}
kv = append(kv, storage.KeyValue{ kv = append(kv, storage.KeyValue{
Key: key, Key: key,
Value: value, Value: value,
@ -436,7 +442,19 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
require.ElementsMatch(t, expected, actual) require.ElementsMatch(t, expected, actual)
// no temp items should be left // no temp items should be left
bcBolt.dao.Store.Seek(storage.STTempStorage.Bytes(), func(k, v []byte) { require.Eventually(t, func() bool {
t.Fatal("temp storage items are found") var haveItems bool
}) bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(_, _ []byte) {
haveItems = true
})
return !haveItems
}, time.Second*5, time.Millisecond*100)
bcBolt.Close()
// Check restoring with new prefix.
bcBolt = initTestChain(t, bcBoltStore, boltCfg)
go bcBolt.Run()
defer bcBolt.Close()
require.Equal(t, storage.STTempStorage, bcBolt.dao.Version.StoragePrefix)
require.Equal(t, storage.STTempStorage, bcBolt.persistent.Version.StoragePrefix)
} }

View file

@ -29,6 +29,7 @@ const (
SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2 SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2
SYSStateSyncPoint KeyPrefix = 0xc3 SYSStateSyncPoint KeyPrefix = 0xc3
SYSStateJumpStage KeyPrefix = 0xc4 SYSStateJumpStage KeyPrefix = 0xc4
SYSCleanStorage KeyPrefix = 0xc5
SYSVersion KeyPrefix = 0xf0 SYSVersion KeyPrefix = 0xf0
) )