core: allow to reset blockchain state

This commit is contained in:
Anna Shaleva 2022-10-20 13:59:19 +03:00
parent ec9317d5b4
commit bd6bb9e9e2
9 changed files with 776 additions and 62 deletions

View file

@ -23,6 +23,23 @@ import (
const neoAmount = 99999000
// Various contract IDs that were deployed to basic chain.
const (
RublesContractID = int32(1)
VerifyContractID = int32(2)
VerifyWithArgsContractID = int32(3)
NNSContractID = int32(4)
NFSOContractID = int32(5)
StorageContractID = int32(6)
)
const (
// RublesOldTestvalue is a value initially stored by `testkey` key inside Rubles contract.
RublesOldTestvalue = "testvalue"
// RublesNewTestvalue is an updated value of Rubles' storage item with `testkey` key.
RublesNewTestvalue = "newtestvalue"
)
// InitSimple initializes chain with simple contracts from 'examples' folder.
// It's not as complicated as chain got after Init and may be used for tests where
// chain with a small amount of data is needed and for historical functionality testing.
@ -138,13 +155,13 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #2: deploy test_contract (Rubles contract).
cfgPath := filepath.Join(testDataPrefix, "test_contract.yml")
block2H, txDeployH, cHash := deployContractFromPriv0(t, filepath.Join(testDataPrefix, "test_contract.go"), "Rubl", cfgPath, 1)
block2H, txDeployH, cHash := deployContractFromPriv0(t, filepath.Join(testDataPrefix, "test_contract.go"), "Rubl", cfgPath, RublesContractID)
t.Logf("txDeploy: %s", txDeployH.StringLE())
t.Logf("Block2 hash: %s", block2H.StringLE())
// Block #3: invoke `putValue` method on the test_contract.
rublPriv0Invoker := e.NewInvoker(cHash, acc0)
txInvH := rublPriv0Invoker.Invoke(t, true, "putValue", "testkey", "testvalue")
txInvH := rublPriv0Invoker.Invoke(t, true, "putValue", "testkey", RublesOldTestvalue)
t.Logf("txInv: %s", txInvH.StringLE())
// Block #4: transfer 1000 NEO from priv0 to priv1.
@ -166,7 +183,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #7: push verification contract into the chain.
verifyPath := filepath.Join(testDataPrefix, "verify", "verification_contract.go")
verifyCfg := filepath.Join(testDataPrefix, "verify", "verification_contract.yml")
_, _, _ = deployContractFromPriv0(t, verifyPath, "Verify", verifyCfg, 2)
_, _, _ = deployContractFromPriv0(t, verifyPath, "Verify", verifyCfg, VerifyContractID)
// Block #8: deposit some GAS to notary contract for priv0.
transferTxH = gasPriv0Invoker.Invoke(t, true, "transfer", priv0ScriptHash, notaryHash, 10_0000_0000, []interface{}{priv0ScriptHash, int64(e.Chain.BlockHeight() + 1000)})
@ -183,12 +200,12 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #10: push verification contract with arguments into the chain.
verifyPath = filepath.Join(testDataPrefix, "verify_args", "verification_with_args_contract.go")
verifyCfg = filepath.Join(testDataPrefix, "verify_args", "verification_with_args_contract.yml")
_, _, _ = deployContractFromPriv0(t, verifyPath, "VerifyWithArgs", verifyCfg, 3) // block #10
_, _, _ = deployContractFromPriv0(t, verifyPath, "VerifyWithArgs", verifyCfg, VerifyWithArgsContractID) // block #10
// Block #11: push NameService contract into the chain.
nsPath := filepath.Join(examplesPrefix, "nft-nd-nns")
nsConfigPath := filepath.Join(nsPath, "nns.yml")
_, _, nsHash := deployContractFromPriv0(t, nsPath, nsPath, nsConfigPath, 4) // block #11
_, _, nsHash := deployContractFromPriv0(t, nsPath, nsPath, nsConfigPath, NNSContractID) // block #11
nsCommitteeInvoker := e.CommitteeInvoker(nsHash)
nsPriv0Invoker := e.NewInvoker(nsHash, acc0)
@ -212,7 +229,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
nsPriv0Invoker.Invoke(t, stackitem.Null{}, "setRecord", "neo.com", int64(nns.A), "1.2.3.4") // block #15
// Block #16: invoke `test_contract.go`: put new value with the same key to check `getstate` RPC call
txPutNewValue := rublPriv0Invoker.PrepareInvoke(t, "putValue", "testkey", "newtestvalue") // tx1
txPutNewValue := rublPriv0Invoker.PrepareInvoke(t, "putValue", "testkey", RublesNewTestvalue) // tx1
// Invoke `test_contract.go`: put values to check `findstates` RPC call.
txPut1 := rublPriv0Invoker.PrepareInvoke(t, "putValue", "aa", "v1") // tx2
txPut2 := rublPriv0Invoker.PrepareInvoke(t, "putValue", "aa10", "v2") // tx3
@ -226,7 +243,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #17: deploy NeoFS Object contract (NEP11-Divisible).
nfsPath := filepath.Join(examplesPrefix, "nft-d")
nfsConfigPath := filepath.Join(nfsPath, "nft.yml")
_, _, nfsHash := deployContractFromPriv0(t, nfsPath, nfsPath, nfsConfigPath, 5) // block #17
_, _, nfsHash := deployContractFromPriv0(t, nfsPath, nfsPath, nfsConfigPath, NFSOContractID) // block #17
nfsPriv0Invoker := e.NewInvoker(nfsHash, acc0)
nfsPriv1Invoker := e.NewInvoker(nfsHash, acc1)
@ -254,7 +271,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #22: deploy storage_contract (Storage contract for `traverseiterator` and `terminatesession` RPC calls test).
storagePath := filepath.Join(testDataPrefix, "storage", "storage_contract.go")
storageCfg := filepath.Join(testDataPrefix, "storage", "storage_contract.yml")
_, _, _ = deployContractFromPriv0(t, storagePath, "Storage", storageCfg, 6)
_, _, _ = deployContractFromPriv0(t, storagePath, "Storage", storageCfg, StorageContractID)
// Compile contract to test `invokescript` RPC call
invokePath := filepath.Join(testDataPrefix, "invoke", "invokescript_contract.go")

View file

@ -36,6 +36,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
@ -61,21 +62,30 @@ const (
defaultStateSyncInterval = 40000
)
// stateJumpStage denotes the stage of state jump process.
type stateJumpStage byte
// stateChangeStage denotes the stage of state modification process.
type stateChangeStage byte
// A set of stages used to split state jump / state reset into atomic operations.
const (
// none means that no state jump process was initiated yet.
none stateJumpStage = 1 << iota
// none means that no state jump or state reset process was initiated yet.
none stateChangeStage = 1 << iota
// stateJumpStarted means that state jump was just initiated, but outdated storage items
// were not yet removed.
stateJumpStarted
// newStorageItemsAdded means that contract storage items are up-to-date with the current
// state.
newStorageItemsAdded
// genesisStateRemoved means that state corresponding to the genesis block was removed
// from the storage.
genesisStateRemoved
// staleBlocksRemoved means that state corresponding to the stale blocks (genesis block in
// in case of state jump) was removed from the storage.
staleBlocksRemoved
// headersReset denotes stale SYS-prefixed and IX-prefixed information was removed from
// the storage (applicable to state reset only).
headersReset
// transfersReset denotes NEP transfers were successfully updated (applicable to state reset only).
transfersReset
// stateResetBit represents a bit identifier for state reset process. If this bit is not set, then
// it's an unfinished state jump.
stateResetBit byte = 1 << 7
)
var (
@ -451,22 +461,25 @@ func (bc *Blockchain) init() error {
}
}
// Check whether StateJump stage is in the storage and continue interrupted state jump if so.
jumpStage, err := bc.dao.Store.Get([]byte{byte(storage.SYSStateJumpStage)})
// Check whether StateChangeState stage is in the storage and continue interrupted state jump / state reset if so.
stateChStage, err := bc.dao.Store.Get([]byte{byte(storage.SYSStateChangeStage)})
if err == nil {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) {
return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " +
"To start an archival node drop the database manually and restart the node")
}
if len(jumpStage) != 1 {
if len(stateChStage) != 1 {
return fmt.Errorf("invalid state jump stage format")
}
// State jump wasn't finished yet, thus continue it.
// State jump / state reset wasn't finished yet, thus continue it.
stateSyncPoint, err := bc.dao.GetStateSyncPoint()
if err != nil {
return fmt.Errorf("failed to get state sync point from the storage")
}
return bc.jumpToStateInternal(stateSyncPoint, stateJumpStage(jumpStage[0]))
if (stateChStage[0] & stateResetBit) != 0 {
return bc.resetStateInternal(stateSyncPoint, stateChangeStage(stateChStage[0]&(^stateResetBit)))
}
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) {
return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " +
"To start an archival node drop the database manually and restart the node")
}
return bc.jumpToStateInternal(stateSyncPoint, stateChangeStage(stateChStage[0]))
}
bHeight, err := bc.dao.GetCurrentBlockHeight()
@ -537,14 +550,14 @@ func (bc *Blockchain) jumpToState(p uint32) error {
// changes Blockchain state to the one specified by state sync point p and state
// jump stage. All the data needed for the jump must be in the DB, otherwise an
// error is returned. It is not protected by mutex.
func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error {
func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) error {
if p+1 >= uint32(len(bc.headerHashes)) {
return fmt.Errorf("invalid state sync point %d: headerHeignt is %d", p, len(bc.headerHashes))
}
bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
jumpStageKey := []byte{byte(storage.SYSStateJumpStage)}
jumpStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage {
case none:
bc.dao.Store.Put(jumpStageKey, []byte{byte(stateJumpStarted)})
@ -586,28 +599,24 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
})
}
}
cache.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)})
_, err := cache.Persist()
// Update SYS-prefixed info.
block, err := bc.dao.GetBlock(bc.headerHashes[p])
if err != nil {
return fmt.Errorf("failed to get current block: %w", err)
}
cache.StoreAsCurrentBlock(block)
cache.Store.Put(jumpStageKey, []byte{byte(staleBlocksRemoved)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist old items removal: %w", err)
}
case genesisStateRemoved:
case staleBlocksRemoved:
// there's nothing to do after that, so just continue with common operations
// and remove state jump stage in the end.
default:
return errors.New("unknown state jump stage")
return fmt.Errorf("unknown state jump stage: %d", stage)
}
block, err := bc.dao.GetBlock(bc.headerHashes[p])
if err != nil {
return fmt.Errorf("failed to get current block: %w", err)
}
bc.dao.StoreAsCurrentBlock(block)
bc.topBlock.Store(block)
atomic.StoreUint32(&bc.blockHeight, p)
atomic.StoreUint32(&bc.persistedHeight, p)
block, err = bc.dao.GetBlock(bc.headerHashes[p+1])
block, err := bc.dao.GetBlock(bc.headerHashes[p+1])
if err != nil {
return fmt.Errorf("failed to get block to init MPT: %w", err)
}
@ -616,18 +625,246 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
Root: block.PrevStateRoot,
})
bc.dao.Store.Delete(jumpStageKey)
err = bc.resetRAMState(p, false)
if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
}
return nil
}
// resetRAMState resets in-memory cached info.
func (bc *Blockchain) resetRAMState(height uint32, resetHeaders bool) error {
if resetHeaders {
bc.headerHashes = bc.headerHashes[:height+1]
bc.storedHeaderCount = height + 1
}
block, err := bc.dao.GetBlock(bc.headerHashes[height])
if err != nil {
return fmt.Errorf("failed to get current block: %w", err)
}
bc.topBlock.Store(block)
atomic.StoreUint32(&bc.blockHeight, height)
atomic.StoreUint32(&bc.persistedHeight, height)
err = bc.initializeNativeCache(block.Index, bc.dao)
if err != nil {
return fmt.Errorf("failed to initialize natives cache: %w", err)
}
if err := bc.updateExtensibleWhitelist(p); err != nil {
if err := bc.updateExtensibleWhitelist(height); err != nil {
return fmt.Errorf("failed to update extensible whitelist: %w", err)
}
updateBlockHeightMetric(p)
updateBlockHeightMetric(height)
return nil
}
bc.dao.Store.Delete(jumpStageKey)
// Reset resets chain state to the specified height if possible. This method
// performs direct DB changes and can be called on non-running Blockchain only.
func (bc *Blockchain) Reset(height uint32) error {
if bc.isRunning.Load().(bool) {
return errors.New("can't reset state of the running blockchain")
}
bc.dao.PutStateSyncPoint(height)
return bc.resetStateInternal(height, none)
}
func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error {
currHeight := bc.BlockHeight()
if height > currHeight {
return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height)
}
if height == currHeight && stage == none {
bc.log.Info("chain is already at the proper state", zap.Uint32("height", height))
return nil
}
if bc.config.KeepOnlyLatestState {
return fmt.Errorf("KeepOnlyLatestState is enabled, state for height %d is outdated and removed from the storage", height)
}
if bc.config.RemoveUntraceableBlocks && currHeight >= bc.config.MaxTraceableBlocks {
return fmt.Errorf("RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed")
}
// Retrieve necessary state before the DB modification.
hHeight := bc.HeaderHeight()
b, err := bc.GetBlock(bc.headerHashes[height])
if err != nil {
return fmt.Errorf("failed to retrieve block %d: %w", height, err)
}
sr, err := bc.stateRoot.GetStateRoot(height)
if err != nil {
return fmt.Errorf("failed to retrieve stateroot for height %d: %w", height, err)
}
v := bc.dao.Version
cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB
bc.log.Info("initialize state reset", zap.Uint32("target height", height))
start := time.Now()
p := start
resetStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage {
case none:
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err)
}
fallthrough
case stateJumpStarted:
// Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself).
for i := height + 1; i <= hHeight; i++ {
err := cache.PurgeBlock(bc.headerHashes[i])
if err != nil {
return fmt.Errorf("error while removing block %d: %w", i, err)
}
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err)
}
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("duration", time.Since(p)))
p = time.Now()
fallthrough
case staleBlocksRemoved:
// Completely remove contract IDs to update them later.
cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool {
cache.Store.Delete(k)
return true
})
// Reset contracts storage and store new contract IDs.
var mode = mpt.ModeAll
if bc.config.RemoveUntraceableBlocks {
mode |= mpt.ModeGCFlag
}
trieStore := mpt.NewTrieStore(sr.Root, mode, cache.Store)
oldStoragePrefix := v.StoragePrefix
newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix)
mgmtCSPrefixLen := 1 + 4 + 1 // STStorage + Management ID + contract state prefix
mgmtContractPrefix := make([]byte, mgmtCSPrefixLen-1)
id := int32(native.ManagementContractID)
binary.BigEndian.PutUint32(mgmtContractPrefix, uint32(id))
mgmtContractPrefix[4] = native.PrefixContract
cs := new(state.Contract)
const persistBatchSize = 10000
var (
seekErr error
cnt int
)
trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool {
if cnt >= persistBatchSize {
_, seekErr = cache.Persist()
if seekErr != nil {
return false
}
}
// May safely omit KV copying.
k[0] = byte(newStoragePrefix)
cache.Store.Put(k, v)
// @fixme: remove this part after #2702.
if bytes.HasPrefix(k[1:], mgmtContractPrefix) {
var hash util.Uint160
copy(hash[:], k[mgmtCSPrefixLen:])
err = stackitem.DeserializeConvertible(v, cs)
if err != nil {
seekErr = fmt.Errorf("failed to deserialize contract state: %w", err)
}
cache.PutContractID(cs.ID, hash)
}
cnt++
return seekErr == nil
})
if seekErr != nil {
return fmt.Errorf("failed to reset contract IDs: %w", err)
}
trieStore.Close()
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err)
}
bc.log.Info("contracts storage and IDs are reset", zap.Duration("duration", time.Since(p)))
p = time.Now()
fallthrough
case newStorageItemsAdded:
// Reset SYS-prefixed and IX-prefixed information.
cache.DeleteHeaderHashes(height+1, headerBatchCount)
cache.StoreAsCurrentBlock(b)
cache.PutCurrentHeader(b.Hash(), height)
v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix)
cache.PutVersion(v)
bc.persistent.Version = v
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
}
bc.log.Info("headers are reset", zap.Duration("duration", time.Since(p)))
p = time.Now()
fallthrough
case headersReset:
// Reset MPT.
err = bc.stateRoot.ResetState(height, cache.Store)
if err != nil {
return fmt.Errorf("failed to rollback MPT state: %w", err)
}
// Reset transfers.
err = bc.resetTransfers(cache, height)
if err != nil {
return fmt.Errorf("failed to strip transfer log / transfer info: %w", err)
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err)
}
bc.log.Info("MPT and transfers are reset", zap.Duration("duration", time.Since(p)))
fallthrough
case transfersReset:
// there's nothing to do after that, so just continue with common operations
// and remove state reset stage in the end.
default:
return fmt.Errorf("unknown state reset stage: %d", stage)
}
// Direct (cache-less) DB operation: remove stale storage items.
err = bc.store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))},
}, func(_, _ []byte) bool {
return false
})
if err != nil {
return fmt.Errorf("faield to remove stale storage items from DB: %w", err)
}
cache.Store.Delete(resetStageKey)
// Unlike the state jump, state sync point must be removed as we have complete state for this height.
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)
}
err = bc.resetRAMState(height, true)
if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
}
bc.log.Info("reset finished successfully", zap.Duration("duration", time.Since(start)))
return nil
}
@ -734,6 +971,143 @@ func (bc *Blockchain) tryRunGC(oldHeight uint32) time.Duration {
return dur
}
// resetTransfers is a helper function that strips the top newest NEP17 and NEP11 transfer logs
// down to the given height (not including the height itself) and updates corresponding token
// transfer info.
func (bc *Blockchain) resetTransfers(cache *dao.Simple, height uint32) error {
// Completely remove transfer info, updating it takes too much effort. We'll gather new
// transfer info on-the-fly later.
cache.Store.Seek(storage.SeekRange{
Prefix: []byte{byte(storage.STTokenTransferInfo)},
}, func(k, v []byte) bool {
cache.Store.Delete(k)
return true
})
// Look inside each transfer batch and iterate over the batch transfers, picking those that
// not newer than the given height. Also, for each suitable transfer update transfer info
// flushing changes after complete account's transfers processing.
prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)}
for i := range prefixes {
var (
acc util.Uint160
trInfo *state.TokenTransferInfo
removeFollowing bool
seekErr error
)
cache.Store.Seek(storage.SeekRange{
Prefix: prefixes[i : i+1],
Backwards: false, // From oldest to newest batch.
}, func(k, v []byte) bool {
var batchAcc util.Uint160
copy(batchAcc[:], k[1:])
if batchAcc != acc { // Some new account we're iterating over.
if trInfo != nil {
seekErr = cache.PutTokenTransferInfo(acc, trInfo)
if seekErr != nil {
return false
}
}
acc = batchAcc
trInfo = nil
removeFollowing = false
} else if removeFollowing {
cache.Store.Delete(slice.Copy(k))
return seekErr == nil
}
r := io.NewBinReaderFromBuf(v[1:])
l := len(v)
bytesRead := 1 // 1 is for batch size byte which is read by default.
var (
oldBatchSize = v[0]
newBatchSize byte
)
for i := byte(0); i < v[0]; i++ { // From oldest to newest transfer of the batch.
var t *state.NEP17Transfer
if k[0] == byte(storage.STNEP11Transfers) {
tr := new(state.NEP11Transfer)
tr.DecodeBinary(r)
t = &tr.NEP17Transfer
} else {
t = new(state.NEP17Transfer)
t.DecodeBinary(r)
}
if r.Err != nil {
seekErr = fmt.Errorf("failed to decode subsequent transfer: %w", r.Err)
break
}
if t.Block > height {
break
}
bytesRead = l - r.Len() // Including batch size byte.
newBatchSize++
if trInfo == nil {
var err error
trInfo, err = cache.GetTokenTransferInfo(batchAcc)
if err != nil {
seekErr = fmt.Errorf("failed to retrieve token transfer info for %s: %w", batchAcc.StringLE(), r.Err)
return false
}
}
appendTokenTransferInfo(trInfo, t.Asset, t.Block, t.Timestamp, k[0] == byte(storage.STNEP11Transfers), newBatchSize >= state.TokenTransferBatchSize)
}
if newBatchSize == oldBatchSize {
// The batch is already in storage and doesn't need to be changed.
return seekErr == nil
}
if newBatchSize > 0 {
v[0] = newBatchSize
cache.Store.Put(k, v[:bytesRead])
} else {
cache.Store.Delete(k)
removeFollowing = true
}
return seekErr == nil
})
if seekErr != nil {
return seekErr
}
if trInfo != nil {
// Flush the last batch of transfer info changes.
err := cache.PutTokenTransferInfo(acc, trInfo)
if err != nil {
return err
}
}
}
return nil
}
// appendTokenTransferInfo is a helper for resetTransfers that updates token transfer info
// wrt the given transfer that was added to the subsequent transfer batch.
func appendTokenTransferInfo(transferData *state.TokenTransferInfo,
token int32, bIndex uint32, bTimestamp uint64, isNEP11 bool, lastTransferInBatch bool) {
var (
newBatch *bool
nextBatch *uint32
currTimestamp *uint64
)
if !isNEP11 {
newBatch = &transferData.NewNEP17Batch
nextBatch = &transferData.NextNEP17Batch
currTimestamp = &transferData.NextNEP17NewestTimestamp
} else {
newBatch = &transferData.NewNEP11Batch
nextBatch = &transferData.NextNEP11Batch
currTimestamp = &transferData.NextNEP11NewestTimestamp
}
transferData.LastUpdated[token] = bIndex
*newBatch = lastTransferInBatch
if *newBatch {
*nextBatch++
*currTimestamp = bTimestamp
}
}
func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration {
bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index))
start := time.Now()

View file

@ -203,14 +203,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
c.ProtocolConfiguration.KeepOnlyLatestState = true
}
// manually store statejump stage to check statejump recover process
bPrefix[0] = byte(storage.SYSStateJumpStage)
t.Run("invalid RemoveUntraceableBlocks setting", func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
checkNewBlockchainErr(t, func(c *config.Config) {
boltCfg(c)
c.ProtocolConfiguration.RemoveUntraceableBlocks = false
}, bcSpout.dao.Store, "state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on")
})
bPrefix[0] = byte(storage.SYSStateChangeStage)
t.Run("invalid state jump stage format", func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{0x01, 0x02})
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "invalid state jump stage format")
@ -219,6 +212,16 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "failed to get state sync point from the storage")
})
t.Run("invalid RemoveUntraceableBlocks setting", func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
point := make([]byte, 4)
binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint))
bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point)
checkNewBlockchainErr(t, func(c *config.Config) {
boltCfg(c)
c.ProtocolConfiguration.RemoveUntraceableBlocks = false
}, bcSpout.dao.Store, "state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on")
})
t.Run("invalid state sync point", func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
point := make([]byte, 4)
@ -226,7 +229,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point)
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "invalid state sync point")
})
for _, stage := range []stateJumpStage{stateJumpStarted, newStorageItemsAdded, genesisStateRemoved, 0x03} {
for _, stage := range []stateChangeStage{stateJumpStarted, newStorageItemsAdded, staleBlocksRemoved, 0x03} {
t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stage)})
point := make([]byte, 4)

View file

@ -1897,3 +1897,220 @@ func TestBlockchain_Bug1728(t *testing.T) {
c := neotest.CompileSource(t, acc.ScriptHash(), strings.NewReader(src), &compiler.Options{Name: "TestContract"})
managementInvoker.DeployContract(t, c, nil)
}
func TestBlockchain_ResetStateErrors(t *testing.T) {
chainHeight := 3
checkResetErr := func(t *testing.T, cfg func(c *config.ProtocolConfiguration), h uint32, errText string) {
db, path := newLevelDBForTestingWithPath(t, t.TempDir())
bc, validators, committee := chain.NewMultiWithCustomConfigAndStore(t, cfg, db, false)
e := neotest.NewExecutor(t, bc, validators, committee)
go bc.Run()
for i := 0; i < chainHeight; i++ {
e.AddNewBlock(t) // get some height
}
bc.Close()
db, _ = newLevelDBForTestingWithPath(t, path)
defer db.Close()
bc, _, _ = chain.NewMultiWithCustomConfigAndStore(t, cfg, db, false)
err := bc.Reset(h)
if errText != "" {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), errText), err)
} else {
require.NoError(t, err)
}
}
t.Run("large height", func(t *testing.T) {
checkResetErr(t, nil, uint32(chainHeight+1), "can't reset state to height 4")
})
t.Run("already at height", func(t *testing.T) {
checkResetErr(t, nil, uint32(chainHeight), "")
})
t.Run("KeepOnlyLatestState is enabled", func(t *testing.T) {
checkResetErr(t, func(c *config.ProtocolConfiguration) {
c.KeepOnlyLatestState = true
}, uint32(chainHeight-1), "KeepOnlyLatestState is enabled")
})
t.Run("some blocks where removed", func(t *testing.T) {
checkResetErr(t, func(c *config.ProtocolConfiguration) {
c.RemoveUntraceableBlocks = true
c.MaxTraceableBlocks = 2
}, uint32(chainHeight-3), "RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed")
})
}
// TestBlockchain_ResetState is based on knowledge about basic chain transactions,
// it performs basic chain reset and checks that reset chain has proper state.
func TestBlockchain_ResetState(t *testing.T) {
// Create the DB.
db, path := newLevelDBForTestingWithPath(t, t.TempDir())
bc, validators, committee := chain.NewMultiWithCustomConfigAndStore(t, func(cfg *config.ProtocolConfiguration) {
cfg.P2PSigExtensions = true
}, db, false)
go bc.Run()
e := neotest.NewExecutor(t, bc, validators, committee)
basicchain.Init(t, "../../", e)
// Gather some reference information.
resetBlockIndex := uint32(15)
staleID := basicchain.NFSOContractID // NEP11
rublesH := e.ContractHash(t, basicchain.RublesContractID)
nnsH := e.ContractHash(t, basicchain.NNSContractID)
staleH := e.ContractHash(t, staleID)
gasH := e.NativeHash(t, nativenames.Gas)
neoH := e.NativeHash(t, nativenames.Neo)
gasID := e.NativeID(t, nativenames.Gas)
neoID := e.NativeID(t, nativenames.Neo)
resetBlockHash := bc.GetHeaderHash(int(resetBlockIndex))
resetBlockHeader, err := bc.GetHeader(resetBlockHash)
require.NoError(t, err)
topBlockHeight := bc.BlockHeight()
topBH := bc.GetHeaderHash(int(bc.BlockHeight()))
staleBH := bc.GetHeaderHash(int(resetBlockIndex + 1))
staleB, err := bc.GetBlock(staleBH)
require.NoError(t, err)
staleTx := staleB.Transactions[0]
_, err = bc.GetAppExecResults(staleTx.Hash(), trigger.Application)
require.NoError(t, err)
sr, err := bc.GetStateModule().GetStateRoot(resetBlockIndex)
require.NoError(t, err)
staleSR, err := bc.GetStateModule().GetStateRoot(resetBlockIndex + 1)
require.NoError(t, err)
rublesKey := []byte("testkey")
rublesStaleKey := []byte("aa")
rublesStaleValue := bc.GetStorageItem(basicchain.RublesContractID, rublesKey) // check value is there
require.Equal(t, []byte(basicchain.RublesNewTestvalue), []byte(rublesStaleValue))
acc0 := e.Validator.(neotest.MultiSigner).Single(2) // priv0 index->order and order->index conversion
priv0ScriptHash := acc0.ScriptHash()
var (
expectedNEP11t []*state.NEP11Transfer
expectedNEP17t []*state.NEP17Transfer
)
require.NoError(t, bc.ForEachNEP11Transfer(priv0ScriptHash, resetBlockHeader.Timestamp, func(t *state.NEP11Transfer) (bool, error) {
if t.Block <= resetBlockIndex {
expectedNEP11t = append(expectedNEP11t, t)
}
return true, nil
}))
require.NoError(t, bc.ForEachNEP17Transfer(priv0ScriptHash, resetBlockHeader.Timestamp, func(t *state.NEP17Transfer) (bool, error) {
if t.Block <= resetBlockIndex {
expectedNEP17t = append(expectedNEP17t, t)
}
return true, nil
}))
// checkProof checks that some stale proof is reachable
checkProof := func() {
rublesStaleFullKey := make([]byte, 4)
binary.LittleEndian.PutUint32(rublesStaleFullKey, uint32(basicchain.RublesContractID))
rublesStaleFullKey = append(rublesStaleFullKey, rublesStaleKey...)
proof, err := bc.GetStateModule().GetStateProof(staleSR.Root, rublesStaleFullKey)
require.NoError(t, err)
require.NotEmpty(t, proof)
}
checkProof()
// Ensure all changes were persisted.
bc.Close()
// Start new chain with existing DB, but do not run it.
db, _ = newLevelDBForTestingWithPath(t, path)
bc, _, _ = chain.NewMultiWithCustomConfigAndStore(t, func(cfg *config.ProtocolConfiguration) {
cfg.P2PSigExtensions = true
}, db, false)
defer db.Close()
require.Equal(t, topBlockHeight, bc.BlockHeight()) // ensure DB was properly initialized.
// Reset state.
require.NoError(t, bc.Reset(resetBlockIndex))
// Check that state was properly reset.
require.Equal(t, resetBlockIndex, bc.BlockHeight())
require.Equal(t, resetBlockIndex, bc.HeaderHeight())
require.Equal(t, resetBlockHash, bc.CurrentHeaderHash())
require.Equal(t, resetBlockHash, bc.CurrentBlockHash())
require.Equal(t, resetBlockIndex, bc.GetStateModule().CurrentLocalHeight())
require.Equal(t, sr.Root, bc.GetStateModule().CurrentLocalStateRoot())
require.Equal(t, uint32(0), bc.GetStateModule().CurrentValidatedHeight())
// Try to get the latest block\header.
bh := bc.GetHeaderHash(int(resetBlockIndex))
require.Equal(t, resetBlockHash, bh)
h, err := bc.GetHeader(bh)
require.NoError(t, err)
require.Equal(t, resetBlockHeader, h)
actualRublesHash, err := bc.GetContractScriptHash(basicchain.RublesContractID)
require.NoError(t, err)
require.Equal(t, rublesH, actualRublesHash)
// Check that stale blocks/headers/txs/aers/sr are not reachable.
for i := resetBlockIndex + 1; i <= topBlockHeight; i++ {
hHash := bc.GetHeaderHash(int(i))
require.Equal(t, util.Uint256{}, hHash)
_, err = bc.GetStateRoot(i)
require.Error(t, err)
}
for _, h := range []util.Uint256{staleBH, topBH} {
_, err = bc.GetHeader(h)
require.Error(t, err)
_, err = bc.GetHeader(h)
require.Error(t, err)
}
_, _, err = bc.GetTransaction(staleTx.Hash())
require.Error(t, err)
_, err = bc.GetAppExecResults(staleTx.Hash(), trigger.Application)
require.Error(t, err)
// However, proofs and everything related to stale MPT nodes still should work properly,
// because we don't remove stale MPT nodes.
checkProof()
// Check NEP-compatible contracts.
nep11 := bc.GetNEP11Contracts()
require.Equal(t, 1, len(nep11)) // NNS
require.Equal(t, nnsH, nep11[0])
nep17 := bc.GetNEP17Contracts()
require.Equal(t, 3, len(nep17)) // Neo, Gas, Rubles
require.ElementsMatch(t, []util.Uint160{gasH, neoH, rublesH}, nep17)
// Retrieve stale contract.
cs := bc.GetContractState(staleH)
require.Nil(t, cs)
// Retrieve stale storage item.
rublesValue := bc.GetStorageItem(basicchain.RublesContractID, rublesKey)
require.Equal(t, []byte(basicchain.RublesOldTestvalue), []byte(rublesValue)) // the one with historic state
require.Nil(t, bc.GetStorageItem(basicchain.RublesContractID, rublesStaleKey)) // the one that was added after target reset block
db.Seek(storage.SeekRange{
Prefix: []byte{byte(storage.STStorage)}, // no items with old prefix
}, func(k, v []byte) bool {
t.Fatal("no stale items must be left in storage")
return false
})
// Check transfers.
var (
actualNEP11t []*state.NEP11Transfer
actualNEP17t []*state.NEP17Transfer
)
require.NoError(t, bc.ForEachNEP11Transfer(priv0ScriptHash, e.TopBlock(t).Timestamp, func(t *state.NEP11Transfer) (bool, error) {
actualNEP11t = append(actualNEP11t, t)
return true, nil
}))
require.NoError(t, bc.ForEachNEP17Transfer(priv0ScriptHash, e.TopBlock(t).Timestamp, func(t *state.NEP17Transfer) (bool, error) {
actualNEP17t = append(actualNEP17t, t)
return true, nil
}))
assert.Equal(t, expectedNEP11t, actualNEP11t)
assert.Equal(t, expectedNEP17t, actualNEP17t)
lub, err := bc.GetTokenLastUpdated(priv0ScriptHash)
require.NoError(t, err)
expectedLUB := map[int32]uint32{ // this information is extracted from basic chain initialization code
basicchain.NNSContractID: resetBlockIndex - 1, // `neo.com` registration
basicchain.RublesContractID: 6, // transfer of 123 RUR to priv1
gasID: resetBlockIndex, // fee for `1.2.3.4` A record registration
neoID: 4, // transfer of 1000 NEO to priv1
}
require.Equal(t, expectedLUB, lub)
}

View file

@ -603,6 +603,26 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
return hashes, seekErr
}
// DeleteHeaderHashes removes batches of header hashes starting from the one that
// contains header with index `since` up to the most recent batch. It assumes that
// all stored batches contain `batchSize` hashes.
func (dao *Simple) DeleteHeaderHashes(since uint32, batchSize int) {
dao.Store.Seek(storage.SeekRange{
Prefix: dao.mkKeyPrefix(storage.IXHeaderHashList),
Backwards: true,
}, func(k, _ []byte) bool {
first := binary.BigEndian.Uint32(k[1:])
if first >= since {
dao.Store.Delete(k)
return first != since
}
if first+uint32(batchSize)-1 >= since {
dao.Store.Delete(k)
}
return false
})
}
// GetTransaction returns Transaction and its height by the given hash
// if it exists in the store. It does not return dummy transactions.
func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
@ -739,6 +759,17 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a
// DeleteBlock removes the block from dao. It's not atomic, so make sure you're
// using private MemCached instance here.
func (dao *Simple) DeleteBlock(h util.Uint256) error {
return dao.deleteBlock(h, true)
}
// PurgeBlock completely removes specified block (or just block header) from dao.
// It differs from DeleteBlock in that it removes header anyway. It's not atomic,
// so make sure you're using private MemCached instance here.
func (dao *Simple) PurgeBlock(h util.Uint256) error {
return dao.deleteBlock(h, false)
}
func (dao *Simple) deleteBlock(h util.Uint256, keepHeader bool) error {
key := dao.makeExecutableKey(h)
b, err := dao.getBlock(key)
@ -746,9 +777,13 @@ func (dao *Simple) DeleteBlock(h util.Uint256) error {
return err
}
err = dao.storeHeader(key, &b.Header)
if err != nil {
return err
if keepHeader {
err = dao.storeHeader(key, &b.Header)
if err != nil {
return err
}
} else {
dao.Store.Delete(key)
}
for _, tx := range b.Transactions {

View file

@ -43,7 +43,8 @@ type ManagementCache struct {
const (
ManagementContractID = -1
prefixContract = 8
// PrefixContract is a prefix used to store contract states inside Management native contract.
PrefixContract = 8
defaultMinimumDeploymentFee = 10_00000000
contractDeployNotificationName = "Deploy"
@ -87,7 +88,7 @@ func (c *ManagementCache) Copy() dao.NativeContractCache {
// MakeContractKey creates a key from the account script hash.
func MakeContractKey(h util.Uint160) []byte {
return makeUint160Key(prefixContract, h)
return makeUint160Key(PrefixContract, h)
}
// newManagement creates a new Management native contract.
@ -539,7 +540,7 @@ func (m *Management) InitializeCache(d *dao.Simple) error {
}
var initErr error
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) bool {
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{PrefixContract}}, func(_, v []byte) bool {
var cs = new(state.Contract)
initErr = stackitem.DeserializeConvertible(v, cs)
if initErr != nil {

View file

@ -81,7 +81,7 @@ func TestManagement_Initialize(t *testing.T) {
t.Run("invalid contract state", func(t *testing.T) {
d := dao.NewSimple(storage.NewMemoryStore(), false, false)
mgmt := newManagement()
d.PutStorageItem(mgmt.ID, []byte{prefixContract}, state.StorageItem{0xFF})
d.PutStorageItem(mgmt.ID, []byte{PrefixContract}, state.StorageItem{0xFF})
require.Error(t, mgmt.InitializeCache(d))
})
}

View file

@ -16,6 +16,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -202,6 +203,67 @@ func (s *Module) JumpToState(sr *state.MPTRoot) {
s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store)
}
// ResetState resets MPT state to the given height.
func (s *Module) ResetState(height uint32, cache *storage.MemCachedStore) error {
// Update local stateroot.
sr, err := s.GetStateRoot(height)
if err != nil {
return fmt.Errorf("failed to retrieve state root for height %d: %w", height, err)
}
s.addLocalStateRoot(cache, sr)
// Remove all stateroots newer than the given height.
srKey := makeStateRootKey(height)
var srSeen bool
cache.Seek(storage.SeekRange{
Prefix: srKey[0:1],
Start: srKey[1:5],
Backwards: false,
}, func(k, v []byte) bool {
if len(k) == 5 {
if srSeen {
cache.Delete(k)
} else if bytes.Equal(k, srKey) {
srSeen = true
}
}
return true
})
// Retrieve the most recent validated stateroot before the given height.
witnessesLenOffset := 1 /* version */ + 4 /* index */ + smartcontract.Hash256Len /* root */
var validated *uint32
cache.Seek(storage.SeekRange{
Prefix: srKey[0:1],
Start: srKey[1:5],
Backwards: true,
}, func(k, v []byte) bool {
if len(k) == 5 {
if len(v) > witnessesLenOffset && v[witnessesLenOffset] != 0 {
i := binary.BigEndian.Uint32(k[1:])
validated = &i
return false
}
}
return true
})
if validated != nil {
validatedBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(validatedBytes, *validated)
cache.Put([]byte{byte(storage.DataMPTAux), prefixValidated}, validatedBytes)
s.validatedHeight.Store(*validated)
} else {
cache.Delete([]byte{byte(storage.DataMPTAux), prefixValidated})
}
s.currentLocal.Store(sr.Root)
s.localHeight.Store(sr.Index)
s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store)
// Do not reset MPT nodes, leave the trie state itself as is.
return nil
}
// GC performs garbage collection.
func (s *Module) GC(index uint32, store storage.Store) time.Duration {
if !s.mode.GC() {

View file

@ -32,8 +32,13 @@ const (
SYSCurrentHeader KeyPrefix = 0xc1
SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2
SYSStateSyncPoint KeyPrefix = 0xc3
SYSStateJumpStage KeyPrefix = 0xc4
SYSVersion KeyPrefix = 0xf0
// SYSStateChangeStage is used to store the phase of a state changing process
// which is one of the state jump or state reset. Its value is one byte containing
// state reset / state jump stages bits (first seven bits are reserved for that)
// and the last bit reserved for the state reset process marker (set to 1 on
// unfinished state reset and to 0 on unfinished state jump).
SYSStateChangeStage KeyPrefix = 0xc4
SYSVersion KeyPrefix = 0xf0
)
// Executable subtypes.