diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 8c919c7b6..d473d0b45 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -703,6 +703,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.log.Info("initialize state reset", zap.Uint32("target height", height)) start := time.Now() p := start + keys := 0 resetStageKey := []byte{byte(storage.SYSStateChangeStage)} switch stage { @@ -714,6 +715,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } fallthrough case stateJumpStarted: + bc.log.Info("trying to reset blocks, transactions and AERs") // Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself). for i := height + 1; i <= hHeight; i++ { err := cache.PurgeBlock(bc.headerHashes[i]) @@ -722,20 +724,28 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err) } - bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) p = time.Now() fallthrough case staleBlocksRemoved: // Completely remove contract IDs to update them later. + bc.log.Info("trying to reset contract storage items and IDs") + pStorageStart := p cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool { cache.Store.Delete(k) return true }) + keys, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist removed contract IDs: %w", err) + } + bc.log.Info("removed contract IDs are persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() // Reset contracts storage and store new contract IDs. var mode = mpt.ModeAll @@ -754,19 +764,26 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) const persistBatchSize = 10000 var ( - seekErr error - cnt int + seekErr error + cnt int + storageItmsCnt int + contractIDsCnt int ) trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { if cnt >= persistBatchSize { - _, seekErr = cache.Persist() + keys, seekErr = cache.Persist() if seekErr != nil { + seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr) return false } + bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() } // May safely omit KV copying. k[0] = byte(newStoragePrefix) cache.Store.Put(k, v) + cnt++ + storageItmsCnt++ // @fixme: remove this part after #2702. if bytes.HasPrefix(k[1:], mgmtContractPrefix) { @@ -774,29 +791,34 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) copy(hash[:], k[mgmtCSPrefixLen:]) err = stackitem.DeserializeConvertible(v, cs) if err != nil { - seekErr = fmt.Errorf("failed to deserialize contract state: %w", err) + seekErr = fmt.Errorf("failed to deserialize contract %s state: %w", hash.StringLE(), seekErr) } cache.PutContractID(cs.ID, hash) + cnt++ + contractIDsCnt++ } - cnt++ + return seekErr == nil }) if seekErr != nil { - return fmt.Errorf("failed to reset contract IDs: %w", err) + return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr) } trieStore.Close() cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { - return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) + return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) } - - bc.log.Info("contracts storage and IDs are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)), + zap.Int("keys", storageItmsCnt), + zap.Int("ids", contractIDsCnt)) p = time.Now() fallthrough case newStorageItemsAdded: // Reset SYS-prefixed and IX-prefixed information. + bc.log.Info("trying to reset headers information") cache.DeleteHeaderHashes(height+1, headerBatchCount) cache.StoreAsCurrentBlock(b) cache.PutCurrentHeader(b.Hash(), height) @@ -805,16 +827,17 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.persistent.Version = v cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed to persist headers changes to the DB: %w", err) } - bc.log.Info("headers are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) p = time.Now() fallthrough case headersReset: // Reset MPT. + bc.log.Info("trying to reset state root information and NEP transfers") err = bc.stateRoot.ResetState(height, cache.Store) if err != nil { return fmt.Errorf("failed to rollback MPT state: %w", err) @@ -827,12 +850,13 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err) } - bc.log.Info("MPT and transfers are reset", zap.Duration("duration", time.Since(p))) + bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() fallthrough case transfersReset: // there's nothing to do after that, so just continue with common operations @@ -842,29 +866,36 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } // Direct (cache-less) DB operation: remove stale storage items. + bc.log.Info("trying to remove stale storage items") + keys = 0 err = bc.store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, }, func(_, _ []byte) bool { + keys++ return false }) if err != nil { return fmt.Errorf("faield to remove stale storage items from DB: %w", err) } + bc.log.Info("stale storage items are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() + bc.log.Info("trying to remove state reset point") cache.Store.Delete(resetStageKey) // Unlike the state jump, state sync point must be removed as we have complete state for this height. cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) - _, err = cache.Persist() + keys, err = cache.Persist() if err != nil { return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) } + bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) err = bc.resetRAMState(height, true) if err != nil { return fmt.Errorf("failed to update in-memory blockchain data: %w", err) } - bc.log.Info("reset finished successfully", zap.Duration("duration", time.Since(start))) + bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start))) return nil }