Merge pull request #2773 from nspcc-dev/state-reset

core: implement state reset
Roman Khimov 2022-11-10 22:26:43 +07:00 committed by GitHub
commit f78231fd9c
13 changed files with 880 additions and 80 deletions


@ -73,6 +73,13 @@ func NewCommands() []cli.Command {
Usage: "use if dump is incremental", Usage: "use if dump is incremental",
}, },
) )
var cfgHeightFlags = make([]cli.Flag, len(cfgFlags)+1)
copy(cfgHeightFlags, cfgFlags)
cfgHeightFlags[len(cfgHeightFlags)-1] = cli.UintFlag{
Name: "height",
Usage: "Height of the state to reset DB to",
Required: true,
}
return []cli.Command{
{
Name: "node",
@ -99,6 +106,13 @@ func NewCommands() []cli.Command {
Action: restoreDB,
Flags: cfgCountInFlags,
},
{
Name: "reset",
Usage: "reset database to the previous state",
UsageText: "neo-go db reset --height height [--config-path path] [-p/-m/-t]",
Action: resetDB,
Flags: cfgHeightFlags,
},
},
},
}
@ -302,6 +316,35 @@ func restoreDB(ctx *cli.Context) error {
return nil
}
func resetDB(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil {
return err
}
cfg, err := options.GetConfigFromContext(ctx)
if err != nil {
return cli.NewExitError(err, 1)
}
h := uint32(ctx.Uint("height"))
log, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration)
if err != nil {
return cli.NewExitError(err, 1)
}
if logCloser != nil {
defer func() { _ = logCloser() }()
}
chain, err := initBlockChain(cfg, log)
if err != nil {
return cli.NewExitError(fmt.Errorf("failed to create Blockchain instance: %w", err), 1)
}
err = chain.Reset(h)
if err != nil {
return cli.NewExitError(fmt.Errorf("failed to reset chain state to height %d: %w", h, err), 1)
}
return nil
}
func mkOracle(config config.OracleConfiguration, magic netmode.Magic, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) {
if !config.Enabled {
return nil, nil


@ -351,3 +351,18 @@ func TestInitBlockChain(t *testing.T) {
require.Error(t, err)
})
}
func TestResetDB(t *testing.T) {
d := t.TempDir()
err := os.Chdir(d)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, os.Chdir(serverTestWD)) })
set := flag.NewFlagSet("flagSet", flag.ExitOnError)
set.String("config-path", filepath.Join(serverTestWD, "..", "..", "config"), "")
set.Bool("privnet", true, "")
set.Bool("debug", true, "")
set.Int("height", 0, "")
ctx := cli.NewContext(cli.NewApp(), set, nil)
err = resetDB(ctx)
require.NoError(t, err)
}


@ -89,13 +89,23 @@ Typical scenarios when this can be useful (without full node restart):
* updating TLS certificates for the RPC server
* resolving operational issues

### DB import/exports/reset

Node operates using some database as a backend to store blockchain data. NeoGo
allows dumping the chain into a file from the database (when the node is stopped)
and importing blocks from a file into the database (also when the node is
stopped). Use the `db` command for that.
NeoGo also allows resetting the node state to a particular height. This is
possible for nodes that store the complete chain state, and for nodes with the
`RemoveUntraceableBlocks` setting on that have not yet reached
`MaxTraceableBlocks` blocks. Use the `db reset` command with the target block
height specified to roll back all changes made after that block (changes made
by the acceptance of the specified block itself are kept). The set of changes
to be removed includes blocks, transactions, execution results, contract
storage changes, MPT-related auxiliary data and NEP transfers data. Some stale
MPT nodes may be left in the storage after reset. Once the DB reset is
finished, the node can be started in the regular manner.
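
For example, to roll a private network node back to block 12345 (the height
value here is purely illustrative):

```
$ neo-go db reset -p --height 12345
```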
## Smart contracts

Use `contract` command to create/compile/deploy/invoke/debug smart contracts,


@ -23,6 +23,23 @@ import (
const neoAmount = 99999000
// Various contract IDs that were deployed to basic chain.
const (
RublesContractID = int32(1)
VerifyContractID = int32(2)
VerifyWithArgsContractID = int32(3)
NNSContractID = int32(4)
NFSOContractID = int32(5)
StorageContractID = int32(6)
)
const (
// RublesOldTestvalue is a value initially stored by `testkey` key inside Rubles contract.
RublesOldTestvalue = "testvalue"
// RublesNewTestvalue is an updated value of Rubles' storage item with `testkey` key.
RublesNewTestvalue = "newtestvalue"
)
// InitSimple initializes chain with simple contracts from 'examples' folder.
// It's not as complicated as chain got after Init and may be used for tests where
// chain with a small amount of data is needed and for historical functionality testing.
@ -138,13 +155,13 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #2: deploy test_contract (Rubles contract).
cfgPath := filepath.Join(testDataPrefix, "test_contract.yml")
block2H, txDeployH, cHash := deployContractFromPriv0(t, filepath.Join(testDataPrefix, "test_contract.go"), "Rubl", cfgPath, RublesContractID)
t.Logf("txDeploy: %s", txDeployH.StringLE())
t.Logf("Block2 hash: %s", block2H.StringLE())
// Block #3: invoke `putValue` method on the test_contract.
rublPriv0Invoker := e.NewInvoker(cHash, acc0)
txInvH := rublPriv0Invoker.Invoke(t, true, "putValue", "testkey", RublesOldTestvalue)
t.Logf("txInv: %s", txInvH.StringLE())
// Block #4: transfer 1000 NEO from priv0 to priv1.
@ -166,7 +183,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #7: push verification contract into the chain.
verifyPath := filepath.Join(testDataPrefix, "verify", "verification_contract.go")
verifyCfg := filepath.Join(testDataPrefix, "verify", "verification_contract.yml")
_, _, _ = deployContractFromPriv0(t, verifyPath, "Verify", verifyCfg, VerifyContractID)
// Block #8: deposit some GAS to notary contract for priv0.
transferTxH = gasPriv0Invoker.Invoke(t, true, "transfer", priv0ScriptHash, notaryHash, 10_0000_0000, []interface{}{priv0ScriptHash, int64(e.Chain.BlockHeight() + 1000)})
@ -183,12 +200,12 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #10: push verification contract with arguments into the chain.
verifyPath = filepath.Join(testDataPrefix, "verify_args", "verification_with_args_contract.go")
verifyCfg = filepath.Join(testDataPrefix, "verify_args", "verification_with_args_contract.yml")
_, _, _ = deployContractFromPriv0(t, verifyPath, "VerifyWithArgs", verifyCfg, VerifyWithArgsContractID) // block #10
// Block #11: push NameService contract into the chain.
nsPath := filepath.Join(examplesPrefix, "nft-nd-nns")
nsConfigPath := filepath.Join(nsPath, "nns.yml")
_, _, nsHash := deployContractFromPriv0(t, nsPath, nsPath, nsConfigPath, NNSContractID) // block #11
nsCommitteeInvoker := e.CommitteeInvoker(nsHash)
nsPriv0Invoker := e.NewInvoker(nsHash, acc0)
@ -212,7 +229,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
nsPriv0Invoker.Invoke(t, stackitem.Null{}, "setRecord", "neo.com", int64(nns.A), "1.2.3.4") // block #15
// Block #16: invoke `test_contract.go`: put new value with the same key to check `getstate` RPC call
txPutNewValue := rublPriv0Invoker.PrepareInvoke(t, "putValue", "testkey", RublesNewTestvalue) // tx1
// Invoke `test_contract.go`: put values to check `findstates` RPC call.
txPut1 := rublPriv0Invoker.PrepareInvoke(t, "putValue", "aa", "v1") // tx2
txPut2 := rublPriv0Invoker.PrepareInvoke(t, "putValue", "aa10", "v2") // tx3
@ -226,7 +243,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #17: deploy NeoFS Object contract (NEP11-Divisible).
nfsPath := filepath.Join(examplesPrefix, "nft-d")
nfsConfigPath := filepath.Join(nfsPath, "nft.yml")
_, _, nfsHash := deployContractFromPriv0(t, nfsPath, nfsPath, nfsConfigPath, NFSOContractID) // block #17
nfsPriv0Invoker := e.NewInvoker(nfsHash, acc0)
nfsPriv1Invoker := e.NewInvoker(nfsHash, acc1)
@ -254,7 +271,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) {
// Block #22: deploy storage_contract (Storage contract for `traverseiterator` and `terminatesession` RPC calls test).
storagePath := filepath.Join(testDataPrefix, "storage", "storage_contract.go")
storageCfg := filepath.Join(testDataPrefix, "storage", "storage_contract.yml")
_, _, _ = deployContractFromPriv0(t, storagePath, "Storage", storageCfg, StorageContractID)
// Compile contract to test `invokescript` RPC call
invokePath := filepath.Join(testDataPrefix, "invoke", "invokescript_contract.go")


@ -36,6 +36,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
@ -61,21 +62,30 @@ const (
defaultStateSyncInterval = 40000
)

// stateChangeStage denotes the stage of state modification process.
type stateChangeStage byte

// A set of stages used to split state jump / state reset into atomic operations.
const (
// none means that no state jump or state reset process was initiated yet.
none stateChangeStage = 1 << iota
// stateJumpStarted means that state jump was just initiated, but outdated storage items
// were not yet removed.
stateJumpStarted
// newStorageItemsAdded means that contract storage items are up-to-date with the current
// state.
newStorageItemsAdded
// staleBlocksRemoved means that state corresponding to the stale blocks (genesis block
// in case of state jump) was removed from the storage.
staleBlocksRemoved
// headersReset denotes stale SYS-prefixed and IX-prefixed information was removed from
// the storage (applicable to state reset only).
headersReset
// transfersReset denotes NEP transfers were successfully updated (applicable to state reset only).
transfersReset
// stateResetBit represents a bit identifier for state reset process. If this bit is not set, then
// it's an unfinished state jump.
stateResetBit byte = 1 << 7
)

var (
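
To make the bit layout concrete, here is a minimal sketch of how a stage byte is composed and decomposed; the helper names are hypothetical, but the masking mirrors what init() and resetStateInternal() below do:

```go
// markReset tags a stage byte as belonging to a state reset (rather than
// a state jump) by setting the highest bit.
func markReset(s stateChangeStage) byte {
	return stateResetBit | byte(s)
}

// splitStage recovers the stage (low seven bits) and whether the process
// is a state reset (high bit) from a stored stage byte.
func splitStage(b byte) (stateChangeStage, bool) {
	return stateChangeStage(b &^ stateResetBit), b&stateResetBit != 0
}
```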
@ -151,6 +161,8 @@ type Blockchain struct {
// Stop synchronization mechanisms.
stopCh      chan struct{}
runToExitCh chan struct{}
// isRunning denotes whether blockchain routines are currently running.
isRunning atomic.Value

memPool *mempool.Pool
@ -302,6 +314,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
return nil, err
}
bc.isRunning.Store(false)
return bc, nil
}
@ -448,22 +461,25 @@ func (bc *Blockchain) init() error {
}
}
// Check whether StateChangeStage is in the storage and continue interrupted state jump / state reset if so.
stateChStage, err := bc.dao.Store.Get([]byte{byte(storage.SYSStateChangeStage)})
if err == nil {
if len(stateChStage) != 1 {
return fmt.Errorf("invalid state jump stage format")
}
// State jump / state reset wasn't finished yet, thus continue it.
stateSyncPoint, err := bc.dao.GetStateSyncPoint()
if err != nil {
return fmt.Errorf("failed to get state sync point from the storage")
}
if (stateChStage[0] & stateResetBit) != 0 {
return bc.resetStateInternal(stateSyncPoint, stateChangeStage(stateChStage[0]&(^stateResetBit)))
}
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) {
return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " +
"To start an archival node drop the database manually and restart the node")
}
return bc.jumpToStateInternal(stateSyncPoint, stateChangeStage(stateChStage[0]))
}
bHeight, err := bc.dao.GetCurrentBlockHeight()
@ -534,14 +550,14 @@ func (bc *Blockchain) jumpToState(p uint32) error {
// changes Blockchain state to the one specified by state sync point p and state
// jump stage. All the data needed for the jump must be in the DB, otherwise an
// error is returned. It is not protected by mutex.
func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) error {
if p+1 >= uint32(len(bc.headerHashes)) {
return fmt.Errorf("invalid state sync point %d: headerHeight is %d", p, len(bc.headerHashes))
}
bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
jumpStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage {
case none:
bc.dao.Store.Put(jumpStageKey, []byte{byte(stateJumpStarted)})
@ -583,28 +599,24 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
})
}
}
// Update SYS-prefixed info.
block, err := bc.dao.GetBlock(bc.headerHashes[p])
if err != nil {
return fmt.Errorf("failed to get current block: %w", err)
}
cache.StoreAsCurrentBlock(block)
cache.Store.Put(jumpStageKey, []byte{byte(staleBlocksRemoved)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist old items removal: %w", err)
}
case staleBlocksRemoved:
// there's nothing to do after that, so just continue with common operations
// and remove state jump stage in the end.
default:
return fmt.Errorf("unknown state jump stage: %d", stage)
}
block, err := bc.dao.GetBlock(bc.headerHashes[p+1])
if err != nil {
return fmt.Errorf("failed to get block to init MPT: %w", err)
}
@ -613,18 +625,246 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
Root: block.PrevStateRoot,
})
bc.dao.Store.Delete(jumpStageKey)
err = bc.resetRAMState(p, false)
if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
}
return nil
}
// resetRAMState resets in-memory cached info.
func (bc *Blockchain) resetRAMState(height uint32, resetHeaders bool) error {
if resetHeaders {
bc.headerHashes = bc.headerHashes[:height+1]
bc.storedHeaderCount = height + 1
}
block, err := bc.dao.GetBlock(bc.headerHashes[height])
if err != nil {
return fmt.Errorf("failed to get current block: %w", err)
}
bc.topBlock.Store(block)
atomic.StoreUint32(&bc.blockHeight, height)
atomic.StoreUint32(&bc.persistedHeight, height)
err = bc.initializeNativeCache(block.Index, bc.dao)
if err != nil {
return fmt.Errorf("failed to initialize natives cache: %w", err)
}
if err := bc.updateExtensibleWhitelist(height); err != nil {
return fmt.Errorf("failed to update extensible whitelist: %w", err)
}
updateBlockHeightMetric(height)
return nil
}
// Reset resets chain state to the specified height if possible. This method
// performs direct DB changes and can be called on non-running Blockchain only.
func (bc *Blockchain) Reset(height uint32) error {
if bc.isRunning.Load().(bool) {
return errors.New("can't reset state of the running blockchain")
}
bc.dao.PutStateSyncPoint(height)
return bc.resetStateInternal(height, none)
}
func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error {
currHeight := bc.BlockHeight()
if height > currHeight {
return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height)
}
if height == currHeight && stage == none {
bc.log.Info("chain is already at the proper state", zap.Uint32("height", height))
return nil
}
if bc.config.KeepOnlyLatestState {
return fmt.Errorf("KeepOnlyLatestState is enabled, state for height %d is outdated and removed from the storage", height)
}
if bc.config.RemoveUntraceableBlocks && currHeight >= bc.config.MaxTraceableBlocks {
return fmt.Errorf("RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed")
}
// Retrieve necessary state before the DB modification.
hHeight := bc.HeaderHeight()
b, err := bc.GetBlock(bc.headerHashes[height])
if err != nil {
return fmt.Errorf("failed to retrieve block %d: %w", height, err)
}
sr, err := bc.stateRoot.GetStateRoot(height)
if err != nil {
return fmt.Errorf("failed to retrieve stateroot for height %d: %w", height, err)
}
v := bc.dao.Version
cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB
bc.log.Info("initialize state reset", zap.Uint32("target height", height))
start := time.Now()
p := start
resetStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage {
case none:
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err)
}
fallthrough
case stateJumpStarted:
// Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself).
for i := height + 1; i <= hHeight; i++ {
err := cache.PurgeBlock(bc.headerHashes[i])
if err != nil {
return fmt.Errorf("error while removing block %d: %w", i, err)
}
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err)
}
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("duration", time.Since(p)))
p = time.Now()
fallthrough
case staleBlocksRemoved:
// Completely remove contract IDs to update them later.
cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool {
cache.Store.Delete(k)
return true
})
// Reset contracts storage and store new contract IDs.
var mode = mpt.ModeAll
if bc.config.RemoveUntraceableBlocks {
mode |= mpt.ModeGCFlag
}
trieStore := mpt.NewTrieStore(sr.Root, mode, cache.Store)
oldStoragePrefix := v.StoragePrefix
newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix)
mgmtCSPrefixLen := 1 + 4 + 1 // STStorage + Management ID + contract state prefix
mgmtContractPrefix := make([]byte, mgmtCSPrefixLen-1)
id := int32(native.ManagementContractID)
binary.BigEndian.PutUint32(mgmtContractPrefix, uint32(id))
mgmtContractPrefix[4] = native.PrefixContract
cs := new(state.Contract)
const persistBatchSize = 10000
var (
seekErr error
cnt int
)
trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool {
if cnt >= persistBatchSize {
_, seekErr = cache.Persist()
if seekErr != nil {
return false
}
}
// May safely omit KV copying.
k[0] = byte(newStoragePrefix)
cache.Store.Put(k, v)
// @fixme: remove this part after #2702.
if bytes.HasPrefix(k[1:], mgmtContractPrefix) {
var hash util.Uint160
copy(hash[:], k[mgmtCSPrefixLen:])
err = stackitem.DeserializeConvertible(v, cs)
if err != nil {
seekErr = fmt.Errorf("failed to deserialize contract state: %w", err)
}
cache.PutContractID(cs.ID, hash)
}
cnt++
return seekErr == nil
})
if seekErr != nil {
return fmt.Errorf("failed to reset contract IDs: %w", err)
}
trieStore.Close()
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err)
}
bc.log.Info("contracts storage and IDs are reset", zap.Duration("duration", time.Since(p)))
p = time.Now()
fallthrough
case newStorageItemsAdded:
// Reset SYS-prefixed and IX-prefixed information.
cache.DeleteHeaderHashes(height+1, headerBatchCount)
cache.StoreAsCurrentBlock(b)
cache.PutCurrentHeader(b.Hash(), height)
v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix)
cache.PutVersion(v)
bc.persistent.Version = v
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
}
bc.log.Info("headers are reset", zap.Duration("duration", time.Since(p)))
p = time.Now()
fallthrough
case headersReset:
// Reset MPT.
err = bc.stateRoot.ResetState(height, cache.Store)
if err != nil {
return fmt.Errorf("failed to rollback MPT state: %w", err)
}
// Reset transfers.
err = bc.resetTransfers(cache, height)
if err != nil {
return fmt.Errorf("failed to strip transfer log / transfer info: %w", err)
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err)
}
bc.log.Info("MPT and transfers are reset", zap.Duration("duration", time.Since(p)))
fallthrough
case transfersReset:
// there's nothing to do after that, so just continue with common operations
// and remove state reset stage in the end.
default:
return fmt.Errorf("unknown state reset stage: %d", stage)
}
// Direct (cache-less) DB operation: remove stale storage items.
err = bc.store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))},
}, func(_, _ []byte) bool {
return false
})
if err != nil {
return fmt.Errorf("faield to remove stale storage items from DB: %w", err)
}
cache.Store.Delete(resetStageKey)
// Unlike the state jump, state sync point must be removed as we have complete state for this height.
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)
}
err = bc.resetRAMState(height, true)
if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
}
bc.log.Info("reset finished successfully", zap.Duration("duration", time.Since(start)))
return nil return nil
} }
@ -658,6 +898,7 @@ func (bc *Blockchain) initializeNativeCache(blockHeight uint32, d *dao.Simple) e
// Run runs chain loop, it needs to be run as goroutine and executing it is
// critical for correct Blockchain operation.
func (bc *Blockchain) Run() {
bc.isRunning.Store(true)
persistTimer := time.NewTimer(persistInterval)
defer func() {
persistTimer.Stop()
@ -667,6 +908,7 @@ func (bc *Blockchain) Run() {
if err := bc.dao.Store.Close(); err != nil {
bc.log.Warn("failed to close db", zap.Error(err))
}
bc.isRunning.Store(false)
close(bc.runToExitCh)
}()
go bc.notificationDispatcher()
@ -729,6 +971,143 @@ func (bc *Blockchain) tryRunGC(oldHeight uint32) time.Duration {
return dur
}
// resetTransfers is a helper function that strips the top newest NEP17 and NEP11 transfer logs
// down to the given height (not including the height itself) and updates corresponding token
// transfer info.
func (bc *Blockchain) resetTransfers(cache *dao.Simple, height uint32) error {
// Completely remove transfer info, updating it takes too much effort. We'll gather new
// transfer info on-the-fly later.
cache.Store.Seek(storage.SeekRange{
Prefix: []byte{byte(storage.STTokenTransferInfo)},
}, func(k, v []byte) bool {
cache.Store.Delete(k)
return true
})
// Look inside each transfer batch and iterate over the batch transfers, picking those that
// are not newer than the given height. Also, for each suitable transfer update transfer info,
// flushing the changes once the account's transfers are fully processed.
prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)}
for i := range prefixes {
var (
acc util.Uint160
trInfo *state.TokenTransferInfo
removeFollowing bool
seekErr error
)
cache.Store.Seek(storage.SeekRange{
Prefix: prefixes[i : i+1],
Backwards: false, // From oldest to newest batch.
}, func(k, v []byte) bool {
var batchAcc util.Uint160
copy(batchAcc[:], k[1:])
if batchAcc != acc { // Some new account we're iterating over.
if trInfo != nil {
seekErr = cache.PutTokenTransferInfo(acc, trInfo)
if seekErr != nil {
return false
}
}
acc = batchAcc
trInfo = nil
removeFollowing = false
} else if removeFollowing {
cache.Store.Delete(slice.Copy(k))
return seekErr == nil
}
r := io.NewBinReaderFromBuf(v[1:])
l := len(v)
bytesRead := 1 // 1 is for batch size byte which is read by default.
var (
oldBatchSize = v[0]
newBatchSize byte
)
for i := byte(0); i < v[0]; i++ { // From oldest to newest transfer of the batch.
var t *state.NEP17Transfer
if k[0] == byte(storage.STNEP11Transfers) {
tr := new(state.NEP11Transfer)
tr.DecodeBinary(r)
t = &tr.NEP17Transfer
} else {
t = new(state.NEP17Transfer)
t.DecodeBinary(r)
}
if r.Err != nil {
seekErr = fmt.Errorf("failed to decode subsequent transfer: %w", r.Err)
break
}
if t.Block > height {
break
}
bytesRead = l - r.Len() // Including batch size byte.
newBatchSize++
if trInfo == nil {
var err error
trInfo, err = cache.GetTokenTransferInfo(batchAcc)
if err != nil {
seekErr = fmt.Errorf("failed to retrieve token transfer info for %s: %w", batchAcc.StringLE(), r.Err)
return false
}
}
appendTokenTransferInfo(trInfo, t.Asset, t.Block, t.Timestamp, k[0] == byte(storage.STNEP11Transfers), newBatchSize >= state.TokenTransferBatchSize)
}
if newBatchSize == oldBatchSize {
// The batch is already in storage and doesn't need to be changed.
return seekErr == nil
}
if newBatchSize > 0 {
v[0] = newBatchSize
cache.Store.Put(k, v[:bytesRead])
} else {
cache.Store.Delete(k)
removeFollowing = true
}
return seekErr == nil
})
if seekErr != nil {
return seekErr
}
if trInfo != nil {
// Flush the last batch of transfer info changes.
err := cache.PutTokenTransferInfo(acc, trInfo)
if err != nil {
return err
}
}
}
return nil
}
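
The batch value rewritten above has a compact layout: `v[0]` holds the number of transfers in the batch and the serialized transfers follow, oldest to newest. A hypothetical helper restating the in-place truncation (with `bytesRead` counting the size byte plus every transfer to keep, exactly as computed in the loop):

```go
// truncateTransferBatch keeps only the first newSize transfers of a raw
// batch value and patches the leading size byte accordingly.
func truncateTransferBatch(v []byte, newSize byte, bytesRead int) []byte {
	v[0] = newSize
	return v[:bytesRead]
}
```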
// appendTokenTransferInfo is a helper for resetTransfers that updates token transfer info
// wrt the given transfer that was added to the subsequent transfer batch.
func appendTokenTransferInfo(transferData *state.TokenTransferInfo,
token int32, bIndex uint32, bTimestamp uint64, isNEP11 bool, lastTransferInBatch bool) {
var (
newBatch *bool
nextBatch *uint32
currTimestamp *uint64
)
if !isNEP11 {
newBatch = &transferData.NewNEP17Batch
nextBatch = &transferData.NextNEP17Batch
currTimestamp = &transferData.NextNEP17NewestTimestamp
} else {
newBatch = &transferData.NewNEP11Batch
nextBatch = &transferData.NextNEP11Batch
currTimestamp = &transferData.NextNEP11NewestTimestamp
}
transferData.LastUpdated[token] = bIndex
*newBatch = lastTransferInBatch
if *newBatch {
*nextBatch++
*currTimestamp = bTimestamp
}
}
func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration {
bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index))
start := time.Now()
@ -1473,34 +1852,29 @@ func appendTokenTransfer(cache *dao.Simple, transCache map[util.Uint160]transfer
}
var (
log *state.TokenTransferLog
nextBatch uint32
currTimestamp uint64
)
if !isNEP11 {
log = &transferData.Log17
nextBatch = transferData.Info.NextNEP17Batch
currTimestamp = transferData.Info.NextNEP17NewestTimestamp
} else {
log = &transferData.Log11
nextBatch = transferData.Info.NextNEP11Batch
currTimestamp = transferData.Info.NextNEP11NewestTimestamp
}
err := log.Append(transfer)
if err != nil {
return err
}
newBatch := log.Size() >= state.TokenTransferBatchSize
if newBatch {
cache.PutTokenTransferLog(addr, currTimestamp, nextBatch, isNEP11, log)
// Put makes a copy of it anyway.
log.Reset()
}
appendTokenTransferInfo(&transferData.Info, token, bIndex, bTimestamp, isNEP11, newBatch)
transCache[addr] = transferData
return nil
}


@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -202,14 +203,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
c.ProtocolConfiguration.KeepOnlyLatestState = true
}
// manually store statejump stage to check statejump recover process
bPrefix[0] = byte(storage.SYSStateChangeStage)
t.Run("invalid state jump stage format", func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{0x01, 0x02})
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "invalid state jump stage format")
@ -218,6 +212,16 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "failed to get state sync point from the storage")
})
t.Run("invalid RemoveUntraceableBlocks setting", func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
point := make([]byte, 4)
binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint))
bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point)
checkNewBlockchainErr(t, func(c *config.Config) {
boltCfg(c)
c.ProtocolConfiguration.RemoveUntraceableBlocks = false
}, bcSpout.dao.Store, "state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on")
})
t.Run("invalid state sync point", func(t *testing.T) { t.Run("invalid state sync point", func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)}) bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
point := make([]byte, 4) point := make([]byte, 4)
@ -225,7 +229,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point)
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "invalid state sync point")
})
for _, stage := range []stateChangeStage{stateJumpStarted, newStorageItemsAdded, staleBlocksRemoved, 0x03} {
t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) {
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stage)})
point := make([]byte, 4)
@ -340,3 +344,20 @@ func TestBlockchain_BaseExecFeeBaseStoragePrice_Compat(t *testing.T) {
check(t)
})
}
func TestBlockchain_IsRunning(t *testing.T) {
chain := initTestChain(t, nil, nil)
require.False(t, chain.isRunning.Load().(bool))
oldPersisted := atomic.LoadUint32(&chain.persistedHeight)
go chain.Run()
require.NoError(t, chain.AddBlock(chain.newBlock()))
require.Eventually(t, func() bool {
persisted := atomic.LoadUint32(&chain.persistedHeight)
return persisted > oldPersisted
}, 2*persistInterval, 100*time.Millisecond)
require.True(t, chain.isRunning.Load().(bool))
chain.Close()
require.False(t, chain.isRunning.Load().(bool))
}


@ -1897,3 +1897,220 @@ func TestBlockchain_Bug1728(t *testing.T) {
c := neotest.CompileSource(t, acc.ScriptHash(), strings.NewReader(src), &compiler.Options{Name: "TestContract"})
managementInvoker.DeployContract(t, c, nil)
}
func TestBlockchain_ResetStateErrors(t *testing.T) {
chainHeight := 3
checkResetErr := func(t *testing.T, cfg func(c *config.ProtocolConfiguration), h uint32, errText string) {
db, path := newLevelDBForTestingWithPath(t, t.TempDir())
bc, validators, committee := chain.NewMultiWithCustomConfigAndStore(t, cfg, db, false)
e := neotest.NewExecutor(t, bc, validators, committee)
go bc.Run()
for i := 0; i < chainHeight; i++ {
e.AddNewBlock(t) // get some height
}
bc.Close()
db, _ = newLevelDBForTestingWithPath(t, path)
defer db.Close()
bc, _, _ = chain.NewMultiWithCustomConfigAndStore(t, cfg, db, false)
err := bc.Reset(h)
if errText != "" {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), errText), err)
} else {
require.NoError(t, err)
}
}
t.Run("large height", func(t *testing.T) {
checkResetErr(t, nil, uint32(chainHeight+1), "can't reset state to height 4")
})
t.Run("already at height", func(t *testing.T) {
checkResetErr(t, nil, uint32(chainHeight), "")
})
t.Run("KeepOnlyLatestState is enabled", func(t *testing.T) {
checkResetErr(t, func(c *config.ProtocolConfiguration) {
c.KeepOnlyLatestState = true
}, uint32(chainHeight-1), "KeepOnlyLatestState is enabled")
})
t.Run("some blocks where removed", func(t *testing.T) {
checkResetErr(t, func(c *config.ProtocolConfiguration) {
c.RemoveUntraceableBlocks = true
c.MaxTraceableBlocks = 2
}, uint32(chainHeight-3), "RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed")
})
}
// TestBlockchain_ResetState is based on knowledge about basic chain transactions,
// it performs basic chain reset and checks that reset chain has proper state.
func TestBlockchain_ResetState(t *testing.T) {
// Create the DB.
db, path := newLevelDBForTestingWithPath(t, t.TempDir())
bc, validators, committee := chain.NewMultiWithCustomConfigAndStore(t, func(cfg *config.ProtocolConfiguration) {
cfg.P2PSigExtensions = true
}, db, false)
go bc.Run()
e := neotest.NewExecutor(t, bc, validators, committee)
basicchain.Init(t, "../../", e)
// Gather some reference information.
resetBlockIndex := uint32(15)
staleID := basicchain.NFSOContractID // NEP11
rublesH := e.ContractHash(t, basicchain.RublesContractID)
nnsH := e.ContractHash(t, basicchain.NNSContractID)
staleH := e.ContractHash(t, staleID)
gasH := e.NativeHash(t, nativenames.Gas)
neoH := e.NativeHash(t, nativenames.Neo)
gasID := e.NativeID(t, nativenames.Gas)
neoID := e.NativeID(t, nativenames.Neo)
resetBlockHash := bc.GetHeaderHash(int(resetBlockIndex))
resetBlockHeader, err := bc.GetHeader(resetBlockHash)
require.NoError(t, err)
topBlockHeight := bc.BlockHeight()
topBH := bc.GetHeaderHash(int(bc.BlockHeight()))
staleBH := bc.GetHeaderHash(int(resetBlockIndex + 1))
staleB, err := bc.GetBlock(staleBH)
require.NoError(t, err)
staleTx := staleB.Transactions[0]
_, err = bc.GetAppExecResults(staleTx.Hash(), trigger.Application)
require.NoError(t, err)
sr, err := bc.GetStateModule().GetStateRoot(resetBlockIndex)
require.NoError(t, err)
staleSR, err := bc.GetStateModule().GetStateRoot(resetBlockIndex + 1)
require.NoError(t, err)
rublesKey := []byte("testkey")
rublesStaleKey := []byte("aa")
rublesStaleValue := bc.GetStorageItem(basicchain.RublesContractID, rublesKey) // check value is there
require.Equal(t, []byte(basicchain.RublesNewTestvalue), []byte(rublesStaleValue))
acc0 := e.Validator.(neotest.MultiSigner).Single(2) // priv0 index->order and order->index conversion
priv0ScriptHash := acc0.ScriptHash()
var (
expectedNEP11t []*state.NEP11Transfer
expectedNEP17t []*state.NEP17Transfer
)
require.NoError(t, bc.ForEachNEP11Transfer(priv0ScriptHash, resetBlockHeader.Timestamp, func(t *state.NEP11Transfer) (bool, error) {
if t.Block <= resetBlockIndex {
expectedNEP11t = append(expectedNEP11t, t)
}
return true, nil
}))
require.NoError(t, bc.ForEachNEP17Transfer(priv0ScriptHash, resetBlockHeader.Timestamp, func(t *state.NEP17Transfer) (bool, error) {
if t.Block <= resetBlockIndex {
expectedNEP17t = append(expectedNEP17t, t)
}
return true, nil
}))
// checkProof checks that some stale proof is reachable
checkProof := func() {
rublesStaleFullKey := make([]byte, 4)
binary.LittleEndian.PutUint32(rublesStaleFullKey, uint32(basicchain.RublesContractID))
rublesStaleFullKey = append(rublesStaleFullKey, rublesStaleKey...)
proof, err := bc.GetStateModule().GetStateProof(staleSR.Root, rublesStaleFullKey)
require.NoError(t, err)
require.NotEmpty(t, proof)
}
checkProof()
// Ensure all changes were persisted.
bc.Close()
// Start new chain with existing DB, but do not run it.
db, _ = newLevelDBForTestingWithPath(t, path)
bc, _, _ = chain.NewMultiWithCustomConfigAndStore(t, func(cfg *config.ProtocolConfiguration) {
cfg.P2PSigExtensions = true
}, db, false)
defer db.Close()
require.Equal(t, topBlockHeight, bc.BlockHeight()) // ensure DB was properly initialized.
// Reset state.
require.NoError(t, bc.Reset(resetBlockIndex))
// Check that state was properly reset.
require.Equal(t, resetBlockIndex, bc.BlockHeight())
require.Equal(t, resetBlockIndex, bc.HeaderHeight())
require.Equal(t, resetBlockHash, bc.CurrentHeaderHash())
require.Equal(t, resetBlockHash, bc.CurrentBlockHash())
require.Equal(t, resetBlockIndex, bc.GetStateModule().CurrentLocalHeight())
require.Equal(t, sr.Root, bc.GetStateModule().CurrentLocalStateRoot())
require.Equal(t, uint32(0), bc.GetStateModule().CurrentValidatedHeight())
// Try to get the latest block/header.
bh := bc.GetHeaderHash(int(resetBlockIndex))
require.Equal(t, resetBlockHash, bh)
h, err := bc.GetHeader(bh)
require.NoError(t, err)
require.Equal(t, resetBlockHeader, h)
actualRublesHash, err := bc.GetContractScriptHash(basicchain.RublesContractID)
require.NoError(t, err)
require.Equal(t, rublesH, actualRublesHash)
// Check that stale blocks/headers/txs/aers/sr are not reachable.
for i := resetBlockIndex + 1; i <= topBlockHeight; i++ {
hHash := bc.GetHeaderHash(int(i))
require.Equal(t, util.Uint256{}, hHash)
_, err = bc.GetStateRoot(i)
require.Error(t, err)
}
for _, h := range []util.Uint256{staleBH, topBH} {
_, err = bc.GetHeader(h)
require.Error(t, err)
_, err = bc.GetHeader(h)
require.Error(t, err)
}
_, _, err = bc.GetTransaction(staleTx.Hash())
require.Error(t, err)
_, err = bc.GetAppExecResults(staleTx.Hash(), trigger.Application)
require.Error(t, err)
// However, proofs and everything related to stale MPT nodes still should work properly,
// because we don't remove stale MPT nodes.
checkProof()
// Check NEP-compatible contracts.
nep11 := bc.GetNEP11Contracts()
require.Equal(t, 1, len(nep11)) // NNS
require.Equal(t, nnsH, nep11[0])
nep17 := bc.GetNEP17Contracts()
require.Equal(t, 3, len(nep17)) // Neo, Gas, Rubles
require.ElementsMatch(t, []util.Uint160{gasH, neoH, rublesH}, nep17)
// Retrieve stale contract.
cs := bc.GetContractState(staleH)
require.Nil(t, cs)
// Retrieve stale storage item.
rublesValue := bc.GetStorageItem(basicchain.RublesContractID, rublesKey)
require.Equal(t, []byte(basicchain.RublesOldTestvalue), []byte(rublesValue)) // the one with historic state
require.Nil(t, bc.GetStorageItem(basicchain.RublesContractID, rublesStaleKey)) // the one that was added after target reset block
db.Seek(storage.SeekRange{
Prefix: []byte{byte(storage.STStorage)}, // no items with old prefix
}, func(k, v []byte) bool {
t.Fatal("no stale items must be left in storage")
return false
})
// Check transfers.
var (
actualNEP11t []*state.NEP11Transfer
actualNEP17t []*state.NEP17Transfer
)
require.NoError(t, bc.ForEachNEP11Transfer(priv0ScriptHash, e.TopBlock(t).Timestamp, func(t *state.NEP11Transfer) (bool, error) {
actualNEP11t = append(actualNEP11t, t)
return true, nil
}))
require.NoError(t, bc.ForEachNEP17Transfer(priv0ScriptHash, e.TopBlock(t).Timestamp, func(t *state.NEP17Transfer) (bool, error) {
actualNEP17t = append(actualNEP17t, t)
return true, nil
}))
assert.Equal(t, expectedNEP11t, actualNEP11t)
assert.Equal(t, expectedNEP17t, actualNEP17t)
lub, err := bc.GetTokenLastUpdated(priv0ScriptHash)
require.NoError(t, err)
expectedLUB := map[int32]uint32{ // this information is extracted from basic chain initialization code
basicchain.NNSContractID: resetBlockIndex - 1, // `neo.com` registration
basicchain.RublesContractID: 6, // transfer of 123 RUR to priv1
gasID: resetBlockIndex, // fee for `1.2.3.4` A record registration
neoID: 4, // transfer of 1000 NEO to priv1
}
require.Equal(t, expectedLUB, lub)
}


@ -603,6 +603,26 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
return hashes, seekErr
}
// DeleteHeaderHashes removes batches of header hashes starting from the one that
// contains header with index `since` up to the most recent batch. It assumes that
// all stored batches contain `batchSize` hashes.
func (dao *Simple) DeleteHeaderHashes(since uint32, batchSize int) {
dao.Store.Seek(storage.SeekRange{
Prefix: dao.mkKeyPrefix(storage.IXHeaderHashList),
Backwards: true,
}, func(k, _ []byte) bool {
first := binary.BigEndian.Uint32(k[1:])
if first >= since {
dao.Store.Delete(k)
return first != since
}
if first+uint32(batchSize)-1 >= since {
dao.Store.Delete(k)
}
return false
})
}
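
For illustration (assuming `batchSize` is 2000, the `headerBatchCount` value passed by the caller above): with `since` = 3000, the backwards seek first visits the batch keyed by first index 4000 (`first >= since`, so it is deleted and iteration continues), then the batch keyed by 2000; here `first < since`, but `2000+2000-1 >= 3000`, so this batch, which contains hash 3000, is deleted too and iteration stops. Had `since` been exactly 4000, that batch would have been deleted and iteration stopped immediately.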
// GetTransaction returns Transaction and its height by the given hash
// if it exists in the store. It does not return dummy transactions.
func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
@ -739,6 +759,17 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a
// DeleteBlock removes the block from dao. It's not atomic, so make sure you're
// using private MemCached instance here.
func (dao *Simple) DeleteBlock(h util.Uint256) error {
return dao.deleteBlock(h, true)
}
// PurgeBlock completely removes specified block (or just block header) from dao.
// It differs from DeleteBlock in that it removes header anyway. It's not atomic,
// so make sure you're using private MemCached instance here.
func (dao *Simple) PurgeBlock(h util.Uint256) error {
return dao.deleteBlock(h, false)
}
func (dao *Simple) deleteBlock(h util.Uint256, keepHeader bool) error {
key := dao.makeExecutableKey(h)
b, err := dao.getBlock(key)
@ -746,9 +777,13 @@ func (dao *Simple) DeleteBlock(h util.Uint256) error {
return err
}
if keepHeader {
err = dao.storeHeader(key, &b.Header)
if err != nil {
return err
}
} else {
dao.Store.Delete(key)
}
for _, tx := range b.Transactions {


@ -43,7 +43,8 @@ type ManagementCache struct {
const (
ManagementContractID = -1
// PrefixContract is a prefix used to store contract states inside Management native contract.
PrefixContract = 8

defaultMinimumDeploymentFee = 10_00000000
contractDeployNotificationName = "Deploy"
@ -87,7 +88,7 @@ func (c *ManagementCache) Copy() dao.NativeContractCache {
// MakeContractKey creates a key from the account script hash.
func MakeContractKey(h util.Uint160) []byte {
return makeUint160Key(PrefixContract, h)
}

// newManagement creates a new Management native contract.
@ -539,7 +540,7 @@ func (m *Management) InitializeCache(d *dao.Simple) error {
}
var initErr error
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{PrefixContract}}, func(_, v []byte) bool {
var cs = new(state.Contract)
initErr = stackitem.DeserializeConvertible(v, cs)
if initErr != nil {


@ -81,7 +81,7 @@ func TestManagement_Initialize(t *testing.T) {
t.Run("invalid contract state", func(t *testing.T) { t.Run("invalid contract state", func(t *testing.T) {
d := dao.NewSimple(storage.NewMemoryStore(), false, false) d := dao.NewSimple(storage.NewMemoryStore(), false, false)
mgmt := newManagement() mgmt := newManagement()
d.PutStorageItem(mgmt.ID, []byte{prefixContract}, state.StorageItem{0xFF}) d.PutStorageItem(mgmt.ID, []byte{PrefixContract}, state.StorageItem{0xFF})
require.Error(t, mgmt.InitializeCache(d)) require.Error(t, mgmt.InitializeCache(d))
}) })
} }


@ -51,7 +51,7 @@ type NEP11Transfer struct {
// block trackers along with the information about NEP-17 and NEP-11 transfer batch.
type TokenTransferInfo struct {
LastUpdated map[int32]uint32
// NextNEP11Batch stores the index of the next NEP-11 transfer batch.
NextNEP11Batch uint32
// NextNEP17Batch stores the index of the next NEP-17 transfer batch.
NextNEP17Batch uint32


@ -16,6 +16,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
@ -170,7 +171,7 @@ func (s *Module) Init(height uint32) error {
// CleanStorage removes all MPT-related data from the storage (MPT nodes, validated stateroots)
// except local stateroot for the current height and GC flag. This method is aimed to clean
// outdated MPT data before state sync process can be started.
// Note: this method is aimed to be called for genesis block only, an error is returned otherwise.
func (s *Module) CleanStorage() error {
if s.localHeight.Load() != 0 {
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
@ -202,6 +203,67 @@ func (s *Module) JumpToState(sr *state.MPTRoot) {
s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store)
}
// ResetState resets MPT state to the given height.
func (s *Module) ResetState(height uint32, cache *storage.MemCachedStore) error {
// Update local stateroot.
sr, err := s.GetStateRoot(height)
if err != nil {
return fmt.Errorf("failed to retrieve state root for height %d: %w", height, err)
}
s.addLocalStateRoot(cache, sr)
// Remove all stateroots newer than the given height.
srKey := makeStateRootKey(height)
var srSeen bool
cache.Seek(storage.SeekRange{
Prefix: srKey[0:1],
Start: srKey[1:5],
Backwards: false,
}, func(k, v []byte) bool {
if len(k) == 5 {
if srSeen {
cache.Delete(k)
} else if bytes.Equal(k, srKey) {
srSeen = true
}
}
return true
})
// Retrieve the most recent validated stateroot before the given height.
witnessesLenOffset := 1 /* version */ + 4 /* index */ + smartcontract.Hash256Len /* root */
var validated *uint32
cache.Seek(storage.SeekRange{
Prefix: srKey[0:1],
Start: srKey[1:5],
Backwards: true,
}, func(k, v []byte) bool {
if len(k) == 5 {
if len(v) > witnessesLenOffset && v[witnessesLenOffset] != 0 {
i := binary.BigEndian.Uint32(k[1:])
validated = &i
return false
}
}
return true
})
if validated != nil {
validatedBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(validatedBytes, *validated)
cache.Put([]byte{byte(storage.DataMPTAux), prefixValidated}, validatedBytes)
s.validatedHeight.Store(*validated)
} else {
cache.Delete([]byte{byte(storage.DataMPTAux), prefixValidated})
}
s.currentLocal.Store(sr.Root)
s.localHeight.Store(sr.Index)
s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store)
// Do not reset MPT nodes, leave the trie state itself as is.
return nil
}
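
For reference on the offsets used above: a serialized stateroot record starts with a one-byte version, a 4-byte index and a 32-byte root hash, so `witnessesLenOffset` (1 + 4 + `smartcontract.Hash256Len`) points at the witness count byte; a non-zero byte there means a witness is attached, i.e. that root has been validated.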
// GC performs garbage collection.
func (s *Module) GC(index uint32, store storage.Store) time.Duration {
if !s.mode.GC() {


@ -32,8 +32,13 @@ const (
SYSCurrentHeader KeyPrefix = 0xc1
SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2
SYSStateSyncPoint KeyPrefix = 0xc3
// SYSStateChangeStage is used to store the phase of a state changing process,
// which is either a state jump or a state reset. Its value is one byte: the
// first seven bits carry the state reset / state jump stage, and the last bit
// is the state reset process marker (set to 1 on an unfinished state reset
// and to 0 on an unfinished state jump).
SYSStateChangeStage KeyPrefix = 0xc4
SYSVersion KeyPrefix = 0xf0
)

// Executable subtypes.