core: improve logging of resetStateInternal

Inform when starting subsequent stage, inform about keys persisted.
This commit is contained in:
Anna Shaleva 2022-11-18 12:51:17 +03:00
parent ca9fde745b
commit 9f23fafc03

View file

@ -703,6 +703,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
bc.log.Info("initialize state reset", zap.Uint32("target height", height)) bc.log.Info("initialize state reset", zap.Uint32("target height", height))
start := time.Now() start := time.Now()
p := start p := start
keys := 0
resetStageKey := []byte{byte(storage.SYSStateChangeStage)} resetStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage { switch stage {
@ -714,6 +715,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
} }
fallthrough fallthrough
case stateJumpStarted: case stateJumpStarted:
bc.log.Info("trying to reset blocks, transactions and AERs")
// Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself). // Remove headers/blocks/transactions/aers from currHeight down to height (not including height itself).
for i := height + 1; i <= hHeight; i++ { for i := height + 1; i <= hHeight; i++ {
err := cache.PurgeBlock(bc.headerHashes[i]) err := cache.PurgeBlock(bc.headerHashes[i])
@ -722,20 +724,28 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
} }
} }
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err) return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err)
} }
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now() p = time.Now()
fallthrough fallthrough
case staleBlocksRemoved: case staleBlocksRemoved:
// Completely remove contract IDs to update them later. // Completely remove contract IDs to update them later.
bc.log.Info("trying to reset contract storage items and IDs")
pStorageStart := p
cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool { cache.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STContractID)}}, func(k, _ []byte) bool {
cache.Store.Delete(k) cache.Store.Delete(k)
return true return true
}) })
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist removed contract IDs: %w", err)
}
bc.log.Info("removed contract IDs are persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
// Reset contracts storage and store new contract IDs. // Reset contracts storage and store new contract IDs.
var mode = mpt.ModeAll var mode = mpt.ModeAll
@ -754,19 +764,26 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
const persistBatchSize = 10000 const persistBatchSize = 10000
var ( var (
seekErr error seekErr error
cnt int cnt int
storageItmsCnt int
contractIDsCnt int
) )
trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool {
if cnt >= persistBatchSize { if cnt >= persistBatchSize {
_, seekErr = cache.Persist() keys, seekErr = cache.Persist()
if seekErr != nil { if seekErr != nil {
seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr)
return false return false
} }
bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
} }
// May safely omit KV copying. // May safely omit KV copying.
k[0] = byte(newStoragePrefix) k[0] = byte(newStoragePrefix)
cache.Store.Put(k, v) cache.Store.Put(k, v)
cnt++
storageItmsCnt++
// @fixme: remove this part after #2702. // @fixme: remove this part after #2702.
if bytes.HasPrefix(k[1:], mgmtContractPrefix) { if bytes.HasPrefix(k[1:], mgmtContractPrefix) {
@ -774,29 +791,34 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
copy(hash[:], k[mgmtCSPrefixLen:]) copy(hash[:], k[mgmtCSPrefixLen:])
err = stackitem.DeserializeConvertible(v, cs) err = stackitem.DeserializeConvertible(v, cs)
if err != nil { if err != nil {
seekErr = fmt.Errorf("failed to deserialize contract state: %w", err) seekErr = fmt.Errorf("failed to deserialize contract %s state: %w", hash.StringLE(), seekErr)
} }
cache.PutContractID(cs.ID, hash) cache.PutContractID(cs.ID, hash)
cnt++
contractIDsCnt++
} }
cnt++
return seekErr == nil return seekErr == nil
}) })
if seekErr != nil { if seekErr != nil {
return fmt.Errorf("failed to reset contract IDs: %w", err) return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr)
} }
trieStore.Close() trieStore.Close()
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err)
} }
bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("contracts storage and IDs are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)),
zap.Int("keys", storageItmsCnt),
zap.Int("ids", contractIDsCnt))
p = time.Now() p = time.Now()
fallthrough fallthrough
case newStorageItemsAdded: case newStorageItemsAdded:
// Reset SYS-prefixed and IX-prefixed information. // Reset SYS-prefixed and IX-prefixed information.
bc.log.Info("trying to reset headers information")
cache.DeleteHeaderHashes(height+1, headerBatchCount) cache.DeleteHeaderHashes(height+1, headerBatchCount)
cache.StoreAsCurrentBlock(b) cache.StoreAsCurrentBlock(b)
cache.PutCurrentHeader(b.Hash(), height) cache.PutCurrentHeader(b.Hash(), height)
@ -805,16 +827,17 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
bc.persistent.Version = v bc.persistent.Version = v
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err) return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
} }
bc.log.Info("headers are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now() p = time.Now()
fallthrough fallthrough
case headersReset: case headersReset:
// Reset MPT. // Reset MPT.
bc.log.Info("trying to reset state root information and NEP transfers")
err = bc.stateRoot.ResetState(height, cache.Store) err = bc.stateRoot.ResetState(height, cache.Store)
if err != nil { if err != nil {
return fmt.Errorf("failed to rollback MPT state: %w", err) return fmt.Errorf("failed to rollback MPT state: %w", err)
@ -827,12 +850,13 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
} }
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err) return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err)
} }
bc.log.Info("MPT and transfers are reset", zap.Duration("duration", time.Since(p))) bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
fallthrough fallthrough
case transfersReset: case transfersReset:
// there's nothing to do after that, so just continue with common operations // there's nothing to do after that, so just continue with common operations
@ -842,29 +866,36 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
} }
// Direct (cache-less) DB operation: remove stale storage items. // Direct (cache-less) DB operation: remove stale storage items.
bc.log.Info("trying to remove stale storage items")
keys = 0
err = bc.store.SeekGC(storage.SeekRange{ err = bc.store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))},
}, func(_, _ []byte) bool { }, func(_, _ []byte) bool {
keys++
return false return false
}) })
if err != nil { if err != nil {
return fmt.Errorf("faield to remove stale storage items from DB: %w", err) return fmt.Errorf("faield to remove stale storage items from DB: %w", err)
} }
bc.log.Info("stale storage items are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
bc.log.Info("trying to remove state reset point")
cache.Store.Delete(resetStageKey) cache.Store.Delete(resetStageKey)
// Unlike the state jump, state sync point must be removed as we have complete state for this height. // Unlike the state jump, state sync point must be removed as we have complete state for this height.
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
_, err = cache.Persist() keys, err = cache.Persist()
if err != nil { if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)
} }
bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
err = bc.resetRAMState(height, true) err = bc.resetRAMState(height, true)
if err != nil { if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err) return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
} }
bc.log.Info("reset finished successfully", zap.Duration("duration", time.Since(start))) bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start)))
return nil return nil
} }