core: avoid squashing of data from different state reset stages

`cache` persisting operation is concurrent with the storage modifications
made by further state reset stages. We can't allow situation when data
from the next stage are leaking into the previous stage. State reset stages
must be atomic in turms of DB persisting, thus, use another `upperCache`
MemCached store to keep them apart.

Here are the results of mainnet's BoltDB reset from 1.1M to 6K:

This patch:
```
anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --debug --height 600000
2023-04-11T16:15:25.783+0300	INFO	MaxBlockSize is not set or wrong, setting default value	{"MaxBlockSize": 262144}
2023-04-11T16:15:25.783+0300	INFO	MaxBlockSystemFee is not set or wrong, setting default value	{"MaxBlockSystemFee": 900000000000}
2023-04-11T16:15:25.783+0300	INFO	MaxTransactionsPerBlock is not set or wrong, using default value	{"MaxTransactionsPerBlock": 512}
2023-04-11T16:15:25.783+0300	INFO	MaxValidUntilBlockIncrement is not set or wrong, using default value	{"MaxValidUntilBlockIncrement": 5760}
2023-04-11T16:15:25.787+0300	INFO	restoring blockchain	{"version": "0.2.8"}
2023-04-11T16:15:25.906+0300	INFO	initializing state reset	{"target height": 600000}
2023-04-11T16:15:25.906+0300	DEBUG	trying to reset blocks, transactions and AERs
2023-04-11T16:15:57.031+0300	INFO	intermediate batch of removed blocks, transactions and AERs is collected	{"batch": 1, "took": "31.125057214s"}
2023-04-11T16:16:12.644+0300	DEBUG	intermediate batch of removed blocks, transactions and AERs is persisted	{"batch": 1, "took": "15.613156971s", "keys": 321895}
2023-04-11T16:16:13.895+0300	INFO	intermediate batch of removed blocks, transactions and AERs is collected	{"batch": 2, "took": "16.663444208s"}
2023-04-11T16:16:19.784+0300	INFO	last batch of removed blocks, transactions and AERs is collected	{"batch": 3, "took": "5.760308543s"}
2023-04-11T16:16:19.784+0300	INFO	blocks, transactions ans AERs are reset	{"took": "53.878632911s"}
2023-04-11T16:16:22.870+0300	DEBUG	intermediate batch of removed blocks, transactions and AERs is persisted	{"batch": 2, "took": "8.974838893s", "keys": 334823}
2023-04-11T16:16:22.870+0300	DEBUG	trying to reset contract storage items
2023-04-11T16:16:27.272+0300	DEBUG	last batch of removed blocks, transactions and AERs is persisted	{"batch": 3, "took": "7.487357441s", "keys": 208913}
2023-04-11T16:17:23.678+0300	INFO	intermediate batch of contract storage items and IDs is collected	{"batch": 1, "took": "1m0.80711106s"}
2023-04-11T16:18:00.769+0300	INFO	intermediate batch of contract storage items and IDs is collected	{"batch": 2, "took": "36.967660061s"}
2023-04-11T16:18:20.478+0300	DEBUG	intermediate batch of contract storage items is persisted	{"batch": 1, "took": "56.796257788s", "keys": 200000}
2023-04-11T16:18:54.115+0300	INFO	intermediate batch of contract storage items and IDs is collected	{"batch": 3, "took": "33.637412437s"}
2023-04-11T16:19:18.844+0300	DEBUG	intermediate batch of contract storage items is persisted	{"batch": 2, "took": "1m18.0737668s", "keys": 200000}
2023-04-11T16:19:27.650+0300	INFO	last batch of contract storage items is collected	{"batch": 4, "took": "8.806264019s"}
2023-04-11T16:19:27.650+0300	INFO	contract storage items are reset	{"took": "3m4.780232077s", "keys": 656944}
2023-04-11T16:20:15.660+0300	DEBUG	intermediate batch of contract storage items is persisted	{"batch": 3, "took": "1m21.544386403s", "keys": 200000}
2023-04-11T16:20:15.660+0300	DEBUG	trying to reset headers information
2023-04-11T16:20:16.385+0300	INFO	headers information is reset	{"took": "725.174932ms"}
2023-04-11T16:20:19.586+0300	DEBUG	last batch of contract storage items and IDs is persisted	{"batch": 4, "took": "51.936278608s", "keys": 56945}
2023-04-11T16:20:19.587+0300	DEBUG	trying to reset state root information and NEP transfers
2023-04-11T16:20:35.845+0300	INFO	state root information and NEP transfers are reset	{"took": "16.25852114s"}
2023-04-11T16:21:10.000+0300	DEBUG	headers information is persisted	{"took": "53.613638429s", "keys": 528438}
2023-04-11T16:21:10.003+0300	DEBUG	trying to remove stale storage items
2023-04-11T16:21:18.108+0300	INFO	stale storage items are reset	{"took": "8.105140658s", "keys": 1350176}
2023-04-11T16:21:18.108+0300	DEBUG	trying to remove state reset point
2023-04-11T16:21:18.108+0300	INFO	state reset point is removed	{"took": "8.554µs"}
2023-04-11T16:21:20.151+0300	DEBUG	state root information and NEP transfers are persisted	{"took": "44.305707049s", "keys": 602578}
2023-04-11T16:21:20.212+0300	INFO	state reset point information is persisted	{"took": "2.103764633s", "keys": 2}
2023-04-11T16:21:20.213+0300	INFO	reset finished successfully	{"took": "5m54.306861367s"}
```

The previous commit:
```
anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --debug --height 600000
2023-04-11T16:24:04.256+0300	INFO	MaxBlockSize is not set or wrong, setting default value	{"MaxBlockSize": 262144}
2023-04-11T16:24:04.256+0300	INFO	MaxBlockSystemFee is not set or wrong, setting default value	{"MaxBlockSystemFee": 900000000000}
2023-04-11T16:24:04.256+0300	INFO	MaxTransactionsPerBlock is not set or wrong, using default value	{"MaxTransactionsPerBlock": 512}
2023-04-11T16:24:04.256+0300	INFO	MaxValidUntilBlockIncrement is not set or wrong, using default value	{"MaxValidUntilBlockIncrement": 5760}
2023-04-11T16:24:04.261+0300	INFO	restoring blockchain	{"version": "0.2.8"}
2023-04-11T16:24:04.368+0300	INFO	initializing state reset	{"target height": 600000}
2023-04-11T16:24:04.368+0300	DEBUG	trying to reset blocks, transactions and AERs
2023-04-11T16:24:30.363+0300	INFO	intermediate batch of removed blocks, transactions and AERs is collected	{"batch": 1, "took": "25.995261037s"}
2023-04-11T16:24:44.947+0300	DEBUG	intermediate batch of removed blocks, transactions and AERs is persisted	{"batch": 1, "took": "14.584447338s", "keys": 321897}
2023-04-11T16:24:45.791+0300	INFO	intermediate batch of removed blocks, transactions and AERs is collected	{"batch": 2, "took": "15.428492824s"}
2023-04-11T16:24:51.252+0300	INFO	last batch of removed blocks, transactions and AERs is collected	{"batch": 3, "took": "5.460662766s"}
2023-04-11T16:24:51.252+0300	INFO	blocks, transactions ans AERs are reset	{"took": "46.884558096s"}
2023-04-11T16:24:55.399+0300	DEBUG	intermediate batch of removed blocks, transactions and AERs is persisted	{"batch": 2, "took": "9.607820004s", "keys": 334821}
2023-04-11T16:24:55.399+0300	DEBUG	trying to reset contract storage items
2023-04-11T16:24:59.981+0300	DEBUG	last batch of removed blocks, transactions and AERs is persisted	{"batch": 3, "took": "8.728713255s", "keys": 208913}
2023-04-11T16:25:50.827+0300	INFO	intermediate batch of contract storage items and IDs is collected	{"batch": 1, "took": "55.426411416s"}
2023-04-11T16:26:28.734+0300	INFO	intermediate batch of contract storage items and IDs is collected	{"batch": 2, "took": "37.902647706s"}
2023-04-11T16:26:53.960+0300	DEBUG	intermediate batch of contract storage items is persisted	{"batch": 1, "took": "1m3.129453265s", "keys": 200001}
2023-04-11T16:27:27.645+0300	INFO	intermediate batch of contract storage items and IDs is collected	{"batch": 3, "took": "33.685283662s"}
2023-04-11T16:27:52.173+0300	DEBUG	intermediate batch of contract storage items is persisted	{"batch": 2, "took": "1m23.438465575s", "keys": 199999}
2023-04-11T16:28:00.995+0300	INFO	last batch of contract storage items is collected	{"batch": 4, "took": "8.821990443s"}
2023-04-11T16:28:00.995+0300	INFO	contract storage items are reset	{"took": "3m5.595950958s", "keys": 656944}
2023-04-11T16:28:49.164+0300	DEBUG	intermediate batch of contract storage items is persisted	{"batch": 3, "took": "1m21.518344712s", "keys": 200000}
2023-04-11T16:28:49.164+0300	DEBUG	trying to reset headers information
2023-04-11T16:28:49.936+0300	INFO	headers information is reset	{"took": "772.36435ms"}
2023-04-11T16:28:53.122+0300	DEBUG	last batch of contract storage items and IDs is persisted	{"batch": 4, "took": "52.126928092s", "keys": 56945}
2023-04-11T16:28:53.122+0300	DEBUG	trying to reset state root information and NEP transfers
2023-04-11T16:29:09.332+0300	INFO	state root information and NEP transfers are reset	{"took": "16.20921699s"}
2023-04-11T16:29:46.264+0300	DEBUG	headers information is persisted	{"took": "56.326715249s", "keys": 528438}
2023-04-11T16:29:46.267+0300	DEBUG	trying to remove stale storage items
2023-04-11T16:29:53.986+0300	INFO	stale storage items are reset	{"took": "7.718950145s", "keys": 1350176}
2023-04-11T16:29:53.986+0300	DEBUG	trying to remove state reset point
2023-04-11T16:29:53.986+0300	INFO	state reset point is removed	{"took": "6.013µs"}
2023-04-11T16:29:55.899+0300	DEBUG	state root information and NEP transfers are persisted	{"took": "46.567302762s", "keys": 602578}
2023-04-11T16:29:55.929+0300	INFO	state reset point information is persisted	{"took": "1.942392208s", "keys": 2}
2023-04-11T16:29:55.929+0300	INFO	reset finished successfully	{"took": "5m51.561573137s"}
```

Master:
```
anna@kiwi:~/Documents/GitProjects/nspcc-dev/neo-go$ ./bin/neo-go db reset -m --debug --height 600000
2023-04-11T16:34:12.410+0300	INFO	MaxBlockSize is not set or wrong, setting default value	{"MaxBlockSize": 262144}
2023-04-11T16:34:12.410+0300	INFO	MaxBlockSystemFee is not set or wrong, setting default value	{"MaxBlockSystemFee": 900000000000}
2023-04-11T16:34:12.410+0300	INFO	MaxTransactionsPerBlock is not set or wrong, using default value	{"MaxTransactionsPerBlock": 512}
2023-04-11T16:34:12.410+0300	INFO	MaxValidUntilBlockIncrement is not set or wrong, using default value	{"MaxValidUntilBlockIncrement": 5760}
2023-04-11T16:34:12.413+0300	INFO	restoring blockchain	{"version": "0.2.8"}
2023-04-11T16:34:12.495+0300	INFO	initialize state reset	{"target height": 600000}
2023-04-11T16:34:12.513+0300	INFO	trying to reset blocks, transactions and AERs
2023-04-11T16:35:03.582+0300	INFO	intermediate batch of removed blocks, transactions and AERs is persisted	{"batches persisted": 1, "took": "51.087226195s", "keys": 321895}
2023-04-11T16:35:31.302+0300	INFO	intermediate batch of removed blocks, transactions and AERs is persisted	{"batches persisted": 2, "took": "27.719871393s", "keys": 334823}
2023-04-11T16:35:41.309+0300	INFO	last batch of removed blocks, transactions and AERs is persisted	{"batches persisted": 3, "took": "10.007017388s", "keys": 208913}
2023-04-11T16:35:41.309+0300	INFO	blocks, transactions ans AERs are reset	{"took": "1m28.814245057s", "overall persisted keys": 865631}
2023-04-11T16:35:41.309+0300	INFO	trying to reset contract storage items
2023-04-11T16:37:38.315+0300	INFO	intermediate batch of contract storage items and IDs is persisted	{"batch": 1, "took": "1m57.00650253s", "keys": 200000}
2023-04-11T16:39:29.704+0300	INFO	intermediate batch of contract storage items and IDs is persisted	{"batch": 2, "took": "1m51.385224725s", "keys": 200000}
2023-04-11T16:41:14.991+0300	INFO	intermediate batch of contract storage items and IDs is persisted	{"batch": 3, "took": "1m45.287483794s", "keys": 200000}
2023-04-11T16:41:31.667+0300	INFO	last batch of contract storage items and IDs is persisted	{"batch": 4, "took": "16.675347478s", "keys": 56945}
2023-04-11T16:41:31.667+0300	INFO	contract storage items and IDs are reset	{"took": "5m50.357775401s", "keys": 656944}
2023-04-11T16:41:31.667+0300	INFO	trying to reset headers information
2023-04-11T16:42:16.779+0300	INFO	headers information is reset	{"took": "45.111354262s", "keys": 528438}
2023-04-11T16:42:16.784+0300	INFO	trying to reset state root information and NEP transfers
2023-04-11T16:42:35.778+0300	INFO	state root information and NEP transfers are reset	{"took": "18.99373117s", "keys": 602578}
2023-04-11T16:42:35.781+0300	INFO	trying to remove stale storage items
2023-04-11T16:42:43.884+0300	INFO	stale storage items are reset	{"took": "8.103929306s", "keys": 1350176}
2023-04-11T16:42:43.885+0300	INFO	trying to remove state reset point
2023-04-11T16:42:43.926+0300	INFO	stale reset point is removed	{"took": "41.858883ms", "keys": 2}
2023-04-11T16:42:43.932+0300	INFO	reset finished successfully	{"took": "8m31.437493325s"}
```

Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
This commit is contained in:
Anna Shaleva 2023-04-11 14:18:48 +03:00
parent cb0f786b28
commit 400620a9fb

View file

@ -680,7 +680,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
return fmt.Errorf("failed to retrieve stateroot for height %d: %w", height, err)
}
v := bc.dao.Version
cache := bc.dao // dao is MemCachedStore over DB, so use dao directly to persist cached changes right to the underlying DB
// dao is MemCachedStore over DB, we use dao directly to persist cached changes
// right to the underlying DB.
cache := bc.dao
// upperCache is a private MemCachedStore over cache. During each of the state
// sync stages we put the data inside the upperCache; in the end of each stage
// we persist changes from upperCache to cache. Changes from cache are persisted
// directly to the underlying persistent storage (boltDB, levelDB, etc.).
// upperCache/cache segregation is needed to keep the DB state clean and to
// persist data from different stages separately.
upperCache := cache.GetPrivate()
bc.log.Info("initializing state reset", zap.Uint32("target height", height))
start := time.Now()
@ -715,7 +724,14 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
resetStageKey := []byte{byte(storage.SYSStateChangeStage)}
switch stage {
case none:
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)})
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(stateJumpStarted)})
// Technically, there's no difference between Persist() and PersistSync() for the private
// MemCached storage, but we'd better use the sync version in case of some further code changes.
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist state reset start marker to the DB: %w", err)
@ -735,7 +751,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
keysCnt = new(int)
)
for i := height + 1; i <= currHeight; i++ {
err := cache.DeleteBlock(bc.GetHeaderHash(i))
err := upperCache.DeleteBlock(bc.GetHeaderHash(i))
if err != nil {
return fmt.Errorf("error while removing block %d: %w", i, err)
}
@ -749,6 +765,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
persistStart := time.Now()
persistBatch := batchCnt
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist intermediate batch of removed blocks, transactions and AERs: %w", err)
@ -763,7 +784,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
p = time.Now()
}
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(staleBlocksRemoved)})
batchCnt++
bc.log.Info("last batch of removed blocks, transactions and AERs is collected",
zap.Int("batch", batchCnt),
@ -772,6 +793,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
persistStart := time.Now()
persistBatch := batchCnt
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist last batch of removed blocks, transactions ans AERs: %w", err)
@ -795,7 +821,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
if bc.config.Ledger.RemoveUntraceableBlocks {
mode |= mpt.ModeGCFlag
}
trieStore := mpt.NewTrieStore(sr.Root, mode, cache.Store)
trieStore := mpt.NewTrieStore(sr.Root, mode, upperCache.Store)
oldStoragePrefix := v.StoragePrefix
newStoragePrefix := statesync.TemporaryPrefix(oldStoragePrefix)
@ -811,6 +837,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
persistStart := time.Now()
persistBatch := batchCnt
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist intermediate batch of contract storage items: %w", err)
@ -825,7 +856,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
}
// May safely omit KV copying.
k[0] = byte(newStoragePrefix)
cache.Store.Put(k, v)
upperCache.Store.Put(k, v)
cnt++
storageItmsCnt++
@ -833,7 +864,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
})
trieStore.Close()
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(newStorageItemsAdded)})
batchCnt++
persistBatch := batchCnt
bc.log.Info("last batch of contract storage items is collected", zap.Int("batch", batchCnt), zap.Duration("took", time.Since(p)))
@ -841,6 +872,11 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
zap.Int("keys", storageItmsCnt))
lastStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist contract storage items and IDs changes to the DB: %w", err)
@ -854,19 +890,29 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
// Reset SYS-prefixed and IX-prefixed information.
bc.log.Debug("trying to reset headers information")
for i := height + 1; i <= hHeight; i++ {
cache.PurgeHeader(bc.GetHeaderHash(i))
upperCache.PurgeHeader(bc.GetHeaderHash(i))
}
cache.DeleteHeaderHashes(height+1, headerBatchCount)
cache.StoreAsCurrentBlock(b)
cache.PutCurrentHeader(b.Hash(), height)
upperCache.DeleteHeaderHashes(height+1, headerBatchCount)
upperCache.StoreAsCurrentBlock(b)
upperCache.PutCurrentHeader(b.Hash(), height)
v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix)
cache.PutVersion(v)
upperCache.PutVersion(v)
// It's important to manually change the cache's Version at this stage, so that native cache
// can be properly initialized (with the correct contract storage data prefix) at the final
// stage of the state reset. At the same time, DB's SYSVersion-prefixed data will be persisted
// from upperCache to cache in a standard way (several lines below).
cache.Version = v
bc.persistent.Version = v
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(headersReset)})
bc.log.Info("headers information is reset", zap.Duration("took", time.Since(p)))
persistStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist headers changes to the DB: %w", err)
@ -879,21 +925,26 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
case headersReset:
// Reset MPT.
bc.log.Debug("trying to reset state root information and NEP transfers")
err = bc.stateRoot.ResetState(height, cache.Store)
err = bc.stateRoot.ResetState(height, upperCache.Store)
if err != nil {
return fmt.Errorf("failed to rollback MPT state: %w", err)
}
// Reset transfers.
err = bc.resetTransfers(cache, height)
err = bc.resetTransfers(upperCache, height)
if err != nil {
return fmt.Errorf("failed to strip transfer log / transfer info: %w", err)
}
cache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
upperCache.Store.Put(resetStageKey, []byte{stateResetBit | byte(transfersReset)})
bc.log.Info("state root information and NEP transfers are reset", zap.Duration("took", time.Since(p)))
persistStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
upperCache = cache.GetPrivate()
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist contract storage items changes to the DB: %w", err)
@ -927,12 +978,16 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
p = time.Now()
bc.log.Debug("trying to remove state reset point")
cache.Store.Delete(resetStageKey)
upperCache.Store.Delete(resetStageKey)
// Unlike the state jump, state sync point must be removed as we have complete state for this height.
cache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
upperCache.Store.Delete([]byte{byte(storage.SYSStateSyncPoint)})
bc.log.Info("state reset point is removed", zap.Duration("took", time.Since(p)))
persistStart := time.Now()
_, uerr := upperCache.PersistSync()
if uerr != nil {
panic(uerr)
}
persistCh <- func(persistedKeys int, err error) error {
if err != nil {
return fmt.Errorf("failed to persist state reset stage to DAO: %w", err)