From 9f23fafc03bf32576e80eecb7ca13775cc21237d Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 18 Nov 2022 12:51:17 +0300 Subject: [PATCH 01/12] core: improve logging of resetStateInternal Inform when starting subsequent stage, inform about keys persisted. --- pkg/core/blockchain.go | 67 ++++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 8c919c7b6..d473d0b45 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -703,6 +703,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.log.Info("initialize state reset", zap.Uint32("target height", height)) start := time.Now() p := start + keys := 0 resetStageKey := []byte{byte(storage.SYSStateChangeStage)} switch stage { @@ -714,6 +715,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } fallthrough case stateJumpStarted: + bc.log.Info("trying to reset blocks, transactions and AERs") // Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself). for i := height + 1; i <= hHeight; i++ { err := cache.PurgeBlock(bc.headerHashes[i]) @@ -722,20 +724,28 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err) } - bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) p = time.Now() fallthrough case staleBlocksRemoved: // Completely remove contract IDs to update them later. + bc.log.Info("trying to reset contract storage items and IDs") + pStorageStart := p cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool { cache.Store.Delete(k) return true }) + keys, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist removed contract IDs: %w", err) + } + bc.log.Info("removed contract IDs are persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() // Reset contracts storage and store new contract IDs. var mode = mpt.ModeAll @@ -754,19 +764,26 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) const persistBatchSize = 10000 var ( - seekErr error - cnt int + seekErr error + cnt int + storageItmsCnt int + contractIDsCnt int ) trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { if cnt >= persistBatchSize { - _, seekErr = cache.Persist() + keys, seekErr = cache.Persist() if seekErr != nil { + seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr) return false } + bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() } // May safely omit KV copying. k[0] = byte(newStoragePrefix) cache.Store.Put(k, v) + cnt++ + storageItmsCnt++ // @fixme: remove this part after #2702. if bytes.HasPrefix(k[1:], mgmtContractPrefix) { @@ -774,29 +791,34 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) copy(hash[:], k[mgmtCSPrefixLen:]) err = stackitem.DeserializeConvertible(v, cs) if err != nil { - seekErr = fmt.Errorf("failed to deserialize contract state: %w", err) + seekErr = fmt.Errorf("failed to deserialize contract %s state: %w", hash.StringLE(), seekErr) } cache.PutContractID(cs.ID, hash) + cnt++ + contractIDsCnt++ } - cnt++ + return seekErr == nil }) if seekErr != nil { - return fmt.Errorf("failed to reset contract IDs: %w", err) + return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr) } trieStore.Close() cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { - return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) + return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) } - - bc.log.Info("contracts storage and IDs are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)), + zap.Int("keys", storageItmsCnt), + zap.Int("ids", contractIDsCnt)) p = time.Now() fallthrough case newStorageItemsAdded: // Reset SYS-prefixed and IX-prefixed information. + bc.log.Info("trying to reset headers information") cache.DeleteHeaderHashes(height+1, headerBatchCount) cache.StoreAsCurrentBlock(b) cache.PutCurrentHeader(b.Hash(), height) @@ -805,16 +827,17 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.persistent.Version = v cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed to persist headers changes to the DB: %w", err) } - bc.log.Info("headers are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) p = time.Now() fallthrough case headersReset: // Reset MPT. + bc.log.Info("trying to reset state root information and NEP transfers") err = bc.stateRoot.ResetState(height, cache.Store) if err != nil { return fmt.Errorf("failed to rollback MPT state: %w", err) @@ -827,12 +850,13 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err) } - bc.log.Info("MPT and transfers are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() fallthrough case transfersReset: // there's nothing to do after that, so just continue with common operations @@ -842,29 +866,36 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } // Direct (cache-less) DB operation: remove stale storage items. + bc.log.Info("trying to remove stale storage items") + keys = 0 err = bc.store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, }, func(_, _ []byte) bool { + keys++ return false }) if err != nil { return fmt.Errorf("faield to remove stale storage items from DB: %w", err) } + bc.log.Info("stale storage items are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() + bc.log.Info("trying to remove state reset point") cache.Store.Delete(resetStageKey) // Unlike the state jump, state sync point must be removed as we have complete state for this height. cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) } + bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) err = bc.resetRAMState(height, true) if err != nil { return fmt.Errorf("failed to update in-memory blockchain data: %w", err) } - bc.log.Info("reset finished successfully", zap.Duration("duration", time.Since(start))) + bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start))) return nil } From 235518eb6c93de50d5d3be777808bb1d94a700f9 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 18 Nov 2022 12:53:09 +0300 Subject: [PATCH 02/12] core: reset batch counter to zero after each persist in resetStateInternal It's a bug, otherwise we'll persist each storage item after 10K-th one, that's the reason of abnormous long storage items resetting stage. --- pkg/core/blockchain.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index d473d0b45..5f73dbe42 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -771,6 +771,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) ) trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { if cnt >= persistBatchSize { + cnt = 0 keys, seekErr = cache.Persist() if seekErr != nil { seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr) From bfe7aeae7b051de0172889c12897b890a750d452 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 18 Nov 2022 12:55:37 +0300 Subject: [PATCH 03/12] core: stop storage items reset after the first persist error It's a bug, we mustn't continue if something bad had happend on persist, otherwise this error will be overwritten by subsequent successfull persist. --- pkg/core/blockchain.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 5f73dbe42..b78e69f91 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -770,6 +770,9 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) contractIDsCnt int ) trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { + if seekErr != nil { + return false + } if cnt >= persistBatchSize { cnt = 0 keys, seekErr = cache.Persist() From ecda07736e74d5715c32e089df4c7075f818d185 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 18 Nov 2022 12:57:35 +0300 Subject: [PATCH 04/12] core: stop storage items reset after any seek error --- pkg/core/blockchain.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index b78e69f91..6161abb7d 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -796,13 +796,14 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) err = stackitem.DeserializeConvertible(v, cs) if err != nil { seekErr = fmt.Errorf("failed to deserialize contract %s state: %w", hash.StringLE(), seekErr) + return false } cache.PutContractID(cs.ID, hash) cnt++ contractIDsCnt++ } - return seekErr == nil + return true }) if seekErr != nil { return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr) From f52451e5820e7cc2f2d2c9f8f3e12fdaca21fb11 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 18 Nov 2022 12:59:39 +0300 Subject: [PATCH 05/12] core: fix state reset with broken contract Sync up with #2802, bad contract -> no contract ID at all. --- pkg/core/blockchain.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 6161abb7d..03f8c3ddc 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -795,12 +795,14 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) copy(hash[:], k[mgmtCSPrefixLen:]) err = stackitem.DeserializeConvertible(v, cs) if err != nil { - seekErr = fmt.Errorf("failed to deserialize contract %s state: %w", hash.StringLE(), seekErr) - return false + bc.log.Warn("failed to deserialize contract; ID for this contract won't be stored in the DB", + zap.String("hash", hash.StringLE()), + zap.Error(err)) + } else { + cache.PutContractID(cs.ID, hash) + cnt++ + contractIDsCnt++ } - cache.PutContractID(cs.ID, hash) - cnt++ - contractIDsCnt++ } return true From 7d55bf2cc1d026e8f34afddaada615cc6c3b5d0b Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 18 Nov 2022 13:20:54 +0300 Subject: [PATCH 06/12] core: log persisted storage item batches count during state reset --- pkg/core/blockchain.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 03f8c3ddc..b2351231f 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -768,6 +768,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) cnt int storageItmsCnt int contractIDsCnt int + batchCnt int ) trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { if seekErr != nil { @@ -780,7 +781,8 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr) return false } - bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + batchCnt++ + bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) p = time.Now() } // May safely omit KV copying. @@ -817,7 +819,8 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) if err != nil { return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) } - bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + batchCnt++ + bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)), zap.Int("keys", storageItmsCnt), zap.Int("ids", contractIDsCnt)) From 283da8f599872ac415cbd04b7082dec8196fd00e Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Sun, 20 Nov 2022 20:55:39 +0300 Subject: [PATCH 07/12] core: use DAO-provided block height during during state reset Don't use cache because it's not yet initialized. Also, perform safety checks only if state reset wasn't yet started. These fixes alloww to solve the following problem while recovering from interrupted state reset: ``` anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -t --height 83000 2022-11-20T15:51:31.431+0300 INFO MaxValidUntilBlockIncrement is not set or wrong, using default value {"MaxValidUntilBlockIncrement": 5760} 2022-11-20T15:51:31.434+0300 INFO restoring blockchain {"version": "0.2.6"} failed to create Blockchain instance: could not initialize blockchain: current block height is 0, can't reset state to height 83000 ``` --- pkg/core/blockchain.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index b2351231f..5696e6bd7 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -672,19 +672,26 @@ func (bc *Blockchain) Reset(height uint32) error { } func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error { - currHeight := bc.BlockHeight() - if height > currHeight { - return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height) + // Cache isn't yet initialized, so retrieve header right from DAO. + currHeight, err := bc.dao.GetCurrentBlockHeight() + if err != nil { + return fmt.Errorf("failed to retrieve current block height: %w", err) } - if height == currHeight && stage == none { - bc.log.Info("chain is already at the proper state", zap.Uint32("height", height)) - return nil - } - if bc.config.KeepOnlyLatestState { - return fmt.Errorf("KeepOnlyLatestState is enabled, state for height %d is outdated and removed from the storage", height) - } - if bc.config.RemoveUntraceableBlocks && currHeight >= bc.config.MaxTraceableBlocks { - return fmt.Errorf("RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed") + // State reset may already be started by this moment, so perform these checks only if it wasn't. + if stage == none { + if height > currHeight { + return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height) + } + if height == currHeight { + bc.log.Info("chain is already at the proper state", zap.Uint32("height", height)) + return nil + } + if bc.config.KeepOnlyLatestState { + return fmt.Errorf("KeepOnlyLatestState is enabled, state for height %d is outdated and removed from the storage", height) + } + if bc.config.RemoveUntraceableBlocks && currHeight >= bc.config.MaxTraceableBlocks { + return fmt.Errorf("RemoveUntraceableBlocks is enabled, a necessary batch of traceable blocks has already been removed") + } } // Retrieve necessary state before the DB modification. From d67f0df516df4bad8408cef44a8821b66ad36289 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Sun, 20 Nov 2022 20:55:45 +0300 Subject: [PATCH 08/12] core: reset block headers together with header height info We need to keep the headers information consistent with header batches and headers. This comit fixes the bug with failing blockchain initialization on recovering from state reset interrupted after the second stage (blocks/txs/AERs removal): ``` anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -t --height 83000 2022-11-20T16:28:29.437+0300 INFO MaxValidUntilBlockIncrement is not set or wrong, using default value {"MaxValidUntilBlockIncrement": 5760} 2022-11-20T16:28:29.440+0300 INFO restoring blockchain {"version": "0.2.6"} failed to create Blockchain instance: could not initialize blockchain: could not get header 1898cd356a4a2688ed1c6c7ba1fd6ba7d516959d8add3f8dd26232474d4539bd: key not found ``` --- pkg/core/blockchain.go | 12 ++++++++---- pkg/core/dao/dao.go | 30 +++++++++++------------------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 5696e6bd7..0f277b504 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -672,7 +672,7 @@ func (bc *Blockchain) Reset(height uint32) error { } func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error { - // Cache isn't yet initialized, so retrieve header right from DAO. + // Cache isn't yet initialized, so retrieve header height right from DAO. currHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { return fmt.Errorf("failed to retrieve current block height: %w", err) @@ -723,9 +723,10 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) fallthrough case stateJumpStarted: bc.log.Info("trying to reset blocks, transactions and AERs") - // Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself). - for i := height + 1; i <= hHeight; i++ { - err := cache.PurgeBlock(bc.headerHashes[i]) + // Remove blocks/transactions/aers from currHeight down to height (not including height itself). + // Keep headers for now, they'll be removed later. + for i := height + 1; i <= currHeight; i++ { + err := cache.DeleteBlock(bc.GetHeaderHash(int(i))) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } @@ -836,6 +837,9 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) case newStorageItemsAdded: // Reset SYS-prefixed and IX-prefixed information. bc.log.Info("trying to reset headers information") + for i := height + 1; i <= hHeight; i++ { + cache.PurgeHeader(bc.GetHeaderHash(int(i))) + } cache.DeleteHeaderHashes(height+1, headerBatchCount) cache.StoreAsCurrentBlock(b) cache.PutCurrentHeader(b.Hash(), height) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 14f4df1d6..f802dab52 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -759,31 +759,15 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a // DeleteBlock removes the block from dao. It's not atomic, so make sure you're // using private MemCached instance here. func (dao *Simple) DeleteBlock(h util.Uint256) error { - return dao.deleteBlock(h, true) -} - -// PurgeBlock completely removes specified block (or just block header) from dao. -// It differs from DeleteBlock in that it removes header anyway. It's not atomic, -// so make sure you're using private MemCached instance here. -func (dao *Simple) PurgeBlock(h util.Uint256) error { - return dao.deleteBlock(h, false) -} - -func (dao *Simple) deleteBlock(h util.Uint256, keepHeader bool) error { key := dao.makeExecutableKey(h) b, err := dao.getBlock(key) if err != nil { return err } - - if keepHeader { - err = dao.storeHeader(key, &b.Header) - if err != nil { - return err - } - } else { - dao.Store.Delete(key) + err = dao.storeHeader(key, &b.Header) + if err != nil { + return err } for _, tx := range b.Transactions { @@ -801,6 +785,14 @@ func (dao *Simple) deleteBlock(h util.Uint256, keepHeader bool) error { return nil } +// PurgeHeader completely removes specified header from dao. It differs from +// DeleteBlock in that it removes header anyway and does nothing except removing +// header. It does no checks for header existence. +func (dao *Simple) PurgeHeader(h util.Uint256) { + key := dao.makeExecutableKey(h) + dao.Store.Delete(key) +} + // StoreHeader saves the block header into the store. func (dao *Simple) StoreHeader(h *block.Header) error { return dao.storeHeader(dao.makeExecutableKey(h.Hash()), h) From bdc42cd5952ce963c760f25312399f25157d4f38 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Sun, 20 Nov 2022 20:55:48 +0300 Subject: [PATCH 09/12] core: reset blocks, txs and AERs in several stages Sometimes it can be hard to persist all changes at ones, the process can take almost all RAM and a lot of time. Here's the example of reset for mainnet from 2.4M to 1: ``` anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --height 1 2022-11-20T17:16:48.236+0300 INFO MaxBlockSize is not set or wrong, setting default value {"MaxBlockSize": 262144} 2022-11-20T17:16:48.236+0300 INFO MaxBlockSystemFee is not set or wrong, setting default value {"MaxBlockSystemFee": 900000000000} 2022-11-20T17:16:48.237+0300 INFO MaxTransactionsPerBlock is not set or wrong, using default value {"MaxTransactionsPerBlock": 512} 2022-11-20T17:16:48.237+0300 INFO MaxValidUntilBlockIncrement is not set or wrong, using default value {"MaxValidUntilBlockIncrement": 5760} 2022-11-20T17:16:48.240+0300 INFO restoring blockchain {"version": "0.2.6"} 2022-11-20T17:16:48.297+0300 INFO initialize state reset {"target height": 1} 2022-11-20T17:16:48.300+0300 INFO trying to reset blocks, transactions and AERs 2022-11-20T17:19:29.313+0300 INFO blocks, transactions ans AERs are reset {"took": "2m41.015126493s", "keys": 3958420} ... ``` To avoid OOM killer, split blocks reset into multiple stages. It increases operation time due to intermediate DB persists, but makes things cleaner, the result for almost the same DB height with the new approach: ``` anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --height 1 2022-11-20T17:39:42.023+0300 INFO MaxBlockSize is not set or wrong, setting default value {"MaxBlockSize": 262144} 2022-11-20T17:39:42.023+0300 INFO MaxBlockSystemFee is not set or wrong, setting default value {"MaxBlockSystemFee": 900000000000} 2022-11-20T17:39:42.023+0300 INFO MaxTransactionsPerBlock is not set or wrong, using default value {"MaxTransactionsPerBlock": 512} 2022-11-20T17:39:42.023+0300 INFO MaxValidUntilBlockIncrement is not set or wrong, using default value {"MaxValidUntilBlockIncrement": 5760} 2022-11-20T17:39:42.026+0300 INFO restoring blockchain {"version": "0.2.6"} 2022-11-20T17:39:42.071+0300 INFO initialize state reset {"target height": 1} 2022-11-20T17:39:42.073+0300 INFO trying to reset blocks, transactions and AERs 2022-11-20T17:40:11.735+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 1, "took": "29.66363737s", "keys": 210973} 2022-11-20T17:40:33.574+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 2, "took": "21.839208683s", "keys": 241203} 2022-11-20T17:41:29.325+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 3, "took": "55.750698386s", "keys": 250593} 2022-11-20T17:42:12.532+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 4, "took": "43.205892757s", "keys": 321896} 2022-11-20T17:43:07.978+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 5, "took": "55.445398156s", "keys": 334822} 2022-11-20T17:43:35.603+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 6, "took": "27.625292032s", "keys": 317131} 2022-11-20T17:43:51.747+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 7, "took": "16.144359017s", "keys": 355832} 2022-11-20T17:44:05.176+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 8, "took": "13.428733899s", "keys": 357690} 2022-11-20T17:44:32.895+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 9, "took": "27.718548783s", "keys": 393356} 2022-11-20T17:44:51.814+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 10, "took": "18.917954658s", "keys": 366492} 2022-11-20T17:45:07.208+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 11, "took": "15.392642196s", "keys": 326030} 2022-11-20T17:45:18.776+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 12, "took": "11.568255716s", "keys": 299884} 2022-11-20T17:45:25.862+0300 INFO last batch of removed blocks, transactions and AERs is persisted {"batches persisted": 13, "took": "7.086079594s", "keys": 190399} 2022-11-20T17:45:25.862+0300 INFO blocks, transactions ans AERs are reset {"took": "5m43.791214084s", "overall persisted keys": 3966301} ... ``` --- pkg/core/blockchain.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 0f277b504..aa8957ad9 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -724,20 +724,42 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) case stateJumpStarted: bc.log.Info("trying to reset blocks, transactions and AERs") // Remove blocks/transactions/aers from currHeight down to height (not including height itself). - // Keep headers for now, they'll be removed later. + // Keep headers for now, they'll be removed later. It's hard to handle the whole set of changes in + // one stage, so persist periodically. + const persistBatchSize = 100 * headerBatchCount // count blocks only, should be enough to avoid OOM killer even for large blocks + var ( + pBlocksStart = p + blocksCnt, batchCnt, keysCnt int + ) for i := height + 1; i <= currHeight; i++ { err := cache.DeleteBlock(bc.GetHeaderHash(int(i))) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } + blocksCnt++ + if blocksCnt == persistBatchSize { + keys, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist intermediate batch removed blocks, transactions and AERs: %w", err) + } + blocksCnt = 0 + batchCnt++ + keysCnt += keys + bc.log.Info("intermediate batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() + } } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) keys, err = cache.Persist() if err != nil { - return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err) + return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) } + batchCnt++ + keysCnt += keys + bc.log.Info("last batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) - bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)), + zap.Int("overall persisted keys", keysCnt)) p = time.Now() fallthrough case staleBlocksRemoved: From b82374823e4c801c8920b3786dacbf668d3b2ddc Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Sun, 20 Nov 2022 20:55:51 +0300 Subject: [PATCH 10/12] core: increase persist batch size for reset storage changes --- pkg/core/blockchain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index aa8957ad9..1c48946c4 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -792,7 +792,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) mgmtContractPrefix[4] = native.PrefixContract cs := new(state.Contract) - const persistBatchSize = 10000 + const persistBatchSize = 200000 var ( seekErr error cnt int From b27a9bcf95cfc8a80899c72c0b667200efe5c716 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Sun, 20 Nov 2022 21:05:20 +0300 Subject: [PATCH 11/12] core: adjust info message for proper-stated chains Make it prettier for those cases when `db reset` command was called after interrupted reset. --- pkg/core/blockchain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 1c48946c4..ac3ac231b 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -683,7 +683,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height) } if height == currHeight { - bc.log.Info("chain is already at the proper state", zap.Uint32("height", height)) + bc.log.Info("chain is at the proper state", zap.Uint32("height", height)) return nil } if bc.config.KeepOnlyLatestState { From f3ef2890f01269a66847688e87fd9cb7f450b78f Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 21 Nov 2022 15:50:44 +0300 Subject: [PATCH 12/12] core: check headers at the proper state on state reset And fix the comment along the way. --- pkg/core/blockchain.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index ac3ac231b..8b6ab5049 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -672,17 +672,19 @@ func (bc *Blockchain) Reset(height uint32) error { } func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) error { - // Cache isn't yet initialized, so retrieve header height right from DAO. + // Cache isn't yet initialized, so retrieve block height right from DAO. currHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { return fmt.Errorf("failed to retrieve current block height: %w", err) } + // Headers are already initialized by this moment, thus may use chain's API. + hHeight := bc.HeaderHeight() // State reset may already be started by this moment, so perform these checks only if it wasn't. if stage == none { if height > currHeight { return fmt.Errorf("current block height is %d, can't reset state to height %d", currHeight, height) } - if height == currHeight { + if height == currHeight && hHeight == currHeight { bc.log.Info("chain is at the proper state", zap.Uint32("height", height)) return nil } @@ -695,7 +697,6 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } // Retrieve necessary state before the DB modification. - hHeight := bc.HeaderHeight() b, err := bc.GetBlock(bc.headerHashes[height]) if err != nil { return fmt.Errorf("failed to retrieve block %d: %w", height, err)