diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index e35b6846b..707775189 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -685,15 +685,42 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.log.Info("initializing state reset", zap.Uint32("target height", height)) start := time.Now() p := start - keys := 0 + + // Start batch persisting routine, it will be used for blocks/txs/AERs/storage items batches persist. + type postPersist func(persistedKeys int, err error) error + var ( + persistCh = make(chan postPersist) + persistToExitCh = make(chan struct{}) + ) + go func() { + for { + f, ok := <-persistCh + if !ok { + break + } + persistErr := f(cache.Persist()) + if persistErr != nil { + bc.log.Fatal("persist failed", zap.Error(persistErr)) + panic(persistErr) + } + } + close(persistToExitCh) + }() + defer func() { + close(persistCh) + <-persistToExitCh + bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start))) + }() resetStageKey := []byte{byte(storage.SYSStateChangeStage)} switch stage { case none: cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)}) - _, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err) + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err) + } + return nil } fallthrough case stateJumpStarted: @@ -703,8 +730,9 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) // one stage, so persist periodically. const persistBatchSize = 100 * headerBatchCount // count blocks only, should be enough to avoid OOM killer even for large blocks var ( - pBlocksStart = p - blocksCnt, batchCnt, keysCnt int + pBlocksStart = p + blocksCnt, batchCnt int + keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { err := cache.DeleteBlock(bc.GetHeaderHash(i)) @@ -713,34 +741,56 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } blocksCnt++ if blocksCnt == persistBatchSize { - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist intermediate batch removed blocks, transactions and AERs: %w", err) - } blocksCnt = 0 batchCnt++ - keysCnt += keys - bc.log.Info("intermediate batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("intermediate batch of removed blocks, transactions and AERs is collected", + zap.Int("batch", batchCnt), + zap.Duration("took", time.Since(p))) + + persistStart := time.Now() + persistBatch := batchCnt + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist intermediate batch of removed blocks, transactions and AERs: %w", err) + } + *keysCnt += persistedKeys + bc.log.Debug("intermediate batch of removed blocks, transactions and AERs is persisted", + zap.Int("batch", persistBatch), + zap.Duration("took", time.Since(persistStart)), + zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() } } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) - } batchCnt++ - keysCnt += keys - bc.log.Info("last batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("last batch of removed blocks, transactions and AERs is collected", + zap.Int("batch", batchCnt), + zap.Duration("took", time.Since(p))) + bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart))) - bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)), - zap.Int("overall persisted keys", keysCnt)) + persistStart := time.Now() + persistBatch := batchCnt + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) + } + *keysCnt += persistedKeys + bc.log.Debug("last batch of removed blocks, transactions and AERs is persisted", + zap.Int("batch", persistBatch), + zap.Duration("took", time.Since(persistStart)), + zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case staleBlocksRemoved: + // Completely remove contract IDs to update them later. bc.log.Debug("trying to reset contract storage items") pStorageStart := p + p = time.Now() var mode = mpt.ModeAll if bc.config.Ledger.RemoveUntraceableBlocks { mode |= mpt.ModeGCFlag @@ -750,25 +800,27 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix) const persistBatchSize = 200000 - var ( - seekErr error - cnt int - storageItmsCnt int - batchCnt int - ) + var cnt, storageItmsCnt, batchCnt int trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool { - if seekErr != nil { - return false - } if cnt >= persistBatchSize { cnt = 0 - keys, seekErr = cache.Persist() - if seekErr != nil { - seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr) - return false - } batchCnt++ - bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + bc.log.Info("intermediate batch of contract storage items and IDs is collected", + zap.Int("batch", batchCnt), + zap.Duration("took", time.Since(p))) + + persistStart := time.Now() + persistBatch := batchCnt + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist intermediate batch of contract storage items: %w", err) + } + bc.log.Debug("intermediate batch of contract storage items is persisted", + zap.Int("batch", persistBatch), + zap.Duration("took", time.Since(persistStart)), + zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() } // May safely omit KV copying. @@ -779,20 +831,23 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) return true }) - if seekErr != nil { - return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr) - } trieStore.Close() cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) - } batchCnt++ - bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) - bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)), + persistBatch := batchCnt + bc.log.Info("last batch of contract storage items is collected", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p))) + bc.log.Info("contract storage items are reset", zap.Duration("took", time.Since(pStorageStart)), zap.Int("keys", storageItmsCnt)) + + lastStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) + } + bc.log.Debug("last batch of contract storage items and IDs is persisted", zap.Int("batch", persistBatch), zap.Duration("took", time.Since(lastStart)), zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case newStorageItemsAdded: @@ -809,12 +864,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) bc.persistent.Version = v cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist headers changes to the DB: %w", err) - } + bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p))) - bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + persistStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist headers changes to the DB: %w", err) + } + bc.log.Debug("headers information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case headersReset: @@ -832,12 +891,17 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err) - } + bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p))) - bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + persistStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) + } + + bc.log.Debug("state root information and NEP transfers are persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys)) + return nil + } p = time.Now() fallthrough case transfersReset: @@ -849,7 +913,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) // Direct (cache-less) DB operation: remove stale storage items. bc.log.Debug("trying to remove stale storage items") - keys = 0 + keys := 0 err = bc.store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, }, func(_, _ []byte) bool { @@ -866,18 +930,22 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) cache.Store.Delete(resetStageKey) // Unlike the state jump, state sync point must be removed as we have complete state for this height. cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) - keys, err = cache.Persist() - if err != nil { - return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) + bc.log.Info("state reset point is removed", zap.Duration("took", time.Since(p))) + + persistStart := time.Now() + persistCh <- func(persistedKeys int, err error) error { + if err != nil { + return fmt.Errorf("failed to persist state reset stage to DAO: %w", err) + } + bc.log.Info("state reset point information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys)) + return nil } - bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys)) + p = time.Now() err = bc.resetRAMState(height, true) if err != nil { return fmt.Errorf("failed to update in-memory blockchain data: %w", err) } - - bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start))) return nil }