core: move batch persist to a separate routine

Resetting mainnet from 2512046 blocks (full logs are attached
to https://github.com/nspcc-dev/neo-go/pull/2813#issuecomment-1324115555).

--------
LevelDB |
------------------------
  to  |  old   |   new  |
------|--------|--------|
  1   | 5m11s  | 4m50s  |
------|--------|--------|
  1M  | 10m40s | 9m40s  |
------|--------|--------|
 2.5M | 17m38s | 17m36s |
------------------------

--------
BoltDB  |
------------------------
  to  |  old   |   new  |
------|--------|--------|
  1   |  8m3s  | 5m51s  |
------|--------|--------|
  1M  | 20m30s | 13m2s  |
------|--------|--------|
 2.5M | 31m26s | 18m47s |
------------------------

Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
This commit is contained in:
Anna Shaleva 2022-11-21 12:04:52 +03:00
parent e3747b1d57
commit cb0f786b28

View file

@ -685,16 +685,43 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
bc.log.Info("initializing state reset", zap.Uint32("target height", height))
start := time.Now()
p := start
keys := 0
// Start batch persisting routine, it will be used for blocks/txs/AERs/storage items batches persist.
type postPersist func(persistedKeys int, err error) error
var (
persistCh = make(chan postPersist)
persistToExitCh = make(chan struct{})
)
go func() {
for {
f, ok := <-persistCh
if !ok {
break
}
persistErr := f(cache.Persist())
if persistErr != nil {
bc.log.Fatal("persist failed", zap.Error(persistErr))
panic(persistErr)
}
}
close(persistToExitCh)
}()
defer func() {
close(persistCh)
<-persistToExitCh
bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start)))
}()
resetStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage {
case none:
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)})
_, err = cache.Persist()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err)
}
return nil
}
fallthrough
case stateJumpStarted:
bc.log.Debug("trying to reset blocks, transactions and AERs")
@ -704,7 +731,8 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
const persistBatchSize = 100 * headerBatchCount // count blocks only, should be enough to avoid OOM killer even for large blocks
var (
pBlocksStart = p
blocksCnt, batchCnt, keysCnt int
blocksCnt, batchCnt int
keysCnt = new(int)
)
for i := height + 1; i <= currHeight; i++ {
err := cache.DeleteBlock(bc.GetHeaderHash(i))
@ -713,34 +741,56 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
}
blocksCnt++
if blocksCnt == persistBatchSize {
keys, err = cache.Persist()
if err != nil {
return fmt.Errorf("failed to persist intermediate batch removed blocks, transactions and AERs: %w", err)
}
blocksCnt = 0
batchCnt++
keysCnt += keys
bc.log.Info("intermediate batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("intermediate batch of removed blocks, transactions and AERs is collected",
zap.Int("batch", batchCnt),
zap.Duration("took", time.Since(p)))
persistStart := time.Now()
persistBatch := batchCnt
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist intermediate batch of removed blocks, transactions and AERs: %w", err)
}
*keysCnt += persistedKeys
bc.log.Debug("intermediate batch of removed blocks, transactions and AERs is persisted",
zap.Int("batch", persistBatch),
zap.Duration("took", time.Since(persistStart)),
zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
}
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
keys, err = cache.Persist()
batchCnt++
bc.log.Info("last batch of removed blocks, transactions and AERs is collected",
zap.Int("batch", batchCnt),
zap.Duration("took", time.Since(p)))
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)))
persistStart := time.Now()
persistBatch := batchCnt
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err)
}
batchCnt++
keysCnt += keys
bc.log.Info("last batch of removed blocks, transactions and AERs is persisted", zap.Int("batches persisted", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("blocks, transactions ans AERs are reset", zap.Duration("took", time.Since(pBlocksStart)),
zap.Int("overall persisted keys", keysCnt))
*keysCnt += persistedKeys
bc.log.Debug("last batch of removed blocks, transactions and AERs is persisted",
zap.Int("batch", persistBatch),
zap.Duration("took", time.Since(persistStart)),
zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case staleBlocksRemoved:
// Completely remove contract IDs to update them later.
bc.log.Debug("trying to reset contract storage items")
pStorageStart := p
p = time.Now()
var mode = mpt.ModeAll
if bc.config.Ledger.RemoveUntraceableBlocks {
mode |= mpt.ModeGCFlag
@ -750,25 +800,27 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix)
const persistBatchSize = 200000
var (
seekErr error
cnt int
storageItmsCnt int
batchCnt int
)
var cnt, storageItmsCnt, batchCnt int
trieStore.Seek(storage.SeekRange{Prefix: []byte{byte(oldStoragePrefix)}}, func(k, v []byte) bool {
if seekErr != nil {
return false
}
if cnt >= persistBatchSize {
cnt = 0
keys, seekErr = cache.Persist()
if seekErr != nil {
seekErr = fmt.Errorf("failed to persist intermediate batch of contract storage items and IDs: %w", seekErr)
return false
}
batchCnt++
bc.log.Info("intermediate batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("intermediate batch of contract storage items and IDs is collected",
zap.Int("batch", batchCnt),
zap.Duration("took", time.Since(p)))
persistStart := time.Now()
persistBatch := batchCnt
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist intermediate batch of contract storage items: %w", err)
}
bc.log.Debug("intermediate batch of contract storage items is persisted",
zap.Int("batch", persistBatch),
zap.Duration("took", time.Since(persistStart)),
zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
}
// May safely omit KV copying.
@ -779,20 +831,23 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
return true
})
if seekErr != nil {
return fmt.Errorf("failed to reset contract contract storage items and IDs: %w", seekErr)
}
trieStore.Close()
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
keys, err = cache.Persist()
batchCnt++
persistBatch := batchCnt
bc.log.Info("last batch of contract storage items is collected", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)))
bc.log.Info("contract storage items are reset", zap.Duration("took", time.Since(pStorageStart)),
zap.Int("keys", storageItmsCnt))
lastStart := time.Now()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err)
}
batchCnt++
bc.log.Info("last batch of contract storage items and IDs is persisted", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("contract storage items and IDs are reset", zap.Duration("took", time.Since(pStorageStart)),
zap.Int("keys", storageItmsCnt))
bc.log.Debug("last batch of contract storage items and IDs is persisted", zap.Int("batch", persistBatch), zap.Duration("took", time.Since(lastStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case newStorageItemsAdded:
@ -809,12 +864,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
bc.persistent.Version = v
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
keys, err = cache.Persist()
bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)))
persistStart := time.Now()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
}
bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Debug("headers information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case headersReset:
@ -832,12 +891,17 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
keys, err = cache.Persist()
bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)))
persistStart := time.Now()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed tpo persist contract storage items changes to the DB: %w", err)
return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err)
}
bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Debug("state root information and NEP transfers are persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
fallthrough
case transfersReset:
@ -849,7 +913,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
// Direct (cache-less) DB operation: remove stale storage items.
bc.log.Debug("trying to remove stale storage items")
keys = 0
keys := 0
err = bc.store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))},
}, func(_, _ []byte) bool {
@ -866,18 +930,22 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
cache.Store.Delete(resetStageKey)
// Unlike the state jump, state sync point must be removed as we have complete state for this height.
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
keys, err = cache.Persist()
bc.log.Info("state reset point is removed", zap.Duration("took", time.Since(p)))
persistStart := time.Now()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)
}
bc.log.Info("stale reset point is removed", zap.Duration("took", time.Since(p)), zap.Int("keys", keys))
bc.log.Info("state reset point information is persisted", zap.Duration("took", time.Since(persistStart)), zap.Int("keys", persistedKeys))
return nil
}
p = time.Now()
err = bc.resetRAMState(height, true)
if err != nil {
return fmt.Errorf("failed to update in-memory blockchain data: %w", err)
}
bc.log.Info("reset finished successfully", zap.Duration("took", time.Since(start)))
return nil
}