diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 0f277b504..aa8957ad9 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -724,20 +724,42 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) case stateJumpStarted: bc.log.Info("trying to reset blocks, transactions and AERs") // Remove blocks/transactions/aers from currHeight down to height (not including height itself). - // Keep headers for now, they'll be removed later. + // Keep headers for now, they'll be removed later. It's hard to handle the whole set of changes in + // one stage, so persist periodically. + const persistBatchSize = 100 * headerBatchCount // count blocks only, should be enough to avoid OOM killer even for large blocks + var ( + pBlocksStart = p + blocksCnt, batchCnt, keysCnt int + ) for i := height + 1; i <= currHeight; i++ { err := cache.DeleteBlock(bc.GetHeaderHash(int(i))) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } + blocksCnt++ + if blocksCnt == persistBatchSize { + keys, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to persist intermediate batch removed blocks, transactions and AERs: %w", err) + } + blocksCnt = 0 + batchCnt++ + keysCnt += keys + bc.log.Info("intermediate batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() + } } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) keys, err = cache.Persist() if err != nil { - return fmt.Errorf("failed to persist blocks, transactions ans AERs changes to the DB: %w", err) + return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) } + batchCnt++ + keysCnt += keys + bc.log.Info("last batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) - bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)), + zap.Int("overall persisted keys", keysCnt)) p = time.Now() fallthrough case staleBlocksRemoved: