core: remove old storage items asynchronously
Signed-off-by: Evgeniy Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
582d489c90
commit
fac595bbdf
4 changed files with 41 additions and 21 deletions
|
@ -67,9 +67,6 @@ const (
|
||||||
// stateJumpStarted means that state jump was just initiated, but outdated storage items
|
// stateJumpStarted means that state jump was just initiated, but outdated storage items
|
||||||
// were not yet removed.
|
// were not yet removed.
|
||||||
stateJumpStarted
|
stateJumpStarted
|
||||||
// oldStorageItemsRemoved means that outdated contract storage items were removed, but
|
|
||||||
// new storage items were not yet saved.
|
|
||||||
oldStorageItemsRemoved
|
|
||||||
// newStorageItemsAdded means that contract storage items are up-to-date with the current
|
// newStorageItemsAdded means that contract storage items are up-to-date with the current
|
||||||
// state.
|
// state.
|
||||||
newStorageItemsAdded
|
newStorageItemsAdded
|
||||||
|
@ -354,6 +351,9 @@ func (bc *Blockchain) init() error {
|
||||||
bc.dao.Version = ver
|
bc.dao.Version = ver
|
||||||
bc.persistent.Version = ver
|
bc.persistent.Version = ver
|
||||||
|
|
||||||
|
// Always try to remove garbage. If there is nothing to do, it will exit quickly.
|
||||||
|
go bc.removeOldStorageItems()
|
||||||
|
|
||||||
// At this point there was no version found in the storage which
|
// At this point there was no version found in the storage which
|
||||||
// implies a creating fresh storage with the version specified
|
// implies a creating fresh storage with the version specified
|
||||||
// and the genesis block as first block.
|
// and the genesis block as first block.
|
||||||
|
@ -478,6 +478,26 @@ func (bc *Blockchain) init() error {
|
||||||
return bc.updateExtensibleWhitelist(bHeight)
|
return bc.updateExtensibleWhitelist(bHeight)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bc *Blockchain) removeOldStorageItems() {
|
||||||
|
_, err := bc.dao.Store.Get(storage.SYSCleanStorage.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b := bc.dao.Store.Batch()
|
||||||
|
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
||||||
|
bc.dao.Store.Seek([]byte{byte(prefix)}, func(k, _ []byte) {
|
||||||
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
|
b.Delete(k)
|
||||||
|
})
|
||||||
|
b.Delete(storage.SYSCleanStorage.Bytes())
|
||||||
|
|
||||||
|
err = bc.dao.Store.PutBatch(b)
|
||||||
|
if err != nil {
|
||||||
|
bc.log.Error("failed to remove old storage items", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// jumpToState is an atomic operation that changes Blockchain state to the one
|
// jumpToState is an atomic operation that changes Blockchain state to the one
|
||||||
// specified by the state sync point p. All the data needed for the jump must be
|
// specified by the state sync point p. All the data needed for the jump must be
|
||||||
// collected by the state sync module.
|
// collected by the state sync module.
|
||||||
|
@ -509,20 +529,6 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
|
||||||
}
|
}
|
||||||
fallthrough
|
fallthrough
|
||||||
case stateJumpStarted:
|
case stateJumpStarted:
|
||||||
// Replace old storage items by new ones, it should be done step-by step.
|
|
||||||
// Firstly, remove all old genesis-related items.
|
|
||||||
b := bc.dao.Store.Batch()
|
|
||||||
bc.dao.Store.Seek([]byte{byte(bc.dao.Version.StoragePrefix)}, func(k, _ []byte) {
|
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
|
||||||
b.Delete(k)
|
|
||||||
})
|
|
||||||
b.Put(jumpStageKey, []byte{byte(oldStorageItemsRemoved)})
|
|
||||||
err := bc.dao.Store.PutBatch(b)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to store state jump stage: %w", err)
|
|
||||||
}
|
|
||||||
fallthrough
|
|
||||||
case oldStorageItemsRemoved:
|
|
||||||
newPrefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
newPrefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
||||||
v, err := bc.dao.GetVersion()
|
v, err := bc.dao.GetVersion()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -538,6 +544,14 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to store state jump stage: %w", err)
|
return fmt.Errorf("failed to store state jump stage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = bc.dao.Store.Put(storage.SYSCleanStorage.Bytes(), []byte{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to store clean storage flag: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go bc.removeOldStorageItems()
|
||||||
|
|
||||||
fallthrough
|
fallthrough
|
||||||
case newStorageItemsAdded:
|
case newStorageItemsAdded:
|
||||||
// After current state is updated, we need to remove outdated state-related data if so.
|
// After current state is updated, we need to remove outdated state-related data if so.
|
||||||
|
|
|
@ -1816,7 +1816,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
|
||||||
require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point))
|
require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point))
|
||||||
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
||||||
})
|
})
|
||||||
for _, stage := range []stateJumpStage{stateJumpStarted, oldStorageItemsRemoved, newStorageItemsAdded, genesisStateRemoved, 0x03} {
|
for _, stage := range []stateJumpStage{stateJumpStarted, newStorageItemsAdded, genesisStateRemoved, 0x03} {
|
||||||
t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) {
|
t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) {
|
||||||
require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stage)}))
|
require.NoError(t, bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stage)}))
|
||||||
point := make([]byte, 4)
|
point := make([]byte, 4)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/config"
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
@ -441,9 +442,13 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
require.ElementsMatch(t, expected, actual)
|
require.ElementsMatch(t, expected, actual)
|
||||||
|
|
||||||
// no temp items should be left
|
// no temp items should be left
|
||||||
bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(k, v []byte) {
|
require.Eventually(t, func() bool {
|
||||||
t.Fatal("temp storage items are found")
|
var haveItems bool
|
||||||
|
bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(_, _ []byte) {
|
||||||
|
haveItems = true
|
||||||
})
|
})
|
||||||
|
return !haveItems
|
||||||
|
}, time.Second*5, time.Millisecond*100)
|
||||||
bcBolt.Close()
|
bcBolt.Close()
|
||||||
|
|
||||||
// Check restoring with new prefix.
|
// Check restoring with new prefix.
|
||||||
|
|
|
@ -29,6 +29,7 @@ const (
|
||||||
SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2
|
SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2
|
||||||
SYSStateSyncPoint KeyPrefix = 0xc3
|
SYSStateSyncPoint KeyPrefix = 0xc3
|
||||||
SYSStateJumpStage KeyPrefix = 0xc4
|
SYSStateJumpStage KeyPrefix = 0xc4
|
||||||
|
SYSCleanStorage KeyPrefix = 0xc5
|
||||||
SYSVersion KeyPrefix = 0xf0
|
SYSVersion KeyPrefix = 0xf0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue