From 41caeed5c075f976fb2d04bb79644ddb8cd77c51 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 11 Apr 2023 16:47:37 +0300 Subject: [PATCH 1/4] core: fix state reset log message Signed-off-by: Anna Shaleva --- pkg/core/blockchain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index bde36a856..813d46b34 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -682,7 +682,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) v := bc.dao.Version cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB - bc.log.Info("initialize state reset", zap.Uint32("target height", height)) + bc.log.Info("initializing state reset", zap.Uint32("target height", height)) start := time.Now() p := start keys := 0 From e3747b1d57a15ed828a0282c4cc5feda223a8115 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 11 Apr 2023 16:53:24 +0300 Subject: [PATCH 2/4] core: change log level of reset stages notifications Signed-off-by: Anna Shaleva --- pkg/core/blockchain.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 813d46b34..e35b6846b 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -697,7 +697,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } fallthrough case stateJumpStarted: - bc.log.Info("trying to reset blocks, transactions and AERs") + bc.log.Debug("trying to reset blocks, transactions and AERs") // Remove blocks/transactions/aers from currHeight down to height (not including height itself). // Keep headers for now, they'll be removed later. It's hard to handle the whole set of changes in // one stage, so persist periodically. 
@@ -738,7 +738,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) p = time.Now() fallthrough case staleBlocksRemoved: - bc.log.Info("trying to reset contract storage items") + bc.log.Debug("trying to reset contract storage items") pStorageStart := p var mode = mpt.ModeAll @@ -797,7 +797,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) fallthrough case newStorageItemsAdded: // Reset SYS-prefixed and IX-prefixed information. - bc.log.Info("trying to reset headers information") + bc.log.Debug("trying to reset headers information") for i := height + 1; i <= hHeight; i++ { cache.PurgeHeader(bc.GetHeaderHash(i)) } @@ -819,7 +819,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) fallthrough case headersReset: // Reset MPT. - bc.log.Info("trying to reset state root information and NEP transfers") + bc.log.Debug("trying to reset state root information and NEP transfers") err = bc.stateRoot.ResetState(height, cache.Store) if err != nil { return fmt.Errorf("failed to rollback MPT state: %w", err) @@ -848,7 +848,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } // Direct (cache-less) DB operation: remove stale storage items. - bc.log.Info("trying to remove stale storage items") + bc.log.Debug("trying to remove stale storage items") keys = 0 err = bc.store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, @@ -862,7 +862,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.log.Info("stale storage items are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) p = time.Now() - bc.log.Info("trying to remove state reset point") + bc.log.Debug("trying to remove state reset point") cache.Store.Delete(resetStageKey) // Unlike the state jump, state sync point must be removed as we have complete state for this height. 
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) From cb0f786b28835229f066a31ac62d6e964c9d22f7 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 21 Nov 2022 12:04:52 +0300 Subject: [PATCH 3/4] core: move batch persist to a separate routine Resetting mainnet from 2512046 blocks (full logs are attached to https://github.com/nspcc-dev/neo-go/pull/2813#issuecomment-1324115555). -------- LevelDB | ------------------------ to | old | new | ------|--------|--------| 1 | 5m11s | 4m50s | ------|--------|--------| 1M | 10m40s | 9m40s | ------|--------|--------| 2.5M | 17m38s | 17m36s | ------------------------ -------- BoltDB | ------------------------ to | old | new | ------|--------|--------| 1 | 8m3s | 5m51s | ------|--------|--------| 1M | 20m30s | 13m2s | ------|--------|--------| 2.5M | 31m26s | 18m47s | ------------------------ Signed-off-by: Anna Shaleva --- pkg/core/blockchain.go | 190 ++++++++++++++++++++++++++++------------- 1 file changed, 129 insertions(+), 61 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index e35b6846b..707775189 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -685,15 +685,42 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.log.Info("initializing state reset", zap.Uint32("target height", height)) start := time.Now() p := start - keys := 0 + + // Start batch persisting routine, it will be used for blocks/txs/AERs/storage items batches persist. 
+ type postPersist func(persistedKeys int, err error) error + var ( + persistCh = make(chan postPersist) + persistToExitCh = make(chan struct{}) + ) + go func() { + for { + f, ok := <-persistCh + if !ok { + break + } + persistErr := f(cache.Persist()) + if persistErr != nil { + bc.log.Fatal("persist failed", zap.Error(persistErr)) + panic(persistErr) + } + } + close(persistToExitCh) + }() + defer func() { + close(persistCh) + <-persistToExitCh + bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start))) + }() resetStageKey := []byte{byte(storage.SYSStateChangeStage)} switch stage { case none: cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)}) - _, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err) + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err) + } + return nil } fallthrough case stateJumpStarted: @@ -703,8 +730,9 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) // one stage, so persist periodically. 
const persistBatchSize = 100 * headerBatchCount // count blocks only, should be enough to avoid OOM killer even for large blocks var ( - pBlocksStart = p - blocksCnt, batchCnt, keysCnt int + pBlocksStart = p + blocksCnt, batchCnt int + keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { err := cache.DeleteBlock(bc.GetHeaderHash(i)) @@ -713,34 +741,56 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } blocksCnt++ if blocksCnt == persistBatchSize { - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist intermediate batch removed blocks, transactions and AERs: %w", err) - } blocksCnt = 0 batchCnt++ - keysCnt += keys - bc.log.Info("intermediate batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("intermediate batch of removed blocks, transactions and AERs is collected", + zap.Int("batch", batchCnt), + zap.Duration("took", time.Since(p))) + + persistStart := time.Now() + persistBatch := batchCnt + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist intermediate batch of removed blocks, transactions and AERs: %w", err) + } + *keysCnt += persistedKeys + bc.log.Debug("intermediate batch of removed blocks, transactions and AERs is persisted", + zap.Int("batch", persistBatch), + zap.Duration("took", time.Since(persistStart)), + zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() } } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) - } batchCnt++ - keysCnt += keys - bc.log.Info("last batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), 
zap.Int("keys", keys)) + bc.log.Info("last batch of removed blocks, transactions and AERs is collected", + zap.Int("batch", batchCnt), + zap.Duration("took", time.Since(p))) + bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart))) - bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)), - zap.Int("overall persisted keys", keysCnt)) + persistStart := time.Now() + persistBatch := batchCnt + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) + } + *keysCnt += persistedKeys + bc.log.Debug("last batch of removed blocks, transactions and AERs is persisted", + zap.Int("batch", persistBatch), + zap.Duration("took", time.Since(persistStart)), + zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case staleBlocksRemoved: + // Completely remove contract IDs to update them later. 
bc.log.Debug("trying to reset contract storage items") pStorageStart := p + p = time.Now() var mode = mpt.ModeAll if bc.config.Ledger.RemoveUntraceableBlocks { mode |= mpt.ModeGCFlag @@ -750,25 +800,27 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix) const persistBatchSize = 200000 - var ( - seekErr error - cnt int - storageItmsCnt int - batchCnt int - ) + var cnt, storageItmsCnt, batchCnt int trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { - if seekErr != nil { - return false - } if cnt >= persistBatchSize { cnt = 0 - keys, seekErr = cache.Persist() - if seekErr != nil { - seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr) - return false - } batchCnt++ - bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("intermediate batch of contract storage items and IDs is collected", + zap.Int("batch", batchCnt), + zap.Duration("took", time.Since(p))) + + persistStart := time.Now() + persistBatch := batchCnt + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist intermediate batch of contract storage items: %w", err) + } + bc.log.Debug("intermediate batch of contract storage items is persisted", + zap.Int("batch", persistBatch), + zap.Duration("took", time.Since(persistStart)), + zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() } // May safely omit KV copying. 
@@ -779,20 +831,23 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) return true }) - if seekErr != nil { - return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr) - } trieStore.Close() cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) - } batchCnt++ - bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) - bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)), + persistBatch := batchCnt + bc.log.Info("last batch of contract storage items is collected", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p))) + bc.log.Info("contract storage items are reset", zap.Duration("took", time.Since(pStorageStart)), zap.Int("keys", storageItmsCnt)) + + lastStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) + } + bc.log.Debug("last batch of contract storage items and IDs is persisted", zap.Int("batch", persistBatch), zap.Duration("took", time.Since(lastStart)), zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case newStorageItemsAdded: @@ -809,12 +864,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.persistent.Version = v cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist headers changes to the DB: %w", err) - } + bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p))) - bc.log.Info("headers information is reset", 
zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + persistStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist headers changes to the DB: %w", err) + } + bc.log.Debug("headers information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case headersReset: @@ -832,12 +891,17 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err) - } + bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p))) - bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + persistStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) + } + + bc.log.Debug("state root information and NEP transfers are persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case transfersReset: @@ -849,7 +913,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) // Direct (cache-less) DB operation: remove stale storage items. 
bc.log.Debug("trying to remove stale storage items") - keys = 0 + keys := 0 err = bc.store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, }, func(_, _ []byte) bool { @@ -866,18 +930,22 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) cache.Store.Delete(resetStageKey) // Unlike the state jump, state sync point must be removed as we have complete state for this height. cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) + bc.log.Info("state reset point is removed", zap.Duration("took", time.Since(p))) + + persistStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) + } + bc.log.Info("state reset point information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys)) + return nil } - bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() err = bc.resetRAMState(height, true) if err != nil { return fmt.Errorf("failed to update in-memory blockchain data: %w", err) } - - bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start))) return nil } From 400620a9fb60c8b946e067482903417e4a14e8d9 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 11 Apr 2023 14:18:48 +0300 Subject: [PATCH 4/4] core: avoid squashing of data from different state reset stages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `cache` persisting operation is concurrent with the storage modifications made by further state reset stages. We can't allow situation when data from the next stage are leaking into the previous stage. 
State reset stages must be atomic in terms of DB persisting, thus, use another `upperCache` MemCached store to keep them apart. Here are the results of mainnet's BoltDB reset from 1.1M to 600K: This patch: ``` anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --debug --height 600000 2023-04-11T16:15:25.783+0300 INFO MaxBlockSize is not set or wrong, setting default value {"MaxBlockSize": 262144} 2023-04-11T16:15:25.783+0300 INFO MaxBlockSystemFee is not set or wrong, setting default value {"MaxBlockSystemFee": 900000000000} 2023-04-11T16:15:25.783+0300 INFO MaxTransactionsPerBlock is not set or wrong, using default value {"MaxTransactionsPerBlock": 512} 2023-04-11T16:15:25.783+0300 INFO MaxValidUntilBlockIncrement is not set or wrong, using default value {"MaxValidUntilBlockIncrement": 5760} 2023-04-11T16:15:25.787+0300 INFO restoring blockchain {"version": "0.2.8"} 2023-04-11T16:15:25.906+0300 INFO initializing state reset {"target height": 600000} 2023-04-11T16:15:25.906+0300 DEBUG trying to reset blocks, transactions and AERs 2023-04-11T16:15:57.031+0300 INFO intermediate batch of removed blocks, transactions and AERs is collected {"batch": 1, "took": "31.125057214s"} 2023-04-11T16:16:12.644+0300 DEBUG intermediate batch of removed blocks, transactions and AERs is persisted {"batch": 1, "took": "15.613156971s", "keys": 321895} 2023-04-11T16:16:13.895+0300 INFO intermediate batch of removed blocks, transactions and AERs is collected {"batch": 2, "took": "16.663444208s"} 2023-04-11T16:16:19.784+0300 INFO last batch of removed blocks, transactions and AERs is collected {"batch": 3, "took": "5.760308543s"} 2023-04-11T16:16:19.784+0300 INFO blocks, transactions ans AERs are reset {"took": "53.878632911s"} 2023-04-11T16:16:22.870+0300 DEBUG intermediate batch of removed blocks, transactions and AERs is persisted {"batch": 2, "took": "8.974838893s", "keys": 334823} 2023-04-11T16:16:22.870+0300 DEBUG trying to reset contract storage items
2023-04-11T16:16:27.272+0300 DEBUG last batch of removed blocks, transactions and AERs is persisted {"batch": 3, "took": "7.487357441s", "keys": 208913} 2023-04-11T16:17:23.678+0300 INFO intermediate batch of contract storage items and IDs is collected {"batch": 1, "took": "1m0.80711106s"} 2023-04-11T16:18:00.769+0300 INFO intermediate batch of contract storage items and IDs is collected {"batch": 2, "took": "36.967660061s"} 2023-04-11T16:18:20.478+0300 DEBUG intermediate batch of contract storage items is persisted {"batch": 1, "took": "56.796257788s", "keys": 200000} 2023-04-11T16:18:54.115+0300 INFO intermediate batch of contract storage items and IDs is collected {"batch": 3, "took": "33.637412437s"} 2023-04-11T16:19:18.844+0300 DEBUG intermediate batch of contract storage items is persisted {"batch": 2, "took": "1m18.0737668s", "keys": 200000} 2023-04-11T16:19:27.650+0300 INFO last batch of contract storage items is collected {"batch": 4, "took": "8.806264019s"} 2023-04-11T16:19:27.650+0300 INFO contract storage items are reset {"took": "3m4.780232077s", "keys": 656944} 2023-04-11T16:20:15.660+0300 DEBUG intermediate batch of contract storage items is persisted {"batch": 3, "took": "1m21.544386403s", "keys": 200000} 2023-04-11T16:20:15.660+0300 DEBUG trying to reset headers information 2023-04-11T16:20:16.385+0300 INFO headers information is reset {"took": "725.174932ms"} 2023-04-11T16:20:19.586+0300 DEBUG last batch of contract storage items and IDs is persisted {"batch": 4, "took": "51.936278608s", "keys": 56945} 2023-04-11T16:20:19.587+0300 DEBUG trying to reset state root information and NEP transfers 2023-04-11T16:20:35.845+0300 INFO state root information and NEP transfers are reset {"took": "16.25852114s"} 2023-04-11T16:21:10.000+0300 DEBUG headers information is persisted {"took": "53.613638429s", "keys": 528438} 2023-04-11T16:21:10.003+0300 DEBUG trying to remove stale storage items 2023-04-11T16:21:18.108+0300 INFO stale storage items are reset 
{"took": "8.105140658s", "keys": 1350176} 2023-04-11T16:21:18.108+0300 DEBUG trying to remove state reset point 2023-04-11T16:21:18.108+0300 INFO state reset point is removed {"took": "8.554µs"} 2023-04-11T16:21:20.151+0300 DEBUG state root information and NEP transfers are persisted {"took": "44.305707049s", "keys": 602578} 2023-04-11T16:21:20.212+0300 INFO state reset point information is persisted {"took": "2.103764633s", "keys": 2} 2023-04-11T16:21:20.213+0300 INFO reset finished successfully {"took": "5m54.306861367s"} ``` The previous commit: ``` anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --debug --height 600000 2023-04-11T16:24:04.256+0300 INFO MaxBlockSize is not set or wrong, setting default value {"MaxBlockSize": 262144} 2023-04-11T16:24:04.256+0300 INFO MaxBlockSystemFee is not set or wrong, setting default value {"MaxBlockSystemFee": 900000000000} 2023-04-11T16:24:04.256+0300 INFO MaxTransactionsPerBlock is not set or wrong, using default value {"MaxTransactionsPerBlock": 512} 2023-04-11T16:24:04.256+0300 INFO MaxValidUntilBlockIncrement is not set or wrong, using default value {"MaxValidUntilBlockIncrement": 5760} 2023-04-11T16:24:04.261+0300 INFO restoring blockchain {"version": "0.2.8"} 2023-04-11T16:24:04.368+0300 INFO initializing state reset {"target height": 600000} 2023-04-11T16:24:04.368+0300 DEBUG trying to reset blocks, transactions and AERs 2023-04-11T16:24:30.363+0300 INFO intermediate batch of removed blocks, transactions and AERs is collected {"batch": 1, "took": "25.995261037s"} 2023-04-11T16:24:44.947+0300 DEBUG intermediate batch of removed blocks, transactions and AERs is persisted {"batch": 1, "took": "14.584447338s", "keys": 321897} 2023-04-11T16:24:45.791+0300 INFO intermediate batch of removed blocks, transactions and AERs is collected {"batch": 2, "took": "15.428492824s"} 2023-04-11T16:24:51.252+0300 INFO last batch of removed blocks, transactions and AERs is collected {"batch": 3, "took": 
"5.460662766s"} 2023-04-11T16:24:51.252+0300 INFO blocks, transactions ans AERs are reset {"took": "46.884558096s"} 2023-04-11T16:24:55.399+0300 DEBUG intermediate batch of removed blocks, transactions and AERs is persisted {"batch": 2, "took": "9.607820004s", "keys": 334821} 2023-04-11T16:24:55.399+0300 DEBUG trying to reset contract storage items 2023-04-11T16:24:59.981+0300 DEBUG last batch of removed blocks, transactions and AERs is persisted {"batch": 3, "took": "8.728713255s", "keys": 208913} 2023-04-11T16:25:50.827+0300 INFO intermediate batch of contract storage items and IDs is collected {"batch": 1, "took": "55.426411416s"} 2023-04-11T16:26:28.734+0300 INFO intermediate batch of contract storage items and IDs is collected {"batch": 2, "took": "37.902647706s"} 2023-04-11T16:26:53.960+0300 DEBUG intermediate batch of contract storage items is persisted {"batch": 1, "took": "1m3.129453265s", "keys": 200001} 2023-04-11T16:27:27.645+0300 INFO intermediate batch of contract storage items and IDs is collected {"batch": 3, "took": "33.685283662s"} 2023-04-11T16:27:52.173+0300 DEBUG intermediate batch of contract storage items is persisted {"batch": 2, "took": "1m23.438465575s", "keys": 199999} 2023-04-11T16:28:00.995+0300 INFO last batch of contract storage items is collected {"batch": 4, "took": "8.821990443s"} 2023-04-11T16:28:00.995+0300 INFO contract storage items are reset {"took": "3m5.595950958s", "keys": 656944} 2023-04-11T16:28:49.164+0300 DEBUG intermediate batch of contract storage items is persisted {"batch": 3, "took": "1m21.518344712s", "keys": 200000} 2023-04-11T16:28:49.164+0300 DEBUG trying to reset headers information 2023-04-11T16:28:49.936+0300 INFO headers information is reset {"took": "772.36435ms"} 2023-04-11T16:28:53.122+0300 DEBUG last batch of contract storage items and IDs is persisted {"batch": 4, "took": "52.126928092s", "keys": 56945} 2023-04-11T16:28:53.122+0300 DEBUG trying to reset state root information and NEP transfers 
2023-04-11T16:29:09.332+0300 INFO state root information and NEP transfers are reset {"took": "16.20921699s"} 2023-04-11T16:29:46.264+0300 DEBUG headers information is persisted {"took": "56.326715249s", "keys": 528438} 2023-04-11T16:29:46.267+0300 DEBUG trying to remove stale storage items 2023-04-11T16:29:53.986+0300 INFO stale storage items are reset {"took": "7.718950145s", "keys": 1350176} 2023-04-11T16:29:53.986+0300 DEBUG trying to remove state reset point 2023-04-11T16:29:53.986+0300 INFO state reset point is removed {"took": "6.013µs"} 2023-04-11T16:29:55.899+0300 DEBUG state root information and NEP transfers are persisted {"took": "46.567302762s", "keys": 602578} 2023-04-11T16:29:55.929+0300 INFO state reset point information is persisted {"took": "1.942392208s", "keys": 2} 2023-04-11T16:29:55.929+0300 INFO reset finished successfully {"took": "5m51.561573137s"} ``` Master: ``` anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --debug --height 600000 2023-04-11T16:34:12.410+0300 INFO MaxBlockSize is not set or wrong, setting default value {"MaxBlockSize": 262144} 2023-04-11T16:34:12.410+0300 INFO MaxBlockSystemFee is not set or wrong, setting default value {"MaxBlockSystemFee": 900000000000} 2023-04-11T16:34:12.410+0300 INFO MaxTransactionsPerBlock is not set or wrong, using default value {"MaxTransactionsPerBlock": 512} 2023-04-11T16:34:12.410+0300 INFO MaxValidUntilBlockIncrement is not set or wrong, using default value {"MaxValidUntilBlockIncrement": 5760} 2023-04-11T16:34:12.413+0300 INFO restoring blockchain {"version": "0.2.8"} 2023-04-11T16:34:12.495+0300 INFO initialize state reset {"target height": 600000} 2023-04-11T16:34:12.513+0300 INFO trying to reset blocks, transactions and AERs 2023-04-11T16:35:03.582+0300 INFO intermediate batch of removed blocks, transactions and AERs is persisted {"batches persisted": 1, "took": "51.087226195s", "keys": 321895} 2023-04-11T16:35:31.302+0300 INFO intermediate batch of removed 
blocks, transactions and AERs is persisted {"batches persisted": 2, "took": "27.719871393s", "keys": 334823} 2023-04-11T16:35:41.309+0300 INFO last batch of removed blocks, transactions and AERs is persisted {"batches persisted": 3, "took": "10.007017388s", "keys": 208913} 2023-04-11T16:35:41.309+0300 INFO blocks, transactions ans AERs are reset {"took": "1m28.814245057s", "overall persisted keys": 865631} 2023-04-11T16:35:41.309+0300 INFO trying to reset contract storage items 2023-04-11T16:37:38.315+0300 INFO intermediate batch of contract storage items and IDs is persisted {"batch": 1, "took": "1m57.00650253s", "keys": 200000} 2023-04-11T16:39:29.704+0300 INFO intermediate batch of contract storage items and IDs is persisted {"batch": 2, "took": "1m51.385224725s", "keys": 200000} 2023-04-11T16:41:14.991+0300 INFO intermediate batch of contract storage items and IDs is persisted {"batch": 3, "took": "1m45.287483794s", "keys": 200000} 2023-04-11T16:41:31.667+0300 INFO last batch of contract storage items and IDs is persisted {"batch": 4, "took": "16.675347478s", "keys": 56945} 2023-04-11T16:41:31.667+0300 INFO contract storage items and IDs are reset {"took": "5m50.357775401s", "keys": 656944} 2023-04-11T16:41:31.667+0300 INFO trying to reset headers information 2023-04-11T16:42:16.779+0300 INFO headers information is reset {"took": "45.111354262s", "keys": 528438} 2023-04-11T16:42:16.784+0300 INFO trying to reset state root information and NEP transfers 2023-04-11T16:42:35.778+0300 INFO state root information and NEP transfers are reset {"took": "18.99373117s", "keys": 602578} 2023-04-11T16:42:35.781+0300 INFO trying to remove stale storage items 2023-04-11T16:42:43.884+0300 INFO stale storage items are reset {"took": "8.103929306s", "keys": 1350176} 2023-04-11T16:42:43.885+0300 INFO trying to remove state reset point 2023-04-11T16:42:43.926+0300 INFO stale reset point is removed {"took": "41.858883ms", "keys": 2} 2023-04-11T16:42:43.932+0300 INFO reset finished 
successfully {"took": "8m31.437493325s"} ``` Signed-off-by: Anna Shaleva --- pkg/core/blockchain.go | 91 +++++++++++++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 18 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 707775189..f9996d3b4 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -680,7 +680,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) return fmt.Errorf("failed to retrieve stateroot for height %d: %w", height, err) } v := bc.dao.Version - cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB + // dao is MemCachedStore over DB, we use dao directly to persist cached changes + // right to the underlying DB. + cache := bc.dao + // upperCache is a private MemCachedStore over cache. During each of the state + // sync stages we put the data inside the upperCache; in the end of each stage + // we persist changes from upperCache to cache. Changes from cache are persisted + // directly to the underlying persistent storage (boltDB, levelDB, etc.). + // upperCache/cache segregation is needed to keep the DB state clean and to + // persist data from different stages separately. + upperCache := cache.GetPrivate() bc.log.Info("initializing state reset", zap.Uint32("target height", height)) start := time.Now() @@ -715,7 +724,14 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) resetStageKey := []byte{byte(storage.SYSStateChangeStage)} switch stage { case none: - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)}) + // Technically, there's no difference between Persist() and PersistSync() for the private + // MemCached storage, but we'd better use the sync version in case of some further code changes. 
+ _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err) @@ -735,7 +751,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { - err := cache.DeleteBlock(bc.GetHeaderHash(i)) + err := upperCache.DeleteBlock(bc.GetHeaderHash(i)) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } @@ -749,6 +765,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) persistStart := time.Now() persistBatch := batchCnt + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist intermediate batch of removed blocks, transactions and AERs: %w", err) @@ -763,7 +784,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) p = time.Now() } } - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) batchCnt++ bc.log.Info("last batch of removed blocks, transactions and AERs is collected", zap.Int("batch", batchCnt), @@ -772,6 +793,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) persistStart := time.Now() persistBatch := batchCnt + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) @@ -795,7 +821,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) if 
bc.config.Ledger.RemoveUntraceableBlocks { mode |= mpt.ModeGCFlag } - trieStore := mpt.NewTrieStore(sr.Root, mode, cache.Store) + trieStore := mpt.NewTrieStore(sr.Root, mode, upperCache.Store) oldStoragePrefix := v.StoragePrefix newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix) @@ -811,6 +837,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) persistStart := time.Now() persistBatch := batchCnt + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist intermediate batch of contract storage items: %w", err) @@ -825,7 +856,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } // May safely omit KV copying. k[0] = byte(newStoragePrefix) - cache.Store.Put(k, v) + upperCache.Store.Put(k, v) cnt++ storageItmsCnt++ @@ -833,7 +864,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) }) trieStore.Close() - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) batchCnt++ persistBatch := batchCnt bc.log.Info("last batch of contract storage items is collected", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p))) @@ -841,6 +872,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) zap.Int("keys", storageItmsCnt)) lastStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) @@ -854,19 +890,29 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) // Reset SYS-prefixed and IX-prefixed 
information. bc.log.Debug("trying to reset headers information") for i := height + 1; i <= hHeight; i++ { - cache.PurgeHeader(bc.GetHeaderHash(i)) + upperCache.PurgeHeader(bc.GetHeaderHash(i)) } - cache.DeleteHeaderHashes(height+1, headerBatchCount) - cache.StoreAsCurrentBlock(b) - cache.PutCurrentHeader(b.Hash(), height) + upperCache.DeleteHeaderHashes(height+1, headerBatchCount) + upperCache.StoreAsCurrentBlock(b) + upperCache.PutCurrentHeader(b.Hash(), height) v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix) - cache.PutVersion(v) + upperCache.PutVersion(v) + // It's important to manually change the cache's Version at this stage, so that native cache + // can be properly initialized (with the correct contract storage data prefix) at the final + // stage of the state reset. At the same time, DB's SYSVersion-prefixed data will be persisted + // from upperCache to cache in a standard way (several lines below). + cache.Version = v bc.persistent.Version = v - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p))) persistStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist headers changes to the DB: %w", err) @@ -879,21 +925,26 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) case headersReset: // Reset MPT. bc.log.Debug("trying to reset state root information and NEP transfers") - err = bc.stateRoot.ResetState(height, cache.Store) + err = bc.stateRoot.ResetState(height, upperCache.Store) if err != nil { return fmt.Errorf("failed to rollback MPT state: %w", err) } // Reset transfers. 
- err = bc.resetTransfers(cache, height) + err = bc.resetTransfers(upperCache, height) if err != nil { return fmt.Errorf("failed to strip transfer log / transfer info: %w", err) } - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p))) persistStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) @@ -927,12 +978,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) p = time.Now() bc.log.Debug("trying to remove state reset point") - cache.Store.Delete(resetStageKey) + upperCache.Store.Delete(resetStageKey) // Unlike the state jump, state sync point must be removed as we have complete state for this height. - cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) + upperCache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) bc.log.Info("state reset point is removed", zap.Duration("took", time.Since(p))) persistStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)