diff --git a/internal/basicchain/basic.go b/internal/basicchain/basic.go index fc18d1153..5dbc6844f 100644 --- a/internal/basicchain/basic.go +++ b/internal/basicchain/basic.go @@ -23,6 +23,23 @@ import ( const neoAmount = 99999000 +// Various contract IDs that were deployed to basic chain. +const ( + RublesContractID = int32(1) + VerifyContractID = int32(2) + VerifyWithArgsContractID = int32(3) + NNSContractID = int32(4) + NFSOContractID = int32(5) + StorageContractID = int32(6) +) + +const ( + // RublesOldTestvalue is a value initially stored by `testkey` key inside Rubles contract. + RublesOldTestvalue = "testvalue" + // RublesNewTestvalue is an updated value of Rubles' storage item with `testkey` key. + RublesNewTestvalue = "newtestvalue" +) + // InitSimple initializes chain with simple contracts from 'examples' folder. // It's not as complicated as chain got after Init and may be used for tests where // chain with a small amount of data is needed and for historical functionality testing. @@ -138,13 +155,13 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) { // Block #2: deploy test_contract (Rubles contract). cfgPath := filepath.Join(testDataPrefix, "test_contract.yml") - block2H, txDeployH, cHash := deployContractFromPriv0(t, filepath.Join(testDataPrefix, "test_contract.go"), "Rubl", cfgPath, 1) + block2H, txDeployH, cHash := deployContractFromPriv0(t, filepath.Join(testDataPrefix, "test_contract.go"), "Rubl", cfgPath, RublesContractID) t.Logf("txDeploy: %s", txDeployH.StringLE()) t.Logf("Block2 hash: %s", block2H.StringLE()) // Block #3: invoke `putValue` method on the test_contract. rublPriv0Invoker := e.NewInvoker(cHash, acc0) - txInvH := rublPriv0Invoker.Invoke(t, true, "putValue", "testkey", "testvalue") + txInvH := rublPriv0Invoker.Invoke(t, true, "putValue", "testkey", RublesOldTestvalue) t.Logf("txInv: %s", txInvH.StringLE()) // Block #4: transfer 1000 NEO from priv0 to priv1. @@ -166,7 +183,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) { // Block #7: push verification contract into the chain. verifyPath := filepath.Join(testDataPrefix, "verify", "verification_contract.go") verifyCfg := filepath.Join(testDataPrefix, "verify", "verification_contract.yml") - _, _, _ = deployContractFromPriv0(t, verifyPath, "Verify", verifyCfg, 2) + _, _, _ = deployContractFromPriv0(t, verifyPath, "Verify", verifyCfg, VerifyContractID) // Block #8: deposit some GAS to notary contract for priv0. transferTxH = gasPriv0Invoker.Invoke(t, true, "transfer", priv0ScriptHash, notaryHash, 10_0000_0000, []interface{}{priv0ScriptHash, int64(e.Chain.BlockHeight() + 1000)}) @@ -183,12 +200,12 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) { // Block #10: push verification contract with arguments into the chain. verifyPath = filepath.Join(testDataPrefix, "verify_args", "verification_with_args_contract.go") verifyCfg = filepath.Join(testDataPrefix, "verify_args", "verification_with_args_contract.yml") - _, _, _ = deployContractFromPriv0(t, verifyPath, "VerifyWithArgs", verifyCfg, 3) // block #10 + _, _, _ = deployContractFromPriv0(t, verifyPath, "VerifyWithArgs", verifyCfg, VerifyWithArgsContractID) // block #10 // Block #11: push NameService contract into the chain. nsPath := filepath.Join(examplesPrefix, "nft-nd-nns") nsConfigPath := filepath.Join(nsPath, "nns.yml") - _, _, nsHash := deployContractFromPriv0(t, nsPath, nsPath, nsConfigPath, 4) // block #11 + _, _, nsHash := deployContractFromPriv0(t, nsPath, nsPath, nsConfigPath, NNSContractID) // block #11 nsCommitteeInvoker := e.CommitteeInvoker(nsHash) nsPriv0Invoker := e.NewInvoker(nsHash, acc0) @@ -212,7 +229,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) { nsPriv0Invoker.Invoke(t, stackitem.Null{}, "setRecord", "neo.com", int64(nns.A), "1.2.3.4") // block #15 // Block #16: invoke `test_contract.go`: put new value with the same key to check `getstate` RPC call - txPutNewValue := rublPriv0Invoker.PrepareInvoke(t, "putValue", "testkey", "newtestvalue") // tx1 + txPutNewValue := rublPriv0Invoker.PrepareInvoke(t, "putValue", "testkey", RublesNewTestvalue) // tx1 // Invoke `test_contract.go`: put values to check `findstates` RPC call. txPut1 := rublPriv0Invoker.PrepareInvoke(t, "putValue", "aa", "v1") // tx2 txPut2 := rublPriv0Invoker.PrepareInvoke(t, "putValue", "aa10", "v2") // tx3 @@ -226,7 +243,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) { // Block #17: deploy NeoFS Object contract (NEP11-Divisible). nfsPath := filepath.Join(examplesPrefix, "nft-d") nfsConfigPath := filepath.Join(nfsPath, "nft.yml") - _, _, nfsHash := deployContractFromPriv0(t, nfsPath, nfsPath, nfsConfigPath, 5) // block #17 + _, _, nfsHash := deployContractFromPriv0(t, nfsPath, nfsPath, nfsConfigPath, NFSOContractID) // block #17 nfsPriv0Invoker := e.NewInvoker(nfsHash, acc0) nfsPriv1Invoker := e.NewInvoker(nfsHash, acc1) @@ -254,7 +271,7 @@ func Init(t *testing.T, rootpath string, e *neotest.Executor) { // Block #22: deploy storage_contract (Storage contract for `traverseiterator` and `terminatesession` RPC calls test). storagePath := filepath.Join(testDataPrefix, "storage", "storage_contract.go") storageCfg := filepath.Join(testDataPrefix, "storage", "storage_contract.yml") - _, _, _ = deployContractFromPriv0(t, storagePath, "Storage", storageCfg, 6) + _, _, _ = deployContractFromPriv0(t, storagePath, "Storage", storageCfg, StorageContractID) // Compile contract to test `invokescript` RPC call invokePath := filepath.Join(testDataPrefix, "invoke", "invokescript_contract.go") diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index d913847b2..ebf2d22f2 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -36,6 +36,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" @@ -61,21 +62,30 @@ const ( defaultStateSyncInterval = 40000 ) -// stateJumpStage denotes the stage of state jump process. -type stateJumpStage byte +// stateChangeStage denotes the stage of state modification process. +type stateChangeStage byte +// A set of stages used to split state jump / state reset into atomic operations. const ( - // none means that no state jump process was initiated yet. - none stateJumpStage = 1 << iota + // none means that no state jump or state reset process was initiated yet. + none stateChangeStage = 1 << iota // stateJumpStarted means that state jump was just initiated, but outdated storage items // were not yet removed. stateJumpStarted // newStorageItemsAdded means that contract storage items are up-to-date with the current // state. newStorageItemsAdded - // genesisStateRemoved means that state corresponding to the genesis block was removed - // from the storage. - genesisStateRemoved + // staleBlocksRemoved means that state corresponding to the stale blocks (genesis block in + // in case of state jump) was removed from the storage. + staleBlocksRemoved + // headersReset denotes stale SYS-prefixed and IX-prefixed information was removed from + // the storage (applicable to state reset only). + headersReset + // transfersReset denotes NEP transfers were successfully updated (applicable to state reset only). + transfersReset + // stateResetBit represents a bit identifier for state reset process. If this bit is not set, then + // it's an unfinished state jump. + stateResetBit byte = 1 << 7 ) var ( @@ -451,22 +461,25 @@ func (bc *Blockchain) init() error { } } - // Check whether StateJump stage is in the storage and continue interrupted state jump if so. - jumpStage, err := bc.dao.Store.Get([]byte{byte(storage.SYSStateJumpStage)}) + // Check whether StateChangeState stage is in the storage and continue interrupted state jump / state reset if so. + stateChStage, err := bc.dao.Store.Get([]byte{byte(storage.SYSStateChangeStage)}) if err == nil { - if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) { - return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " + - "To start an archival node drop the database manually and restart the node") - } - if len(jumpStage) != 1 { + if len(stateChStage) != 1 { return fmt.Errorf("invalid state jump stage format") } - // State jump wasn't finished yet, thus continue it. + // State jump / state reset wasn't finished yet, thus continue it. stateSyncPoint, err := bc.dao.GetStateSyncPoint() if err != nil { return fmt.Errorf("failed to get state sync point from the storage") } - return bc.jumpToStateInternal(stateSyncPoint, stateJumpStage(jumpStage[0])) + if (stateChStage[0] & stateResetBit) != 0 { + return bc.resetStateInternal(stateSyncPoint, stateChangeStage(stateChStage[0]&(^stateResetBit))) + } + if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) { + return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " + + "To start an archival node drop the database manually and restart the node") + } + return bc.jumpToStateInternal(stateSyncPoint, stateChangeStage(stateChStage[0])) } bHeight, err := bc.dao.GetCurrentBlockHeight() @@ -537,14 +550,14 @@ func (bc *Blockchain) jumpToState(p uint32) error { // changes Blockchain state to the one specified by state sync point p and state // jump stage. All the data needed for the jump must be in the DB, otherwise an // error is returned. It is not protected by mutex. -func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error { +func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) error { if p+1 >= uint32(len(bc.headerHashes)) { return fmt.Errorf("invalid state sync point %d: headerHeignt is %d", p, len(bc.headerHashes)) } bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p)) - jumpStageKey := []byte{byte(storage.SYSStateJumpStage)} + jumpStageKey := []byte{byte(storage.SYSStateChangeStage)} switch stage { case none: bc.dao.Store.Put(jumpStageKey, []byte{byte(stateJumpStarted)}) @@ -586,28 +599,24 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error }) } } - cache.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}) - _, err := cache.Persist() + // Update SYS-prefixed info. + block, err := bc.dao.GetBlock(bc.headerHashes[p]) + if err != nil { + return fmt.Errorf("failed to get current block: %w", err) + } + cache.StoreAsCurrentBlock(block) + cache.Store.Put(jumpStageKey, []byte{byte(staleBlocksRemoved)}) + _, err = cache.Persist() if err != nil { return fmt.Errorf("failed to persist old items removal: %w", err) } - case genesisStateRemoved: + case staleBlocksRemoved: // there's nothing to do after that, so just continue with common operations // and remove state jump stage in the end. default: - return errors.New("unknown state jump stage") + return fmt.Errorf("unknown state jump stage: %d", stage) } - - block, err := bc.dao.GetBlock(bc.headerHashes[p]) - if err != nil { - return fmt.Errorf("failed to get current block: %w", err) - } - bc.dao.StoreAsCurrentBlock(block) - bc.topBlock.Store(block) - atomic.StoreUint32(&bc.blockHeight, p) - atomic.StoreUint32(&bc.persistedHeight, p) - - block, err = bc.dao.GetBlock(bc.headerHashes[p+1]) + block, err := bc.dao.GetBlock(bc.headerHashes[p+1]) if err != nil { return fmt.Errorf("failed to get block to init MPT: %w", err) } @@ -616,18 +625,246 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error Root: block.PrevStateRoot, }) + bc.dao.Store.Delete(jumpStageKey) + + err = bc.resetRAMState(p, false) + if err != nil { + return fmt.Errorf("failed to update in-memory blockchain data: %w", err) + } + return nil +} + +// resetRAMState resets in-memory cached info. +func (bc *Blockchain) resetRAMState(height uint32, resetHeaders bool) error { + if resetHeaders { + bc.headerHashes = bc.headerHashes[:height+1] + bc.storedHeaderCount = height + 1 + } + block, err := bc.dao.GetBlock(bc.headerHashes[height]) + if err != nil { + return fmt.Errorf("failed to get current block: %w", err) + } + bc.topBlock.Store(block) + atomic.StoreUint32(&bc.blockHeight, height) + atomic.StoreUint32(&bc.persistedHeight, height) + err = bc.initializeNativeCache(block.Index, bc.dao) if err != nil { return fmt.Errorf("failed to initialize natives cache: %w", err) } - if err := bc.updateExtensibleWhitelist(p); err != nil { + if err := bc.updateExtensibleWhitelist(height); err != nil { return fmt.Errorf("failed to update extensible whitelist: %w", err) } - updateBlockHeightMetric(p) + updateBlockHeightMetric(height) + return nil +} - bc.dao.Store.Delete(jumpStageKey) +// Reset resets chain state to the specified height if possible. This method +// performs direct DB changes and can be called on non-running Blockchain only. +func (bc *Blockchain) Reset(height uint32) error { + if bc.isRunning.Load().(bool) { + return errors.New("can't reset state of the running blockchain") + } + bc.dao.PutStateSyncPoint(height) + return bc.resetStateInternal(height, none) +} + +func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error { + currHeight := bc.BlockHeight() + if height > currHeight { + return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height) + } + if height == currHeight && stage == none { + bc.log.Info("chain is already at the proper state", zap.Uint32("height", height)) + return nil + } + if bc.config.KeepOnlyLatestState { + return fmt.Errorf("KeepOnlyLatestState is enabled, state for height %d is outdated and removed from the storage", height) + } + if bc.config.RemoveUntraceableBlocks && currHeight >= bc.config.MaxTraceableBlocks { + return fmt.Errorf("RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed") + } + + // Retrieve necessary state before the DB modification. + hHeight := bc.HeaderHeight() + b, err := bc.GetBlock(bc.headerHashes[height]) + if err != nil { + return fmt.Errorf("failed to retrieve block %d: %w", height, err) + } + sr, err := bc.stateRoot.GetStateRoot(height) + if err != nil { + return fmt.Errorf("failed to retrieve stateroot for height %d: %w", height, err) + } + v := bc.dao.Version + cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB + + bc.log.Info("initialize state reset", zap.Uint32("target height", height)) + start := time.Now() + p := start + + resetStageKey := []byte{byte(storage.SYSStateChangeStage)} + switch stage { + case none: + cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)}) + _, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err) + } + fallthrough + case stateJumpStarted: + // Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself). + for i := height + 1; i <= hHeight; i++ { + err := cache.PurgeBlock(bc.headerHashes[i]) + if err != nil { + return fmt.Errorf("error while removing block %d: %w", i, err) + } + } + cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) + _, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err) + } + + bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("duration", time.Since(p))) + p = time.Now() + fallthrough + case staleBlocksRemoved: + // Completely remove contract IDs to update them later. + cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool { + cache.Store.Delete(k) + return true + }) + + // Reset contracts storage and store new contract IDs. + var mode = mpt.ModeAll + if bc.config.RemoveUntraceableBlocks { + mode |= mpt.ModeGCFlag + } + trieStore := mpt.NewTrieStore(sr.Root, mode, cache.Store) + oldStoragePrefix := v.StoragePrefix + newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix) + mgmtCSPrefixLen := 1 + 4 + 1 // STStorage + Management ID + contract state prefix + mgmtContractPrefix := make([]byte, mgmtCSPrefixLen-1) + id := int32(native.ManagementContractID) + binary.BigEndian.PutUint32(mgmtContractPrefix, uint32(id)) + mgmtContractPrefix[4] = native.PrefixContract + cs := new(state.Contract) + + const persistBatchSize = 10000 + var ( + seekErr error + cnt int + ) + trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { + if cnt >= persistBatchSize { + _, seekErr = cache.Persist() + if seekErr != nil { + return false + } + } + // May safely omit KV copying. + k[0] = byte(newStoragePrefix) + cache.Store.Put(k, v) + + // @fixme: remove this part after #2702. + if bytes.HasPrefix(k[1:], mgmtContractPrefix) { + var hash util.Uint160 + copy(hash[:], k[mgmtCSPrefixLen:]) + err = stackitem.DeserializeConvertible(v, cs) + if err != nil { + seekErr = fmt.Errorf("failed to deserialize contract state: %w", err) + } + cache.PutContractID(cs.ID, hash) + } + cnt++ + return seekErr == nil + }) + if seekErr != nil { + return fmt.Errorf("failed to reset contract IDs: %w", err) + } + trieStore.Close() + + cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) + _, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) + } + + bc.log.Info("contracts storage and IDs are reset", zap.Duration("duration", time.Since(p))) + p = time.Now() + fallthrough + case newStorageItemsAdded: + // Reset SYS-prefixed and IX-prefixed information. + cache.DeleteHeaderHashes(height+1, headerBatchCount) + cache.StoreAsCurrentBlock(b) + cache.PutCurrentHeader(b.Hash(), height) + v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix) + cache.PutVersion(v) + bc.persistent.Version = v + + cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) + _, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist headers changes to the DB: %w", err) + } + + bc.log.Info("headers are reset", zap.Duration("duration", time.Since(p))) + p = time.Now() + fallthrough + case headersReset: + // Reset MPT. + err = bc.stateRoot.ResetState(height, cache.Store) + if err != nil { + return fmt.Errorf("failed to rollback MPT state: %w", err) + } + + // Reset transfers. + err = bc.resetTransfers(cache, height) + if err != nil { + return fmt.Errorf("failed to strip transfer log / transfer info: %w", err) + } + + cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) + _, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err) + } + + bc.log.Info("MPT and transfers are reset", zap.Duration("duration", time.Since(p))) + fallthrough + case transfersReset: + // there's nothing to do after that, so just continue with common operations + // and remove state reset stage in the end. + default: + return fmt.Errorf("unknown state reset stage: %d", stage) + } + + // Direct (cache-less) DB operation: remove stale storage items. + err = bc.store.SeekGC(storage.SeekRange{ + Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, + }, func(_, _ []byte) bool { + return false + }) + if err != nil { + return fmt.Errorf("faield to remove stale storage items from DB: %w", err) + } + + cache.Store.Delete(resetStageKey) + // Unlike the state jump, state sync point must be removed as we have complete state for this height. + cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) + _, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) + } + + err = bc.resetRAMState(height, true) + if err != nil { + return fmt.Errorf("failed to update in-memory blockchain data: %w", err) + } + + bc.log.Info("reset finished successfully", zap.Duration("duration", time.Since(start))) return nil } @@ -734,6 +971,143 @@ func (bc *Blockchain) tryRunGC(oldHeight uint32) time.Duration { return dur } +// resetTransfers is a helper function that strips the top newest NEP17 and NEP11 transfer logs +// down to the given height (not including the height itself) and updates corresponding token +// transfer info. +func (bc *Blockchain) resetTransfers(cache *dao.Simple, height uint32) error { + // Completely remove transfer info, updating it takes too much effort. We'll gather new + // transfer info on-the-fly later. + cache.Store.Seek(storage.SeekRange{ + Prefix: []byte{byte(storage.STTokenTransferInfo)}, + }, func(k, v []byte) bool { + cache.Store.Delete(k) + return true + }) + + // Look inside each transfer batch and iterate over the batch transfers, picking those that + // not newer than the given height. Also, for each suitable transfer update transfer info + // flushing changes after complete account's transfers processing. + prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)} + for i := range prefixes { + var ( + acc util.Uint160 + trInfo *state.TokenTransferInfo + removeFollowing bool + seekErr error + ) + + cache.Store.Seek(storage.SeekRange{ + Prefix: prefixes[i : i+1], + Backwards: false, // From oldest to newest batch. + }, func(k, v []byte) bool { + var batchAcc util.Uint160 + copy(batchAcc[:], k[1:]) + + if batchAcc != acc { // Some new account we're iterating over. + if trInfo != nil { + seekErr = cache.PutTokenTransferInfo(acc, trInfo) + if seekErr != nil { + return false + } + } + acc = batchAcc + trInfo = nil + removeFollowing = false + } else if removeFollowing { + cache.Store.Delete(slice.Copy(k)) + return seekErr == nil + } + + r := io.NewBinReaderFromBuf(v[1:]) + l := len(v) + bytesRead := 1 // 1 is for batch size byte which is read by default. + var ( + oldBatchSize = v[0] + newBatchSize byte + ) + for i := byte(0); i < v[0]; i++ { // From oldest to newest transfer of the batch. + var t *state.NEP17Transfer + if k[0] == byte(storage.STNEP11Transfers) { + tr := new(state.NEP11Transfer) + tr.DecodeBinary(r) + t = &tr.NEP17Transfer + } else { + t = new(state.NEP17Transfer) + t.DecodeBinary(r) + } + if r.Err != nil { + seekErr = fmt.Errorf("failed to decode subsequent transfer: %w", r.Err) + break + } + + if t.Block > height { + break + } + bytesRead = l - r.Len() // Including batch size byte. + newBatchSize++ + if trInfo == nil { + var err error + trInfo, err = cache.GetTokenTransferInfo(batchAcc) + if err != nil { + seekErr = fmt.Errorf("failed to retrieve token transfer info for %s: %w", batchAcc.StringLE(), r.Err) + return false + } + } + appendTokenTransferInfo(trInfo, t.Asset, t.Block, t.Timestamp, k[0] == byte(storage.STNEP11Transfers), newBatchSize >= state.TokenTransferBatchSize) + } + if newBatchSize == oldBatchSize { + // The batch is already in storage and doesn't need to be changed. + return seekErr == nil + } + if newBatchSize > 0 { + v[0] = newBatchSize + cache.Store.Put(k, v[:bytesRead]) + } else { + cache.Store.Delete(k) + removeFollowing = true + } + return seekErr == nil + }) + if seekErr != nil { + return seekErr + } + if trInfo != nil { + // Flush the last batch of transfer info changes. + err := cache.PutTokenTransferInfo(acc, trInfo) + if err != nil { + return err + } + } + } + return nil +} + +// appendTokenTransferInfo is a helper for resetTransfers that updates token transfer info +// wrt the given transfer that was added to the subsequent transfer batch. +func appendTokenTransferInfo(transferData *state.TokenTransferInfo, + token int32, bIndex uint32, bTimestamp uint64, isNEP11 bool, lastTransferInBatch bool) { + var ( + newBatch *bool + nextBatch *uint32 + currTimestamp *uint64 + ) + if !isNEP11 { + newBatch = &transferData.NewNEP17Batch + nextBatch = &transferData.NextNEP17Batch + currTimestamp = &transferData.NextNEP17NewestTimestamp + } else { + newBatch = &transferData.NewNEP11Batch + nextBatch = &transferData.NextNEP11Batch + currTimestamp = &transferData.NextNEP11NewestTimestamp + } + transferData.LastUpdated[token] = bIndex + *newBatch = lastTransferInBatch + if *newBatch { + *nextBatch++ + *currTimestamp = bTimestamp + } +} + func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index)) start := time.Now() diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go index c7f652a63..500b526f8 100644 --- a/pkg/core/blockchain_core_test.go +++ b/pkg/core/blockchain_core_test.go @@ -203,14 +203,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { c.ProtocolConfiguration.KeepOnlyLatestState = true } // manually store statejump stage to check statejump recover process - bPrefix[0] = byte(storage.SYSStateJumpStage) - t.Run("invalid RemoveUntraceableBlocks setting", func(t *testing.T) { - bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)}) - checkNewBlockchainErr(t, func(c *config.Config) { - boltCfg(c) - c.ProtocolConfiguration.RemoveUntraceableBlocks = false - }, bcSpout.dao.Store, "state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on") - }) + bPrefix[0] = byte(storage.SYSStateChangeStage) t.Run("invalid state jump stage format", func(t *testing.T) { bcSpout.dao.Store.Put(bPrefix, []byte{0x01, 0x02}) checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "invalid state jump stage format") @@ -219,6 +212,16 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)}) checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "failed to get state sync point from the storage") }) + t.Run("invalid RemoveUntraceableBlocks setting", func(t *testing.T) { + bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)}) + point := make([]byte, 4) + binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint)) + bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point) + checkNewBlockchainErr(t, func(c *config.Config) { + boltCfg(c) + c.ProtocolConfiguration.RemoveUntraceableBlocks = false + }, bcSpout.dao.Store, "state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on") + }) t.Run("invalid state sync point", func(t *testing.T) { bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)}) point := make([]byte, 4) @@ -226,7 +229,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point) checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "invalid state sync point") }) - for _, stage := range []stateJumpStage{stateJumpStarted, newStorageItemsAdded, genesisStateRemoved, 0x03} { + for _, stage := range []stateChangeStage{stateJumpStarted, newStorageItemsAdded, staleBlocksRemoved, 0x03} { t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) { bcSpout.dao.Store.Put(bPrefix, []byte{byte(stage)}) point := make([]byte, 4) diff --git a/pkg/core/blockchain_neotest_test.go b/pkg/core/blockchain_neotest_test.go index 043939c5a..7ab6c53a2 100644 --- a/pkg/core/blockchain_neotest_test.go +++ b/pkg/core/blockchain_neotest_test.go @@ -1897,3 +1897,220 @@ func TestBlockchain_Bug1728(t *testing.T) { c := neotest.CompileSource(t, acc.ScriptHash(), strings.NewReader(src), &compiler.Options{Name: "TestContract"}) managementInvoker.DeployContract(t, c, nil) } + +func TestBlockchain_ResetStateErrors(t *testing.T) { + chainHeight := 3 + checkResetErr := func(t *testing.T, cfg func(c *config.ProtocolConfiguration), h uint32, errText string) { + db, path := newLevelDBForTestingWithPath(t, t.TempDir()) + bc, validators, committee := chain.NewMultiWithCustomConfigAndStore(t, cfg, db, false) + e := neotest.NewExecutor(t, bc, validators, committee) + go bc.Run() + for i := 0; i < chainHeight; i++ { + e.AddNewBlock(t) // get some height + } + bc.Close() + + db, _ = newLevelDBForTestingWithPath(t, path) + defer db.Close() + bc, _, _ = chain.NewMultiWithCustomConfigAndStore(t, cfg, db, false) + err := bc.Reset(h) + if errText != "" { + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), errText), err) + } else { + require.NoError(t, err) + } + } + t.Run("large height", func(t *testing.T) { + checkResetErr(t, nil, uint32(chainHeight+1), "can't reset state to height 4") + }) + t.Run("already at height", func(t *testing.T) { + checkResetErr(t, nil, uint32(chainHeight), "") + }) + t.Run("KeepOnlyLatestState is enabled", func(t *testing.T) { + checkResetErr(t, func(c *config.ProtocolConfiguration) { + c.KeepOnlyLatestState = true + }, uint32(chainHeight-1), "KeepOnlyLatestState is enabled") + }) + t.Run("some blocks where removed", func(t *testing.T) { + checkResetErr(t, func(c *config.ProtocolConfiguration) { + c.RemoveUntraceableBlocks = true + c.MaxTraceableBlocks = 2 + }, uint32(chainHeight-3), "RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed") + }) +} + +// TestBlockchain_ResetState is based on knowledge about basic chain transactions, +// it performs basic chain reset and checks that reset chain has proper state. +func TestBlockchain_ResetState(t *testing.T) { + // Create the DB. + db, path := newLevelDBForTestingWithPath(t, t.TempDir()) + bc, validators, committee := chain.NewMultiWithCustomConfigAndStore(t, func(cfg *config.ProtocolConfiguration) { + cfg.P2PSigExtensions = true + }, db, false) + go bc.Run() + e := neotest.NewExecutor(t, bc, validators, committee) + basicchain.Init(t, "../../", e) + + // Gather some reference information. + resetBlockIndex := uint32(15) + staleID := basicchain.NFSOContractID // NEP11 + rublesH := e.ContractHash(t, basicchain.RublesContractID) + nnsH := e.ContractHash(t, basicchain.NNSContractID) + staleH := e.ContractHash(t, staleID) + gasH := e.NativeHash(t, nativenames.Gas) + neoH := e.NativeHash(t, nativenames.Neo) + gasID := e.NativeID(t, nativenames.Gas) + neoID := e.NativeID(t, nativenames.Neo) + resetBlockHash := bc.GetHeaderHash(int(resetBlockIndex)) + resetBlockHeader, err := bc.GetHeader(resetBlockHash) + require.NoError(t, err) + topBlockHeight := bc.BlockHeight() + topBH := bc.GetHeaderHash(int(bc.BlockHeight())) + staleBH := bc.GetHeaderHash(int(resetBlockIndex + 1)) + staleB, err := bc.GetBlock(staleBH) + require.NoError(t, err) + staleTx := staleB.Transactions[0] + _, err = bc.GetAppExecResults(staleTx.Hash(), trigger.Application) + require.NoError(t, err) + sr, err := bc.GetStateModule().GetStateRoot(resetBlockIndex) + require.NoError(t, err) + staleSR, err := bc.GetStateModule().GetStateRoot(resetBlockIndex + 1) + require.NoError(t, err) + rublesKey := []byte("testkey") + rublesStaleKey := []byte("aa") + rublesStaleValue := bc.GetStorageItem(basicchain.RublesContractID, rublesKey) // check value is there + require.Equal(t, []byte(basicchain.RublesNewTestvalue), []byte(rublesStaleValue)) + acc0 := e.Validator.(neotest.MultiSigner).Single(2) // priv0 index->order and order->index conversion + priv0ScriptHash := acc0.ScriptHash() + var ( + expectedNEP11t []*state.NEP11Transfer + expectedNEP17t []*state.NEP17Transfer + ) + require.NoError(t, bc.ForEachNEP11Transfer(priv0ScriptHash, resetBlockHeader.Timestamp, func(t *state.NEP11Transfer) (bool, error) { + if t.Block <= resetBlockIndex { + expectedNEP11t = append(expectedNEP11t, t) + } + return true, nil + })) + require.NoError(t, bc.ForEachNEP17Transfer(priv0ScriptHash, resetBlockHeader.Timestamp, func(t *state.NEP17Transfer) (bool, error) { + if t.Block <= resetBlockIndex { + expectedNEP17t = append(expectedNEP17t, t) + } + return true, nil + })) + + // checkProof checks that some stale proof is reachable + checkProof := func() { + rublesStaleFullKey := make([]byte, 4) + binary.LittleEndian.PutUint32(rublesStaleFullKey, uint32(basicchain.RublesContractID)) + rublesStaleFullKey = append(rublesStaleFullKey, rublesStaleKey...) + proof, err := bc.GetStateModule().GetStateProof(staleSR.Root, rublesStaleFullKey) + require.NoError(t, err) + require.NotEmpty(t, proof) + } + checkProof() + + // Ensure all changes were persisted. + bc.Close() + + // Start new chain with existing DB, but do not run it. + db, _ = newLevelDBForTestingWithPath(t, path) + bc, _, _ = chain.NewMultiWithCustomConfigAndStore(t, func(cfg *config.ProtocolConfiguration) { + cfg.P2PSigExtensions = true + }, db, false) + defer db.Close() + require.Equal(t, topBlockHeight, bc.BlockHeight()) // ensure DB was properly initialized. + + // Reset state. + require.NoError(t, bc.Reset(resetBlockIndex)) + + // Check that state was properly reset. + require.Equal(t, resetBlockIndex, bc.BlockHeight()) + require.Equal(t, resetBlockIndex, bc.HeaderHeight()) + require.Equal(t, resetBlockHash, bc.CurrentHeaderHash()) + require.Equal(t, resetBlockHash, bc.CurrentBlockHash()) + require.Equal(t, resetBlockIndex, bc.GetStateModule().CurrentLocalHeight()) + require.Equal(t, sr.Root, bc.GetStateModule().CurrentLocalStateRoot()) + require.Equal(t, uint32(0), bc.GetStateModule().CurrentValidatedHeight()) + + // Try to get the latest block\header. + bh := bc.GetHeaderHash(int(resetBlockIndex)) + require.Equal(t, resetBlockHash, bh) + h, err := bc.GetHeader(bh) + require.NoError(t, err) + require.Equal(t, resetBlockHeader, h) + actualRublesHash, err := bc.GetContractScriptHash(basicchain.RublesContractID) + require.NoError(t, err) + require.Equal(t, rublesH, actualRublesHash) + + // Check that stale blocks/headers/txs/aers/sr are not reachable. + for i := resetBlockIndex + 1; i <= topBlockHeight; i++ { + hHash := bc.GetHeaderHash(int(i)) + require.Equal(t, util.Uint256{}, hHash) + _, err = bc.GetStateRoot(i) + require.Error(t, err) + } + for _, h := range []util.Uint256{staleBH, topBH} { + _, err = bc.GetHeader(h) + require.Error(t, err) + _, err = bc.GetHeader(h) + require.Error(t, err) + } + _, _, err = bc.GetTransaction(staleTx.Hash()) + require.Error(t, err) + _, err = bc.GetAppExecResults(staleTx.Hash(), trigger.Application) + require.Error(t, err) + + // However, proofs and everything related to stale MPT nodes still should work properly, + // because we don't remove stale MPT nodes. + checkProof() + + // Check NEP-compatible contracts. + nep11 := bc.GetNEP11Contracts() + require.Equal(t, 1, len(nep11)) // NNS + require.Equal(t, nnsH, nep11[0]) + nep17 := bc.GetNEP17Contracts() + require.Equal(t, 3, len(nep17)) // Neo, Gas, Rubles + require.ElementsMatch(t, []util.Uint160{gasH, neoH, rublesH}, nep17) + + // Retrieve stale contract. + cs := bc.GetContractState(staleH) + require.Nil(t, cs) + + // Retrieve stale storage item. + rublesValue := bc.GetStorageItem(basicchain.RublesContractID, rublesKey) + require.Equal(t, []byte(basicchain.RublesOldTestvalue), []byte(rublesValue)) // the one with historic state + require.Nil(t, bc.GetStorageItem(basicchain.RublesContractID, rublesStaleKey)) // the one that was added after target reset block + db.Seek(storage.SeekRange{ + Prefix: []byte{byte(storage.STStorage)}, // no items with old prefix + }, func(k, v []byte) bool { + t.Fatal("no stale items must be left in storage") + return false + }) + + // Check transfers. + var ( + actualNEP11t []*state.NEP11Transfer + actualNEP17t []*state.NEP17Transfer + ) + require.NoError(t, bc.ForEachNEP11Transfer(priv0ScriptHash, e.TopBlock(t).Timestamp, func(t *state.NEP11Transfer) (bool, error) { + actualNEP11t = append(actualNEP11t, t) + return true, nil + })) + require.NoError(t, bc.ForEachNEP17Transfer(priv0ScriptHash, e.TopBlock(t).Timestamp, func(t *state.NEP17Transfer) (bool, error) { + actualNEP17t = append(actualNEP17t, t) + return true, nil + })) + assert.Equal(t, expectedNEP11t, actualNEP11t) + assert.Equal(t, expectedNEP17t, actualNEP17t) + lub, err := bc.GetTokenLastUpdated(priv0ScriptHash) + require.NoError(t, err) + expectedLUB := map[int32]uint32{ // this information is extracted from basic chain initialization code + basicchain.NNSContractID: resetBlockIndex - 1, // `neo.com` registration + basicchain.RublesContractID: 6, // transfer of 123 RUR to priv1 + gasID: resetBlockIndex, // fee for `1.2.3.4` A record registration + neoID: 4, // transfer of 1000 NEO to priv1 + } + require.Equal(t, expectedLUB, lub) +} diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index cd8266bef..14f4df1d6 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -603,6 +603,26 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) { return hashes, seekErr } +// DeleteHeaderHashes removes batches of header hashes starting from the one that +// contains header with index `since` up to the most recent batch. It assumes that +// all stored batches contain `batchSize` hashes. +func (dao *Simple) DeleteHeaderHashes(since uint32, batchSize int) { + dao.Store.Seek(storage.SeekRange{ + Prefix: dao.mkKeyPrefix(storage.IXHeaderHashList), + Backwards: true, + }, func(k, _ []byte) bool { + first := binary.BigEndian.Uint32(k[1:]) + if first >= since { + dao.Store.Delete(k) + return first != since + } + if first+uint32(batchSize)-1 >= since { + dao.Store.Delete(k) + } + return false + }) +} + // GetTransaction returns Transaction and its height by the given hash // if it exists in the store. It does not return dummy transactions. func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { @@ -739,6 +759,17 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a // DeleteBlock removes the block from dao. It's not atomic, so make sure you're // using private MemCached instance here. func (dao *Simple) DeleteBlock(h util.Uint256) error { + return dao.deleteBlock(h, true) +} + +// PurgeBlock completely removes specified block (or just block header) from dao. +// It differs from DeleteBlock in that it removes header anyway. It's not atomic, +// so make sure you're using private MemCached instance here. +func (dao *Simple) PurgeBlock(h util.Uint256) error { + return dao.deleteBlock(h, false) +} + +func (dao *Simple) deleteBlock(h util.Uint256, keepHeader bool) error { key := dao.makeExecutableKey(h) b, err := dao.getBlock(key) @@ -746,9 +777,13 @@ func (dao *Simple) DeleteBlock(h util.Uint256) error { return err } - err = dao.storeHeader(key, &b.Header) - if err != nil { - return err + if keepHeader { + err = dao.storeHeader(key, &b.Header) + if err != nil { + return err + } + } else { + dao.Store.Delete(key) } for _, tx := range b.Transactions { diff --git a/pkg/core/native/management.go b/pkg/core/native/management.go index c305001d2..444cf07fd 100644 --- a/pkg/core/native/management.go +++ b/pkg/core/native/management.go @@ -43,7 +43,8 @@ type ManagementCache struct { const ( ManagementContractID = -1 - prefixContract = 8 + // PrefixContract is a prefix used to store contract states inside Management native contract. + PrefixContract = 8 defaultMinimumDeploymentFee = 10_00000000 contractDeployNotificationName = "Deploy" @@ -87,7 +88,7 @@ func (c *ManagementCache) Copy() dao.NativeContractCache { // MakeContractKey creates a key from the account script hash. func MakeContractKey(h util.Uint160) []byte { - return makeUint160Key(prefixContract, h) + return makeUint160Key(PrefixContract, h) } // newManagement creates a new Management native contract. @@ -539,7 +540,7 @@ func (m *Management) InitializeCache(d *dao.Simple) error { } var initErr error - d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) bool { + d.Seek(m.ID, storage.SeekRange{Prefix: []byte{PrefixContract}}, func(_, v []byte) bool { var cs = new(state.Contract) initErr = stackitem.DeserializeConvertible(v, cs) if initErr != nil { diff --git a/pkg/core/native/management_test.go b/pkg/core/native/management_test.go index 1f8b92b89..123e67f3a 100644 --- a/pkg/core/native/management_test.go +++ b/pkg/core/native/management_test.go @@ -81,7 +81,7 @@ func TestManagement_Initialize(t *testing.T) { t.Run("invalid contract state", func(t *testing.T) { d := dao.NewSimple(storage.NewMemoryStore(), false, false) mgmt := newManagement() - d.PutStorageItem(mgmt.ID, []byte{prefixContract}, state.StorageItem{0xFF}) + d.PutStorageItem(mgmt.ID, []byte{PrefixContract}, state.StorageItem{0xFF}) require.Error(t, mgmt.InitializeCache(d)) }) } diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 4d34bf5b4..07f58e006 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -16,6 +16,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" @@ -202,6 +203,67 @@ func (s *Module) JumpToState(sr *state.MPTRoot) { s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store) } +// ResetState resets MPT state to the given height. +func (s *Module) ResetState(height uint32, cache *storage.MemCachedStore) error { + // Update local stateroot. + sr, err := s.GetStateRoot(height) + if err != nil { + return fmt.Errorf("failed to retrieve state root for height %d: %w", height, err) + } + s.addLocalStateRoot(cache, sr) + + // Remove all stateroots newer than the given height. + srKey := makeStateRootKey(height) + var srSeen bool + cache.Seek(storage.SeekRange{ + Prefix: srKey[0:1], + Start: srKey[1:5], + Backwards: false, + }, func(k, v []byte) bool { + if len(k) == 5 { + if srSeen { + cache.Delete(k) + } else if bytes.Equal(k, srKey) { + srSeen = true + } + } + return true + }) + + // Retrieve the most recent validated stateroot before the given height. + witnessesLenOffset := 1 /* version */ + 4 /* index */ + smartcontract.Hash256Len /* root */ + var validated *uint32 + cache.Seek(storage.SeekRange{ + Prefix: srKey[0:1], + Start: srKey[1:5], + Backwards: true, + }, func(k, v []byte) bool { + if len(k) == 5 { + if len(v) > witnessesLenOffset && v[witnessesLenOffset] != 0 { + i := binary.BigEndian.Uint32(k[1:]) + validated = &i + return false + } + } + return true + }) + if validated != nil { + validatedBytes := make([]byte, 4) + binary.LittleEndian.PutUint32(validatedBytes, *validated) + cache.Put([]byte{byte(storage.DataMPTAux), prefixValidated}, validatedBytes) + s.validatedHeight.Store(*validated) + } else { + cache.Delete([]byte{byte(storage.DataMPTAux), prefixValidated}) + } + + s.currentLocal.Store(sr.Root) + s.localHeight.Store(sr.Index) + s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store) + + // Do not reset MPT nodes, leave the trie state itself as is. + return nil +} + // GC performs garbage collection. func (s *Module) GC(index uint32, store storage.Store) time.Duration { if !s.mode.GC() { diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 0f2461bcd..71d0b765f 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -32,8 +32,13 @@ const ( SYSCurrentHeader KeyPrefix = 0xc1 SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2 SYSStateSyncPoint KeyPrefix = 0xc3 - SYSStateJumpStage KeyPrefix = 0xc4 - SYSVersion KeyPrefix = 0xf0 + // SYSStateChangeStage is used to store the phase of a state changing process + // which is one of the state jump or state reset. Its value is one byte containing + // state reset / state jump stages bits (first seven bits are reserved for that) + // and the last bit reserved for the state reset process marker (set to 1 on + // unfinished state reset and to 0 on unfinished state jump). + SYSStateChangeStage KeyPrefix = 0xc4 + SYSVersion KeyPrefix = 0xf0 ) // Executable subtypes.