Merge pull request #2819 from nspcc-dev/optimize-state-reset

core: some state reset oprimizations
This commit is contained in:
Roman Khimov 2023-04-11 19:11:10 +03:00 committed by GitHub
commit 4f827de5cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -680,164 +680,279 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
return fmt.Errorf("failed to retrieve stateroot for height %d: %w", height, err)
}
v := bc.dao.Version
cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB
// dao is MemCachedStore over DB, we use dao directly to persist cached changes
// right to the underlying DB.
cache := bc.dao
// upperCache is a private MemCachedStore over cache. During each of the state
// sync stages we put the data inside the upperCache; in the end of each stage
// we persist changes from upperCache to cache. Changes from cache are persisted
// directly to the underlying persistent storage (boltDB, levelDB, etc.).
// upperCache/cache segregation is needed to keep the DB state clean and to
// persist data from different stages separately.
upperCache := cache.GetPrivate()
bc.log.Info("initialize state reset", zap.Uint32("target height", height))
bc.log.Info("initializing state reset", zap.Uint32("target height", height))
start := time.Now()
p := start
keys := 0
// Start batch persisting routine, it will be used for blocks/txs/AERs/storage items batches persist.
type postPersist func(persistedKeys int, err error) error
var (
persistCh = make(chan postPersist)
persistToExitCh = make(chan struct{})
)
go func() {
for {
f, ok := <-persistCh
if !ok {
break
}
persistErr := f(cache.Persist())
if persistErr != nil {
bc.log.Fatal("persist failed", zap.Error(persistErr))
panic(persistErr)
}
}
close(persistToExitCh)
}()
defer func() {
close(persistCh)
<-persistToExitCh
bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start)))
}()
resetStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage {
case none:
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)})
_, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err)
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)})
// Technically, there's no difference between Persist() and PersistSync() for the private
// MemCached storage, but we'd better use the sync version in case of some further code changes.
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err)
}
return nil
}
fallthrough
case stateJumpStarted:
bc.log.Info("trying to reset blocks, transactions and AERs")
bc.log.Debug("trying to reset blocks, transactions and AERs")
// Remove blocks/transactions/aers from currHeight down to height (not including height itself).
// Keep headers for now, they'll be removed later. It's hard to handle the whole set of changes in
// one stage, so persist periodically.
const persistBatchSize = 100 * headerBatchCount // count blocks only, should be enough to avoid OOM killer even for large blocks
var (
pBlocksStart = p
blocksCnt, batchCnt, keysCnt int
pBlocksStart = p
blocksCnt, batchCnt int
keysCnt = new(int)
)
for i := height + 1; i <= currHeight; i++ {
err := cache.DeleteBlock(bc.GetHeaderHash(i))
err := upperCache.DeleteBlock(bc.GetHeaderHash(i))
if err != nil {
return fmt.Errorf("error while removing block %d: %w", i, err)
}
blocksCnt++
if blocksCnt == persistBatchSize {
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist intermediate batch removed blocks, transactions and AERs: %w", err)
}
blocksCnt = 0
batchCnt++
keysCnt += keys
bc.log.Info("intermediate batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("intermediate batch of removed blocks, transactions and AERs is collected",
zap.Int("batch", batchCnt),
zap.Duration("took", time.Since(p)))
persistStart := time.Now()
persistBatch := batchCnt
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist intermediate batch of removed blocks, transactions and AERs: %w", err)
}
*keysCnt += persistedKeys
bc.log.Debug("intermediate batch of removed blocks, transactions and AERs is persisted",
zap.Int("batch", persistBatch),
zap.Duration("took", time.Since(persistStart)),
zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
}
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err)
}
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
batchCnt++
keysCnt += keys
bc.log.Info("last batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("last batch of removed blocks, transactions and AERs is collected",
zap.Int("batch", batchCnt),
zap.Duration("took", time.Since(p)))
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)))
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)),
zap.Int("overall persisted keys", keysCnt))
persistStart := time.Now()
persistBatch := batchCnt
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err)
}
*keysCnt += persistedKeys
bc.log.Debug("last batch of removed blocks, transactions and AERs is persisted",
zap.Int("batch", persistBatch),
zap.Duration("took", time.Since(persistStart)),
zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case staleBlocksRemoved:
bc.log.Info("trying to reset contract storage items")
// Completely remove contract IDs to update them later.
bc.log.Debug("trying to reset contract storage items")
pStorageStart := p
p = time.Now()
var mode = mpt.ModeAll
if bc.config.Ledger.RemoveUntraceableBlocks {
mode |= mpt.ModeGCFlag
}
trieStore := mpt.NewTrieStore(sr.Root, mode, cache.Store)
trieStore := mpt.NewTrieStore(sr.Root, mode, upperCache.Store)
oldStoragePrefix := v.StoragePrefix
newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix)
const persistBatchSize = 200000
var (
seekErr error
cnt int
storageItmsCnt int
batchCnt int
)
var cnt, storageItmsCnt, batchCnt int
trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool {
if seekErr != nil {
return false
}
if cnt >= persistBatchSize {
cnt = 0
keys, seekErr = cache.Persist()
if seekErr != nil {
seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr)
return false
}
batchCnt++
bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("intermediate batch of contract storage items and IDs is collected",
zap.Int("batch", batchCnt),
zap.Duration("took", time.Since(p)))
persistStart := time.Now()
persistBatch := batchCnt
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist intermediate batch of contract storage items: %w", err)
}
bc.log.Debug("intermediate batch of contract storage items is persisted",
zap.Int("batch", persistBatch),
zap.Duration("took", time.Since(persistStart)),
zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
}
// May safely omit KV copying.
k[0] = byte(newStoragePrefix)
cache.Store.Put(k, v)
upperCache.Store.Put(k, v)
cnt++
storageItmsCnt++
return true
})
if seekErr != nil {
return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr)
}
trieStore.Close()
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err)
}
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
batchCnt++
bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)),
persistBatch := batchCnt
bc.log.Info("last batch of contract storage items is collected", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)))
bc.log.Info("contract storage items are reset", zap.Duration("took", time.Since(pStorageStart)),
zap.Int("keys", storageItmsCnt))
lastStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err)
}
bc.log.Debug("last batch of contract storage items and IDs is persisted", zap.Int("batch", persistBatch), zap.Duration("took", time.Since(lastStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case newStorageItemsAdded:
// Reset SYS-prefixed and IX-prefixed information.
bc.log.Info("trying to reset headers information")
bc.log.Debug("trying to reset headers information")
for i := height + 1; i <= hHeight; i++ {
cache.PurgeHeader(bc.GetHeaderHash(i))
upperCache.PurgeHeader(bc.GetHeaderHash(i))
}
cache.DeleteHeaderHashes(height+1, headerBatchCount)
cache.StoreAsCurrentBlock(b)
cache.PutCurrentHeader(b.Hash(), height)
upperCache.DeleteHeaderHashes(height+1, headerBatchCount)
upperCache.StoreAsCurrentBlock(b)
upperCache.PutCurrentHeader(b.Hash(), height)
v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix)
cache.PutVersion(v)
upperCache.PutVersion(v)
// It's important to manually change the cache's Version at this stage, so that native cache
// can be properly initialized (with the correct contract storage data prefix) at the final
// stage of the state reset. At the same time, DB's SYSVersion-prefixed data will be persisted
// from upperCache to cache in a standard way (several lines below).
cache.Version = v
bc.persistent.Version = v
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
}
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)))
bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
persistStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
}
bc.log.Debug("headers information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case headersReset:
// Reset MPT.
bc.log.Info("trying to reset state root information and NEP transfers")
err = bc.stateRoot.ResetState(height, cache.Store)
bc.log.Debug("trying to reset state root information and NEP transfers")
err = bc.stateRoot.ResetState(height, upperCache.Store)
if err != nil {
return fmt.Errorf("failed to rollback MPT state: %w", err)
}
// Reset transfers.
err = bc.resetTransfers(cache, height)
err = bc.resetTransfers(upperCache, height)
if err != nil {
return fmt.Errorf("failed to strip transfer log / transfer info: %w", err)
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err)
}
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)))
bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
persistStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err)
}
bc.log.Debug("state root information and NEP transfers are persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case transfersReset:
@ -848,8 +963,8 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
}
// Direct (cache-less) DB operation: remove stale storage items.
bc.log.Info("trying to remove stale storage items")
keys = 0
bc.log.Debug("trying to remove stale storage items")
keys := 0
err = bc.store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))},
}, func(_, _ []byte) bool {
@ -862,22 +977,30 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
bc.log.Info("stale storage items are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
p = time.Now()
bc.log.Info("trying to remove state reset point")
cache.Store.Delete(resetStageKey)
bc.log.Debug("trying to remove state reset point")
upperCache.Store.Delete(resetStageKey)
// Unlike the state jump, state sync point must be removed as we have complete state for this height.
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)
upperCache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
bc.log.Info("state reset point is removed", zap.Duration("took", time.Since(p)))
persistStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)
}
bc.log.Info("state reset point information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
err = bc.resetRAMState(height, true)
if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
}
bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start)))
return nil
}