Merge pull request #2813 from nspcc-dev/fix-state-reset

core: fix broken state reset
This commit is contained in:
Roman Khimov 2022-11-23 13:43:42 +07:00 committed by GitHub
commit 66ddeccdad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 124 additions and 57 deletions

View file

@ -672,23 +672,31 @@ func (bc *Blockchain) Reset(height uint32) error {
} }
func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error { func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error {
currHeight := bc.BlockHeight() // Cache isn't yet initialized, so retrieve block height right from DAO.
if height > currHeight { currHeight, err := bc.dao.GetCurrentBlockHeight()
return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height) if err != nil {
return fmt.Errorf("failed to retrieve current block height: %w", err)
} }
if height == currHeight && stage == none { // Headers are already initialized by this moment, thus may use chain's API.
bc.log.Info("chain is already at the proper state", zap.Uint32("height", height)) hHeight := bc.HeaderHeight()
return nil // State reset may already be started by this moment, so perform these checks only if it wasn't.
} if stage == none {
if bc.config.KeepOnlyLatestState { if height > currHeight {
return fmt.Errorf("KeepOnlyLatestState is enabled, state for height %d is outdated and removed from the storage", height) return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height)
} }
if bc.config.RemoveUntraceableBlocks && currHeight >= bc.config.MaxTraceableBlocks { if height == currHeight && hHeight == currHeight {
return fmt.Errorf("RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed") bc.log.Info("chain is at the proper state", zap.Uint32("height", height))
return nil
}
if bc.config.KeepOnlyLatestState {
return fmt.Errorf("KeepOnlyLatestState is enabled, state for height %d is outdated and removed from the storage", height)
}
if bc.config.RemoveUntraceableBlocks && currHeight >= bc.config.MaxTraceableBlocks {
return fmt.Errorf("RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed")
}
} }
// Retrieve necessary state before the DB modification. // Retrieve necessary state before the DB modification.
hHeight := bc.HeaderHeight()
b, err := bc.GetBlock(bc.headerHashes[height]) b, err := bc.GetBlock(bc.headerHashes[height])
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve block %d: %w", height, err) return fmt.Errorf("failed to retrieve block %d: %w", height, err)
@ -703,6 +711,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
bc.log.Info("initialize state reset", zap.Uint32("target height", height)) bc.log.Info("initialize state reset", zap.Uint32("target height", height))
start := time.Now() start := time.Now()
p := start p := start
keys := 0
resetStageKey := []byte{byte(storage.SYSStateChangeStage)} resetStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage { switch stage {
@ -714,28 +723,60 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
} }
fallthrough fallthrough
case stateJumpStarted: case stateJumpStarted:
// Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself). bc.log.Info("trying to reset blocks, transactions and AERs")
for i := height + 1; i <= hHeight; i++ { // Remove blocks/transactions/aers from currHeight down to height (not including height itself).
err := cache.PurgeBlock(bc.headerHashes[i]) // Keep headers for now, they'll be removed later. It's hard to handle the whole set of changes in
// one stage, so persist periodically.
const persistBatchSize = 100 * headerBatchCount // count blocks only, should be enough to avoid OOM killer even for large blocks
var (
pBlocksStart = p
blocksCnt, batchCnt, keysCnt int
)
for i := height + 1; i <= currHeight; i++ {
err := cache.DeleteBlock(bc.GetHeaderHash(int(i)))
if err != nil { if err != nil {
return fmt.Errorf("error while removing block %d: %w", i, err) return fmt.Errorf("error while removing block %d: %w", i, err)
} }
blocksCnt++
if blocksCnt == persistBatchSize {
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist intermediate batch removed blocks, transactions and AERs: %w", err)
}
blocksCnt = 0
batchCnt++
keysCnt += keys
bc.log.Info("intermediate batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
}
} }
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err) return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err)
} }
batchCnt++
keysCnt += keys
bc.log.Info("last batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)),
zap.Int("overall persisted keys", keysCnt))
p = time.Now() p = time.Now()
fallthrough fallthrough
case staleBlocksRemoved: case staleBlocksRemoved:
// Completely remove contract IDs to update them later. // Completely remove contract IDs to update them later.
bc.log.Info("trying to reset contract storage items and IDs")
pStorageStart := p
cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool { cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool {
cache.Store.Delete(k) cache.Store.Delete(k)
return true return true
}) })
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist removed contract IDs: %w", err)
}
bc.log.Info("removed contract IDs are persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
// Reset contracts storage and store new contract IDs. // Reset contracts storage and store new contract IDs.
var mode = mpt.ModeAll var mode = mpt.ModeAll
@ -752,21 +793,34 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
mgmtContractPrefix[4] = native.PrefixContract mgmtContractPrefix[4] = native.PrefixContract
cs := new(state.Contract) cs := new(state.Contract)
const persistBatchSize = 10000 const persistBatchSize = 200000
var ( var (
seekErr error seekErr error
cnt int cnt int
storageItmsCnt int
contractIDsCnt int
batchCnt int
) )
trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool {
if seekErr != nil {
return false
}
if cnt >= persistBatchSize { if cnt >= persistBatchSize {
_, seekErr = cache.Persist() cnt = 0
keys, seekErr = cache.Persist()
if seekErr != nil { if seekErr != nil {
seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr)
return false return false
} }
batchCnt++
bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
} }
// May safely omit KV copying. // May safely omit KV copying.
k[0] = byte(newStoragePrefix) k[0] = byte(newStoragePrefix)
cache.Store.Put(k, v) cache.Store.Put(k, v)
cnt++
storageItmsCnt++
// @fixme: remove this part after #2702. // @fixme: remove this part after #2702.
if bytes.HasPrefix(k[1:], mgmtContractPrefix) { if bytes.HasPrefix(k[1:], mgmtContractPrefix) {
@ -774,29 +828,41 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
copy(hash[:], k[mgmtCSPrefixLen:]) copy(hash[:], k[mgmtCSPrefixLen:])
err = stackitem.DeserializeConvertible(v, cs) err = stackitem.DeserializeConvertible(v, cs)
if err != nil { if err != nil {
seekErr = fmt.Errorf("failed to deserialize contract state: %w", err) bc.log.Warn("failed to deserialize contract; ID for this contract won't be stored in the DB",
zap.String("hash", hash.StringLE()),
zap.Error(err))
} else {
cache.PutContractID(cs.ID, hash)
cnt++
contractIDsCnt++
} }
cache.PutContractID(cs.ID, hash)
} }
cnt++
return seekErr == nil return true
}) })
if seekErr != nil { if seekErr != nil {
return fmt.Errorf("failed to reset contract IDs: %w", err) return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr)
} }
trieStore.Close() trieStore.Close()
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err)
} }
batchCnt++
bc.log.Info("contracts storage and IDs are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)),
zap.Int("keys", storageItmsCnt),
zap.Int("ids", contractIDsCnt))
p = time.Now() p = time.Now()
fallthrough fallthrough
case newStorageItemsAdded: case newStorageItemsAdded:
// Reset SYS-prefixed and IX-prefixed information. // Reset SYS-prefixed and IX-prefixed information.
bc.log.Info("trying to reset headers information")
for i := height + 1; i <= hHeight; i++ {
cache.PurgeHeader(bc.GetHeaderHash(int(i)))
}
cache.DeleteHeaderHashes(height+1, headerBatchCount) cache.DeleteHeaderHashes(height+1, headerBatchCount)
cache.StoreAsCurrentBlock(b) cache.StoreAsCurrentBlock(b)
cache.PutCurrentHeader(b.Hash(), height) cache.PutCurrentHeader(b.Hash(), height)
@ -805,16 +871,17 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
bc.persistent.Version = v bc.persistent.Version = v
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err) return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
} }
bc.log.Info("headers are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now() p = time.Now()
fallthrough fallthrough
case headersReset: case headersReset:
// Reset MPT. // Reset MPT.
bc.log.Info("trying to reset state root information and NEP transfers")
err = bc.stateRoot.ResetState(height, cache.Store) err = bc.stateRoot.ResetState(height, cache.Store)
if err != nil { if err != nil {
return fmt.Errorf("failed to rollback MPT state: %w", err) return fmt.Errorf("failed to rollback MPT state: %w", err)
@ -827,12 +894,13 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
} }
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err) return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err)
} }
bc.log.Info("MPT and transfers are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
fallthrough fallthrough
case transfersReset: case transfersReset:
// there's nothing to do after that, so just continue with common operations // there's nothing to do after that, so just continue with common operations
@ -842,29 +910,36 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
} }
// Direct (cache-less) DB operation: remove stale storage items. // Direct (cache-less) DB operation: remove stale storage items.
bc.log.Info("trying to remove stale storage items")
keys = 0
err = bc.store.SeekGC(storage.SeekRange{ err = bc.store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))},
}, func(_, _ []byte) bool { }, func(_, _ []byte) bool {
keys++
return false return false
}) })
if err != nil { if err != nil {
return fmt.Errorf("faield to remove stale storage items from DB: %w", err) return fmt.Errorf("faield to remove stale storage items from DB: %w", err)
} }
bc.log.Info("stale storage items are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
bc.log.Info("trying to remove state reset point")
cache.Store.Delete(resetStageKey) cache.Store.Delete(resetStageKey)
// Unlike the state jump, state sync point must be removed as we have complete state for this height. // Unlike the state jump, state sync point must be removed as we have complete state for this height.
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)
} }
bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
err = bc.resetRAMState(height, true) err = bc.resetRAMState(height, true)
if err != nil { if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err) return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
} }
bc.log.Info("reset finished successfully", zap.Duration("duration", time.Since(start))) bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start)))
return nil return nil
} }

View file

@ -759,31 +759,15 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a
// DeleteBlock removes the block from dao. It's not atomic, so make sure you're // DeleteBlock removes the block from dao. It's not atomic, so make sure you're
// using private MemCached instance here. // using private MemCached instance here.
func (dao *Simple) DeleteBlock(h util.Uint256) error { func (dao *Simple) DeleteBlock(h util.Uint256) error {
return dao.deleteBlock(h, true)
}
// PurgeBlock completely removes specified block (or just block header) from dao.
// It differs from DeleteBlock in that it removes header anyway. It's not atomic,
// so make sure you're using private MemCached instance here.
func (dao *Simple) PurgeBlock(h util.Uint256) error {
return dao.deleteBlock(h, false)
}
func (dao *Simple) deleteBlock(h util.Uint256, keepHeader bool) error {
key := dao.makeExecutableKey(h) key := dao.makeExecutableKey(h)
b, err := dao.getBlock(key) b, err := dao.getBlock(key)
if err != nil { if err != nil {
return err return err
} }
err = dao.storeHeader(key, &b.Header)
if keepHeader { if err != nil {
err = dao.storeHeader(key, &b.Header) return err
if err != nil {
return err
}
} else {
dao.Store.Delete(key)
} }
for _, tx := range b.Transactions { for _, tx := range b.Transactions {
@ -801,6 +785,14 @@ func (dao *Simple) deleteBlock(h util.Uint256, keepHeader bool) error {
return nil return nil
} }
// PurgeHeader drops whatever is stored under the executable key derived from
// the given hash. In contrast to DeleteBlock, it neither loads the block nor
// re-stores its header: the key is deleted from the underlying store
// unconditionally, without verifying that a header is present there.
func (dao *Simple) PurgeHeader(h util.Uint256) {
	dao.Store.Delete(dao.makeExecutableKey(h))
}
// StoreHeader saves the block header into the store. // StoreHeader saves the block header into the store.
func (dao *Simple) StoreHeader(h *block.Header) error { func (dao *Simple) StoreHeader(h *block.Header) error {
return dao.storeHeader(dao.makeExecutableKey(h.Hash()), h) return dao.storeHeader(dao.makeExecutableKey(h.Hash()), h)