diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 707775189..f9996d3b4 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -680,7 +680,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) return fmt.Errorf("failed to retrieve stateroot for height %d: %w", height, err) } v := bc.dao.Version - cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB + // dao is MemCachedStore over DB, we use dao directly to persist cached changes + // right to the underlying DB. + cache := bc.dao + // upperCache is a private MemCachedStore over cache. During each of the state + // sync stages we put the data inside the upperCache; in the end of each stage + // we persist changes from upperCache to cache. Changes from cache are persisted + // directly to the underlying persistent storage (boltDB, levelDB, etc.). + // upperCache/cache segregation is needed to keep the DB state clean and to + // persist data from different stages separately. + upperCache := cache.GetPrivate() bc.log.Info("initializing state reset", zap.Uint32("target height", height)) start := time.Now() @@ -715,7 +724,14 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) resetStageKey := []byte{byte(storage.SYSStateChangeStage)} switch stage { case none: - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)}) + // Technically, there's no difference between Persist() and PersistSync() for the private + // MemCached storage, but we'd better use the sync version in case of some further code changes. + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err) @@ -735,7 +751,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { - err := cache.DeleteBlock(bc.GetHeaderHash(i)) + err := upperCache.DeleteBlock(bc.GetHeaderHash(i)) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } @@ -749,6 +765,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) persistStart := time.Now() persistBatch := batchCnt + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist intermediate batch of removed blocks, transactions and AERs: %w", err) @@ -763,7 +784,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) p = time.Now() } } - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)}) batchCnt++ bc.log.Info("last batch of removed blocks, transactions and AERs is collected", zap.Int("batch", batchCnt), @@ -772,6 +793,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) persistStart := time.Now() persistBatch := batchCnt + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err) @@ -795,7 +821,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) if bc.config.Ledger.RemoveUntraceableBlocks { mode |= mpt.ModeGCFlag } - trieStore := mpt.NewTrieStore(sr.Root, mode, cache.Store) + trieStore := mpt.NewTrieStore(sr.Root, mode, upperCache.Store) oldStoragePrefix := v.StoragePrefix newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix) @@ -811,6 +837,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) persistStart := time.Now() persistBatch := batchCnt + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist intermediate batch of contract storage items: %w", err) @@ -825,7 +856,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) } // May safely omit KV copying. k[0] = byte(newStoragePrefix) - cache.Store.Put(k, v) + upperCache.Store.Put(k, v) cnt++ storageItmsCnt++ @@ -833,7 +864,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) }) trieStore.Close() - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)}) batchCnt++ persistBatch := batchCnt bc.log.Info("last batch of contract storage items is collected", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p))) @@ -841,6 +872,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) zap.Int("keys", storageItmsCnt)) lastStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err) @@ -854,19 +890,29 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) // Reset SYS-prefixed and IX-prefixed information. bc.log.Debug("trying to reset headers information") for i := height + 1; i <= hHeight; i++ { - cache.PurgeHeader(bc.GetHeaderHash(i)) + upperCache.PurgeHeader(bc.GetHeaderHash(i)) } - cache.DeleteHeaderHashes(height+1, headerBatchCount) - cache.StoreAsCurrentBlock(b) - cache.PutCurrentHeader(b.Hash(), height) + upperCache.DeleteHeaderHashes(height+1, headerBatchCount) + upperCache.StoreAsCurrentBlock(b) + upperCache.PutCurrentHeader(b.Hash(), height) v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix) - cache.PutVersion(v) + upperCache.PutVersion(v) + // It's important to manually change the cache's Version at this stage, so that native cache + // can be properly initialized (with the correct contract storage data prefix) at the final + // stage of the state reset. At the same time, DB's SYSVersion-prefixed data will be persisted + // from upperCache to cache in a standard way (several lines below). + cache.Version = v bc.persistent.Version = v - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)}) bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p))) persistStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist headers changes to the DB: %w", err) @@ -879,21 +925,26 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) case headersReset: // Reset MPT. bc.log.Debug("trying to reset state root information and NEP transfers") - err = bc.stateRoot.ResetState(height, cache.Store) + err = bc.stateRoot.ResetState(height, upperCache.Store) if err != nil { return fmt.Errorf("failed to rollback MPT state: %w", err) } // Reset transfers. - err = bc.resetTransfers(cache, height) + err = bc.resetTransfers(upperCache, height) if err != nil { return fmt.Errorf("failed to strip transfer log / transfer info: %w", err) } - cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) + upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)}) bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p))) persistStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } + upperCache = cache.GetPrivate() persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err) @@ -927,12 +978,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) p = time.Now() bc.log.Debug("trying to remove state reset point") - cache.Store.Delete(resetStageKey) + upperCache.Store.Delete(resetStageKey) // Unlike the state jump, state sync point must be removed as we have complete state for this height. - cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) + upperCache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)}) bc.log.Info("state reset point is removed", zap.Duration("took", time.Since(p))) persistStart := time.Now() + _, uerr := upperCache.PersistSync() + if uerr != nil { + panic(uerr) + } persistCh <- func(persistedKeys int, err error) error { if err != nil { return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)