diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 2d7d4345f..9d0cb3a0f 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -2,6 +2,7 @@ package core import ( "bytes" + "encoding/binary" "errors" "fmt" "math" @@ -356,9 +357,6 @@ func (bc *Blockchain) init() error { bc.dao.Version = ver bc.persistent.Version = ver - // Always try to remove garbage. If there is nothing to do, it will exit quickly. - go bc.removeOldStorageItems() - // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified // and the genesis block as first block. @@ -483,27 +481,6 @@ func (bc *Blockchain) init() error { return bc.updateExtensibleWhitelist(bHeight) } -func (bc *Blockchain) removeOldStorageItems() { - _, err := bc.dao.Store.Get(storage.SYSCleanStorage.Bytes()) - if err != nil { - return - } - - b := bc.dao.Store.Batch() - prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) - bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool { - // #1468, but don't need to copy here, because it is done by Store. - b.Delete(k) - return true - }) - b.Delete(storage.SYSCleanStorage.Bytes()) - - err = bc.dao.Store.PutBatch(b) - if err != nil { - bc.log.Error("failed to remove old storage items", zap.Error(err)) - } -} - // jumpToState is an atomic operation that changes Blockchain state to the one // specified by the state sync point p. All the data needed for the jump must be // collected by the state sync module. 
@@ -553,31 +530,43 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error return fmt.Errorf("failed to store state jump stage: %w", err) } - err = bc.dao.Store.Put(storage.SYSCleanStorage.Bytes(), []byte{}) - if err != nil { - return fmt.Errorf("failed to store clean storage flag: %w", err) - } - - go bc.removeOldStorageItems() - fallthrough case newStorageItemsAdded: + b := bc.dao.Store.Batch() + prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix) + bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool { + // #1468, but don't need to copy here, because it is done by Store. + b.Delete(k) + return true + }) + + err := bc.dao.Store.PutBatch(b) + if err != nil { + return fmt.Errorf("failed to remove old storage items: %w", err) + } + // After current state is updated, we need to remove outdated state-related data if so. // The only outdated data we might have is genesis-related data, so check it. if p-bc.config.MaxTraceableBlocks > 0 { - cache := bc.dao.GetWrapped() + cache := bc.dao.GetWrapped().(*dao.Simple) writeBuf.Reset() err := cache.DeleteBlock(bc.headerHashes[0], writeBuf) if err != nil { return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err) } - // TODO: remove NEP-17 transfers and NEP-17 transfer info for genesis block, #2096 related. + prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers), byte(storage.STTokenTransferInfo)} + for i := range prefixes { + cache.Store.Seek(storage.SeekRange{Prefix: prefixes[i : i+1]}, func(k, v []byte) bool { + _ = cache.Store.Delete(k) // It's MemCachedStore which never returns an error. 
+ return true + }) + } _, err = cache.Persist() if err != nil { return fmt.Errorf("failed to drop genesis block state: %w", err) } } - err := bc.dao.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}) + err = bc.dao.Store.Put(jumpStageKey, []byte{byte(genesisStateRemoved)}) if err != nil { return fmt.Errorf("failed to store state jump stage: %w", err) } @@ -704,6 +693,62 @@ func (bc *Blockchain) tryRunGC(old uint32) time.Duration { tgtBlock /= int64(bc.config.GarbageCollectionPeriod) tgtBlock *= int64(bc.config.GarbageCollectionPeriod) dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store) + dur += bc.removeOldTransfers(uint32(tgtBlock)) + } + return dur +} + +func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { + bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index)) + start := time.Now() + h, err := bc.GetHeader(bc.GetHeaderHash(int(index))) + if err != nil { + dur := time.Since(start) + bc.log.Error("failed to find block header for transfer GC", zap.Duration("time", dur), zap.Error(err)) + return dur + } + var removed, kept int64 + var ts = h.Timestamp + prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)} + + for i := range prefixes { + var acc util.Uint160 + var canDrop bool + + err = bc.store.SeekGC(storage.SeekRange{ + Prefix: prefixes[i : i+1], + Backwards: true, // From new to old. + }, func(k, v []byte) bool { + // We don't look inside of the batches, it requires too much effort, instead + // we drop batches that are confirmed to contain outdated entries. + var batchAcc util.Uint160 + var batchTs = binary.BigEndian.Uint64(k[1+util.Uint160Size:]) + copy(batchAcc[:], k[1:]) + + if batchAcc != acc { // Some new account we're iterating over. + acc = batchAcc + } else if canDrop { // We've seen this account and all entries in this batch are guaranteed to be outdated. 
+ removed++ + return false + } + // We don't know what's inside, so keep the current + // batch anyway, but allow to drop older ones. + canDrop = batchTs <= ts + kept++ + return true + }) + if err != nil { + break + } + } + dur := time.Since(start) + if err != nil { + bc.log.Error("failed to flush transfer data GC changeset", zap.Duration("time", dur), zap.Error(err)) + } else { + bc.log.Info("finished transfer data garbage collection", + zap.Int64("removed", removed), + zap.Int64("kept", kept), + zap.Duration("time", dur)) } return dur } diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 4b9f1da2f..db3d56e7d 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1588,6 +1588,67 @@ func TestDumpAndRestore(t *testing.T) { }) } +func TestRemoveOldTransfers(t *testing.T) { + // Creating proper number of transfers/blocks takes unnecessary time, so emulate + // some DB with stale entries. + bc := newTestChain(t) + h, err := bc.GetHeader(bc.GetHeaderHash(0)) + require.NoError(t, err) + older := h.Timestamp - 1000 + newer := h.Timestamp + 1000 + acc1 := util.Uint160{1} + acc2 := util.Uint160{2} + acc3 := util.Uint160{3} + ttl := state.TokenTransferLog{Raw: []byte{1}} // It's incorrect, but who cares.
+ + for i := uint32(0); i < 3; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc1, older, i, false, &ttl)) + } + for i := uint32(0); i < 3; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc2, newer, i, false, &ttl)) + } + for i := uint32(0); i < 2; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc3, older, i, true, &ttl)) + } + for i := uint32(0); i < 2; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc3, newer, i, true, &ttl)) + } + + _, err = bc.dao.Persist() + require.NoError(t, err) + _ = bc.removeOldTransfers(0) + + for i := uint32(0); i < 2; i++ { + log, err := bc.dao.GetTokenTransferLog(acc1, older, i, false) + require.NoError(t, err) + require.Equal(t, 0, len(log.Raw)) + } + + log, err := bc.dao.GetTokenTransferLog(acc1, older, 2, false) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + + for i := uint32(0); i < 3; i++ { + log, err = bc.dao.GetTokenTransferLog(acc2, newer, i, false) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + } + + log, err = bc.dao.GetTokenTransferLog(acc3, older, 0, true) + require.NoError(t, err) + require.Equal(t, 0, len(log.Raw)) + + log, err = bc.dao.GetTokenTransferLog(acc3, older, 1, true) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + + for i := uint32(0); i < 2; i++ { + log, err = bc.dao.GetTokenTransferLog(acc3, newer, i, true) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + } +} + func TestRemoveUntraceable(t *testing.T) { check := func(t *testing.T, bc *Blockchain, tHash, bHash, sHash util.Uint256, errorExpected bool) { _, _, err := bc.GetTransaction(tHash) diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index de5e14ec4..24372bc4c 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -16,7 +16,6 @@ const ( // DataMPTAux is used to store additional MPT data like height-root // mappings and local/validated heights. 
DataMPTAux KeyPrefix = 0x04 - STAccount KeyPrefix = 0x40 STContractID KeyPrefix = 0x51 STStorage KeyPrefix = 0x70 // STTempStorage is used to store contract storage items during state sync process @@ -33,7 +32,6 @@ const ( SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2 SYSStateSyncPoint KeyPrefix = 0xc3 SYSStateJumpStage KeyPrefix = 0xc4 - SYSCleanStorage KeyPrefix = 0xc5 SYSVersion KeyPrefix = 0xf0 ) diff --git a/pkg/core/storage/store_test.go b/pkg/core/storage/store_test.go index 220ad4a55..a07b97ece 100644 --- a/pkg/core/storage/store_test.go +++ b/pkg/core/storage/store_test.go @@ -10,7 +10,7 @@ import ( var ( prefixes = []KeyPrefix{ DataExecutable, - STAccount, + DataMPT, STStorage, IXHeaderHashList, SYSCurrentBlock, @@ -20,7 +20,7 @@ var ( expected = []uint8{ 0x01, - 0x40, + 0x03, 0x70, 0x80, 0xc0, @@ -49,12 +49,12 @@ func TestBatchToOperations(t *testing.T) { b := &MemBatch{ Put: []KeyValueExists{ {KeyValue: KeyValue{Key: []byte{byte(STStorage), 0x01}, Value: []byte{0x01}}}, - {KeyValue: KeyValue{Key: []byte{byte(STAccount), 0x02}, Value: []byte{0x02}}}, + {KeyValue: KeyValue{Key: []byte{byte(DataMPT), 0x02}, Value: []byte{0x02}}}, {KeyValue: KeyValue{Key: []byte{byte(STStorage), 0x03}, Value: []byte{0x03}}, Exists: true}, }, Deleted: []KeyValueExists{ {KeyValue: KeyValue{Key: []byte{byte(STStorage), 0x04}, Value: []byte{0x04}}}, - {KeyValue: KeyValue{Key: []byte{byte(STAccount), 0x05}, Value: []byte{0x05}}}, + {KeyValue: KeyValue{Key: []byte{byte(DataMPT), 0x05}, Value: []byte{0x05}}}, {KeyValue: KeyValue{Key: []byte{byte(STStorage), 0x06}, Value: []byte{0x06}}, Exists: true}, }, }