core: allow recovery after state jump interruption

We need several stages to manage the state jump process so that old and new
contract storage items do not get mixed up and genesis state data is reliably
removed from the storage. Other operations do not require a separate stage and
can be performed each time `jumpToStateInternal` is called.
Anna Shaleva 2021-08-26 17:34:52 +03:00
parent 5cda24b3af
commit 5cd78c31af
2 changed files with 144 additions and 36 deletions
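
For orientation before the diff, here is a minimal, self-contained sketch of the staged recovery pattern this commit introduces. It is not the actual neo-go code: Store, jump and jumpStageKey are simplified stand-ins for bc.dao.Store, jumpToStateInternal and storage.SYSStateJumpStage. The idea it illustrates is taken from the diff below: each stage persists its marker before falling through to the next one, so an interrupted jump can be resumed from the stored marker on restart.

package statesync

import "errors"

// stateJumpStage mirrors the stages introduced by the commit.
type stateJumpStage byte

const (
    none stateJumpStage = 1 << iota
    stateJumpStarted
    oldStorageItemsRemoved
    newStorageItemsAdded
    genesisStateRemoved
)

// Store is a hypothetical key-value abstraction standing in for bc.dao.Store.
type Store interface {
    Get(key []byte) ([]byte, error)
    Put(key, value []byte) error
    Delete(key []byte) error
}

// jumpStageKey mirrors the SYSStateJumpStage prefix (0xc4) added by the commit.
var jumpStageKey = []byte{0xc4}

// jump resumes the state jump from whatever stage was persisted before an
// interruption; stage bodies are reduced to comments in this sketch.
func jump(s Store, stage stateJumpStage) error {
    switch stage {
    case none:
        if err := s.Put(jumpStageKey, []byte{byte(stateJumpStarted)}); err != nil {
            return err
        }
        fallthrough
    case stateJumpStarted:
        // ... remove outdated contract storage items ...
        if err := s.Put(jumpStageKey, []byte{byte(oldStorageItemsRemoved)}); err != nil {
            return err
        }
        fallthrough
    case oldStorageItemsRemoved:
        // ... move fresh items from the temporary prefix to the live one ...
        if err := s.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)}); err != nil {
            return err
        }
        fallthrough
    case newStorageItemsAdded:
        // ... drop genesis-related state that is no longer traceable ...
        if err := s.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}); err != nil {
            return err
        }
    case genesisStateRemoved:
        // Nothing stage-specific left to redo.
    default:
        return errors.New("unknown state jump stage")
    }
    // Common operations run on every call; finally drop the stage marker.
    return s.Delete(jumpStageKey)
}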


@@ -44,7 +44,7 @@ import (
 // Tuning parameters.
 const (
     headerBatchCount = 2000
-    version = "0.1.3"
+    version = "0.1.4"
     defaultInitialGAS = 52000000_00000000
     defaultMemPoolSize = 50000
@@ -56,6 +56,31 @@ const (
     // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
     HeaderVerificationGasLimit = 3_00000000 // 3 GAS
     defaultStateSyncInterval = 40000
+    // maxStorageBatchSize is the number of elements in a storage batch that is expected to fit
+    // into the storage without delays and problems. The estimated size of a batch with this
+    // number of elements does not exceed 1 MB.
+    maxStorageBatchSize = 10000
+)
+
+// stateJumpStage denotes the stage of the state jump process.
+type stateJumpStage byte
+
+const (
+    // none means that no state jump process was initiated yet.
+    none stateJumpStage = 1 << iota
+    // stateJumpStarted means that state jump was just initiated, but outdated storage items
+    // were not yet removed.
+    stateJumpStarted
+    // oldStorageItemsRemoved means that outdated contract storage items were removed, but
+    // new storage items were not yet saved.
+    oldStorageItemsRemoved
+    // newStorageItemsAdded means that contract storage items are up-to-date with the current
+    // state.
+    newStorageItemsAdded
+    // genesisStateRemoved means that state corresponding to the genesis block was removed
+    // from the storage.
+    genesisStateRemoved
 )

 var (
@@ -357,6 +382,24 @@ func (bc *Blockchain) init() error {
         }
     }

+    // Check whether StateJump stage is in the storage and continue interrupted state jump if so.
+    jumpStage, err := bc.dao.Store.Get(storage.SYSStateJumpStage.Bytes())
+    if err == nil {
+        if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) {
+            return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " +
+                "To start an archival node drop the database manually and restart the node")
+        }
+        if len(jumpStage) != 1 {
+            return fmt.Errorf("invalid state jump stage format")
+        }
+        // State jump wasn't finished yet, thus continue it.
+        stateSyncPoint, err := bc.dao.GetStateSyncPoint()
+        if err != nil {
+            return fmt.Errorf("failed to get state sync point from the storage")
+        }
+        return bc.jumpToStateInternal(stateSyncPoint, stateJumpStage(jumpStage[0]))
+    }
+
     bHeight, err := bc.dao.GetCurrentBlockHeight()
     if err != nil {
         return err
@@ -418,17 +461,109 @@ func (bc *Blockchain) jumpToState(p uint32) error {
     bc.lock.Lock()
     defer bc.lock.Unlock()
+    return bc.jumpToStateInternal(p, none)
+}
+
+// jumpToStateInternal is an internal representation of jumpToState callback that
+// changes Blockchain state to the one specified by state sync point p and state
+// jump stage. All the data needed for the jump must be in the DB, otherwise an
+// error is returned. It is not protected by mutex.
+func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error {
     if p+1 >= uint32(len(bc.headerHashes)) {
-        return fmt.Errorf("invalid state sync point")
+        return fmt.Errorf("invalid state sync point %d: headerHeight is %d", p, len(bc.headerHashes))
     }
     bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
+    writeBuf := io.NewBufBinWriter()
+    jumpStageKey := storage.SYSStateJumpStage.Bytes()
+    switch stage {
+    case none:
+        err := bc.dao.Store.Put(jumpStageKey, []byte{byte(stateJumpStarted)})
+        if err != nil {
+            return fmt.Errorf("failed to store state jump stage: %w", err)
+        }
+        fallthrough
+    case stateJumpStarted:
+        // Replace old storage items with new ones; it should be done step by step.
+        // Firstly, remove all old genesis-related items.
+        b := bc.dao.Store.Batch()
+        bc.dao.Store.Seek([]byte{byte(storage.STStorage)}, func(k, _ []byte) {
+            // Must copy here, #1468.
+            key := slice.Copy(k)
+            b.Delete(key)
+        })
+        b.Put(jumpStageKey, []byte{byte(oldStorageItemsRemoved)})
+        err := bc.dao.Store.PutBatch(b)
+        if err != nil {
+            return fmt.Errorf("failed to store state jump stage: %w", err)
+        }
+        fallthrough
+    case oldStorageItemsRemoved:
+        // Then change STTempStorage prefix to STStorage. Each replace operation is atomic.
+        for {
+            count := 0
+            b := bc.dao.Store.Batch()
+            bc.dao.Store.Seek([]byte{byte(storage.STTempStorage)}, func(k, v []byte) {
+                if count >= maxStorageBatchSize {
+                    return
+                }
+                // Must copy here, #1468.
+                oldKey := slice.Copy(k)
+                b.Delete(oldKey)
+                key := make([]byte, len(k))
+                key[0] = byte(storage.STStorage)
+                copy(key[1:], k[1:])
+                value := slice.Copy(v)
+                b.Put(key, value)
+                count += 2
+            })
+            if count > 0 {
+                err := bc.dao.Store.PutBatch(b)
+                if err != nil {
+                    return fmt.Errorf("failed to replace outdated contract storage items with the fresh ones: %w", err)
+                }
+            } else {
+                break
+            }
+        }
+        err := bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)})
+        if err != nil {
+            return fmt.Errorf("failed to store state jump stage: %w", err)
+        }
+        fallthrough
+    case newStorageItemsAdded:
+        // After the current state is updated, we need to remove outdated state-related data if any.
+        // The only outdated data we might have is genesis-related data, so check it.
+        if p-bc.config.MaxTraceableBlocks > 0 {
+            cache := bc.dao.GetWrapped()
+            writeBuf.Reset()
+            err := cache.DeleteBlock(bc.headerHashes[0], writeBuf)
+            if err != nil {
+                return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err)
+            }
+            // TODO: remove NEP17 transfers and NEP17 transfer info for genesis block, #2096 related.
+            _, err = cache.Persist()
+            if err != nil {
+                return fmt.Errorf("failed to drop genesis block state: %w", err)
+            }
+        }
+        err := bc.dao.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)})
+        if err != nil {
+            return fmt.Errorf("failed to store state jump stage: %w", err)
+        }
+    case genesisStateRemoved:
+        // There's nothing to do after that, so just continue with the common operations
+        // and remove the state jump stage at the end.
+    default:
+        return errors.New("unknown state jump stage")
+    }
     block, err := bc.dao.GetBlock(bc.headerHashes[p])
     if err != nil {
         return fmt.Errorf("failed to get current block: %w", err)
     }
-    writeBuf := io.NewBufBinWriter()
+    writeBuf.Reset()
     err = bc.dao.StoreAsCurrentBlock(block, writeBuf)
     if err != nil {
         return fmt.Errorf("failed to store current block: %w", err)
@@ -448,27 +583,6 @@ func (bc *Blockchain) jumpToState(p uint32) error {
         return fmt.Errorf("can't perform MPT jump to height %d: %w", p, err)
     }
-    b := bc.dao.Store.Batch()
-    bc.dao.Store.Seek([]byte{byte(storage.STStorage)}, func(k, _ []byte) {
-        // Must copy here, #1468.
-        key := slice.Copy(k)
-        b.Delete(key)
-    })
-    bc.dao.Store.Seek([]byte{byte(storage.STTempStorage)}, func(k, v []byte) {
-        // Must copy here, #1468.
-        oldKey := slice.Copy(k)
-        b.Delete(oldKey)
-        key := make([]byte, len(k))
-        key[0] = byte(storage.STStorage)
-        copy(key[1:], k[1:])
-        value := slice.Copy(v)
-        b.Put(key, value)
-    })
-    err = bc.dao.Store.PutBatch(b)
-    if err != nil {
-        return fmt.Errorf("failed to replace outdated contract storage items with the fresh ones: %w", err)
-    }
     err = bc.contracts.NEO.InitializeCache(bc, bc.dao)
     if err != nil {
         return fmt.Errorf("can't init cache for NEO native contract: %w", err)
@@ -483,19 +597,12 @@ func (bc *Blockchain) jumpToState(p uint32) error {
         return fmt.Errorf("failed to update extensible whitelist: %w", err)
     }
-    // After current state is updated, we need to remove outdated state-related data if so.
-    // The only outdated data we might have is genesis-related data, so check it.
-    if p-bc.config.MaxTraceableBlocks > 0 {
-        cache := bc.dao.GetWrapped()
-        writeBuf.Reset()
-        err = cache.DeleteBlock(bc.headerHashes[0], writeBuf)
-        if err != nil {
-            return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err)
-        }
-        // TODO: remove NEP17 transfers and NEP17 transfer info for genesis block, #2096 related.
-    }
     updateBlockHeightMetric(p)
+    err = bc.dao.Store.Delete(jumpStageKey)
+    if err != nil {
+        return fmt.Errorf("failed to remove outdated state jump stage: %w", err)
+    }
     return nil
 }


@@ -27,6 +27,7 @@ const (
     SYSCurrentHeader KeyPrefix = 0xc1
     SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2
     SYSStateSyncPoint KeyPrefix = 0xc3
+    SYSStateJumpStage KeyPrefix = 0xc4
     SYSVersion KeyPrefix = 0xf0
 )