diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 2d7d4345f..4542c1b1f 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -2,6 +2,7 @@ package core import ( "bytes" + "encoding/binary" "errors" "fmt" "math" @@ -704,6 +705,62 @@ func (bc *Blockchain) tryRunGC(old uint32) time.Duration { tgtBlock /= int64(bc.config.GarbageCollectionPeriod) tgtBlock *= int64(bc.config.GarbageCollectionPeriod) dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store) + dur += bc.removeOldTransfers(uint32(tgtBlock)) + } + return dur +} + +func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { + bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index)) + start := time.Now() + h, err := bc.GetHeader(bc.GetHeaderHash(int(index))) + if err != nil { + dur := time.Since(start) + bc.log.Error("failed to find block header for transfer GC", zap.Duration("time", dur), zap.Error(err)) + return dur + } + var removed, kept int64 + var ts = h.Timestamp + prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)} + + for i := range prefixes { + var acc util.Uint160 + var canDrop bool + + err = bc.store.SeekGC(storage.SeekRange{ + Prefix: prefixes[i : i+1], + Backwards: true, // From new to old. + }, func(k, v []byte) bool { + // We don't look inside of the batches, it requires too much effort, instead + // we drop batches that are confirmed to contain outdated entries. + var batchAcc util.Uint160 + var batchTs = binary.BigEndian.Uint64(k[1+util.Uint160Size:]) + copy(batchAcc[:], k[1:]) + + if batchAcc != acc { // Some new account we're iterating over. + acc = batchAcc + } else if canDrop { // We've seen this account and all entries in this batch are guaranteed to be outdated. + removed++ + return false + } + // We don't know what's inside, so keep the current + // batch anyway, but allow to drop older ones. + canDrop = batchTs <= ts + kept++ + return true + }) + if err != nil { + break + } + } + dur := time.Since(start) + if err != nil { + bc.log.Error("failed to flush transfer data GC changeset", zap.Duration("time", dur), zap.Error(err)) + } else { + bc.log.Info("finished transfer data garbage collection", + zap.Int64("removed", removed), + zap.Int64("kept", kept), + zap.Duration("time", dur)) } return dur } diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 4b9f1da2f..db3d56e7d 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1588,6 +1588,67 @@ func TestDumpAndRestore(t *testing.T) { }) } +func TestRemoveOldTransfers(t *testing.T) { + // Creating proper number of transfers/blocks takes unneccessary time, so emulate + // some DB with stale entries. + bc := newTestChain(t) + h, err := bc.GetHeader(bc.GetHeaderHash(0)) + require.NoError(t, err) + older := h.Timestamp - 1000 + newer := h.Timestamp + 1000 + acc1 := util.Uint160{1} + acc2 := util.Uint160{2} + acc3 := util.Uint160{3} + ttl := state.TokenTransferLog{Raw: []byte{1}} // It's incorrect, but who cares. + + for i := uint32(0); i < 3; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc1, older, i, false, &ttl)) + } + for i := uint32(0); i < 3; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc2, newer, i, false, &ttl)) + } + for i := uint32(0); i < 2; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc3, older, i, true, &ttl)) + } + for i := uint32(0); i < 2; i++ { + require.NoError(t, bc.dao.PutTokenTransferLog(acc3, newer, i, true, &ttl)) + } + + _, err = bc.dao.Persist() + require.NoError(t, err) + _ = bc.removeOldTransfers(0) + + for i := uint32(0); i < 2; i++ { + log, err := bc.dao.GetTokenTransferLog(acc1, older, i, false) + require.NoError(t, err) + require.Equal(t, 0, len(log.Raw)) + } + + log, err := bc.dao.GetTokenTransferLog(acc1, older, 2, false) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + + for i := uint32(0); i < 3; i++ { + log, err = bc.dao.GetTokenTransferLog(acc2, newer, i, false) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + } + + log, err = bc.dao.GetTokenTransferLog(acc3, older, 0, true) + require.NoError(t, err) + require.Equal(t, 0, len(log.Raw)) + + log, err = bc.dao.GetTokenTransferLog(acc3, older, 1, true) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + + for i := uint32(0); i < 2; i++ { + log, err = bc.dao.GetTokenTransferLog(acc3, newer, i, true) + require.NoError(t, err) + require.NotEqual(t, 0, len(log.Raw)) + } +} + func TestRemoveUntraceable(t *testing.T) { check := func(t *testing.T, bc *Blockchain, tHash, bHash, sHash util.Uint256, errorExpected bool) { _, _, err := bc.GetTransaction(tHash)