diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 9b284c143..f2e7e14a1 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -44,7 +44,7 @@ import ( // Tuning parameters. const ( headerBatchCount = 2000 - version = "0.1.3" + version = "0.1.4" defaultInitialGAS = 52000000_00000000 defaultMemPoolSize = 50000 @@ -56,6 +56,31 @@ const ( // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. HeaderVerificationGasLimit = 3_00000000 // 3 GAS defaultStateSyncInterval = 40000 + + // maxStorageBatchSize is the number of elements in storage batch expected to fit into the + // storage without delays and problems. Estimated size of batch in case of given number of + // elements does not exceed 1Mb. + maxStorageBatchSize = 10000 +) + +// stateJumpStage denotes the stage of state jump process. +type stateJumpStage byte + +const ( + // none means that no state jump process was initiated yet. + none stateJumpStage = 1 << iota + // stateJumpStarted means that state jump was just initiated, but outdated storage items + // were not yet removed. + stateJumpStarted + // oldStorageItemsRemoved means that outdated contract storage items were removed, but + // new storage items were not yet saved. + oldStorageItemsRemoved + // newStorageItemsAdded means that contract storage items are up-to-date with the current + // state. + newStorageItemsAdded + // genesisStateRemoved means that state corresponding to the genesis block was removed + // from the storage. + genesisStateRemoved ) var ( @@ -357,6 +382,24 @@ func (bc *Blockchain) init() error { } } + // Check whether StateJump stage is in the storage and continue interrupted state jump if so. + jumpStage, err := bc.dao.Store.Get(storage.SYSStateJumpStage.Bytes()) + if err == nil { + if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) { + return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " + + "To start an archival node drop the database manually and restart the node") + } + if len(jumpStage) != 1 { + return fmt.Errorf("invalid state jump stage format") + } + // State jump wasn't finished yet, thus continue it. + stateSyncPoint, err := bc.dao.GetStateSyncPoint() + if err != nil { + return fmt.Errorf("failed to get state sync point from the storage") + } + return bc.jumpToStateInternal(stateSyncPoint, stateJumpStage(jumpStage[0])) + } + bHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { return err @@ -418,17 +461,109 @@ func (bc *Blockchain) jumpToState(p uint32) error { bc.lock.Lock() defer bc.lock.Unlock() + return bc.jumpToStateInternal(p, none) +} + +// jumpToStateInternal is an internal representation of jumpToState callback that +// changes Blockchain state to the one specified by state sync point p and state +// jump stage. All the data needed for the jump must be in the DB, otherwise an +// error is returned. It is not protected by mutex. +func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error { if p+1 >= uint32(len(bc.headerHashes)) { - return fmt.Errorf("invalid state sync point") + return fmt.Errorf("invalid state sync point %d: headerHeignt is %d", p, len(bc.headerHashes)) } bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p)) + writeBuf := io.NewBufBinWriter() + jumpStageKey := storage.SYSStateJumpStage.Bytes() + switch stage { + case none: + err := bc.dao.Store.Put(jumpStageKey, []byte{byte(stateJumpStarted)}) + if err != nil { + return fmt.Errorf("failed to store state jump stage: %w", err) + } + fallthrough + case stateJumpStarted: + // Replace old storage items by new ones, it should be done step-by step. + // Firstly, remove all old genesis-related items. + b := bc.dao.Store.Batch() + bc.dao.Store.Seek([]byte{byte(storage.STStorage)}, func(k, _ []byte) { + // Must copy here, #1468. + key := slice.Copy(k) + b.Delete(key) + }) + b.Put(jumpStageKey, []byte{byte(oldStorageItemsRemoved)}) + err := bc.dao.Store.PutBatch(b) + if err != nil { + return fmt.Errorf("failed to store state jump stage: %w", err) + } + fallthrough + case oldStorageItemsRemoved: + // Then change STTempStorage prefix to STStorage. Each replace operation is atomic. + for { + count := 0 + b := bc.dao.Store.Batch() + bc.dao.Store.Seek([]byte{byte(storage.STTempStorage)}, func(k, v []byte) { + if count >= maxStorageBatchSize { + return + } + // Must copy here, #1468. + oldKey := slice.Copy(k) + b.Delete(oldKey) + key := make([]byte, len(k)) + key[0] = byte(storage.STStorage) + copy(key[1:], k[1:]) + value := slice.Copy(v) + b.Put(key, value) + count += 2 + }) + if count > 0 { + err := bc.dao.Store.PutBatch(b) + if err != nil { + return fmt.Errorf("failed to replace outdated contract storage items with the fresh ones: %w", err) + } + } else { + break + } + } + err := bc.dao.Store.Put(jumpStageKey, []byte{byte(newStorageItemsAdded)}) + if err != nil { + return fmt.Errorf("failed to store state jump stage: %w", err) + } + fallthrough + case newStorageItemsAdded: + // After current state is updated, we need to remove outdated state-related data if so. + // The only outdated data we might have is genesis-related data, so check it. + if p-bc.config.MaxTraceableBlocks > 0 { + cache := bc.dao.GetWrapped() + writeBuf.Reset() + err := cache.DeleteBlock(bc.headerHashes[0], writeBuf) + if err != nil { + return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err) + } + // TODO: remove NEP17 transfers and NEP17 transfer info for genesis block, #2096 related. + _, err = cache.Persist() + if err != nil { + return fmt.Errorf("failed to drop genesis block state: %w", err) + } + } + err := bc.dao.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}) + if err != nil { + return fmt.Errorf("failed to store state jump stage: %w", err) + } + case genesisStateRemoved: + // there's nothing to do after that, so just continue with common operations + // and remove state jump stage in the end. + default: + return errors.New("unknown state jump stage") + } + block, err := bc.dao.GetBlock(bc.headerHashes[p]) if err != nil { return fmt.Errorf("failed to get current block: %w", err) } - writeBuf := io.NewBufBinWriter() + writeBuf.Reset() err = bc.dao.StoreAsCurrentBlock(block, writeBuf) if err != nil { return fmt.Errorf("failed to store current block: %w", err) @@ -448,27 +583,6 @@ func (bc *Blockchain) jumpToState(p uint32) error { return fmt.Errorf("can't perform MPT jump to height %d: %w", p, err) } - b := bc.dao.Store.Batch() - bc.dao.Store.Seek([]byte{byte(storage.STStorage)}, func(k, _ []byte) { - // Must copy here, #1468. - key := slice.Copy(k) - b.Delete(key) - }) - bc.dao.Store.Seek([]byte{byte(storage.STTempStorage)}, func(k, v []byte) { - // Must copy here, #1468. - oldKey := slice.Copy(k) - b.Delete(oldKey) - key := make([]byte, len(k)) - key[0] = byte(storage.STStorage) - copy(key[1:], k[1:]) - value := slice.Copy(v) - b.Put(key, value) - }) - err = bc.dao.Store.PutBatch(b) - if err != nil { - return fmt.Errorf("failed to replace outdated contract storage items with the fresh ones: %w", err) - } - err = bc.contracts.NEO.InitializeCache(bc, bc.dao) if err != nil { return fmt.Errorf("can't init cache for NEO native contract: %w", err) @@ -483,19 +597,12 @@ func (bc *Blockchain) jumpToState(p uint32) error { return fmt.Errorf("failed to update extensible whitelist: %w", err) } - // After current state is updated, we need to remove outdated state-related data if so. - // The only outdated data we might have is genesis-related data, so check it. - if p-bc.config.MaxTraceableBlocks > 0 { - cache := bc.dao.GetWrapped() - writeBuf.Reset() - err = cache.DeleteBlock(bc.headerHashes[0], writeBuf) - if err != nil { - return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err) - } - // TODO: remove NEP17 transfers and NEP17 transfer info for genesis block, #2096 related. - } - updateBlockHeightMetric(p) + + err = bc.dao.Store.Delete(jumpStageKey) + if err != nil { + return fmt.Errorf("failed to remove outdated state jump stage: %w", err) + } return nil } diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 26f5f5133..bd62f6001 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -27,6 +27,7 @@ const ( SYSCurrentHeader KeyPrefix = 0xc1 SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2 SYSStateSyncPoint KeyPrefix = 0xc3 + SYSStateJumpStage KeyPrefix = 0xc4 SYSVersion KeyPrefix = 0xf0 )