From 9599fba24f4f37537e5602eb37c48107ac3da5c9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 13 Dec 2024 13:39:36 +0300 Subject: [PATCH] core: fix removing old transfer data with RemoveUntraceableHeaders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Transfer data is timestamp-based, previously it always had and used headers, now we can go via a small cache (we don't want it to grow or be stored forever). Otherwise it's unable to do the job: 2024-12-13T12:55:15.056+0300 ERROR failed to find block header for transfer GC {"time": "19.066µs", "error": "key not found"} Signed-off-by: Roman Khimov --- pkg/core/blockchain.go | 41 +++++++++++++++++++++++++------- pkg/core/blockchain_core_test.go | 1 + pkg/core/dao/dao.go | 15 ++++++------ pkg/core/dao/dao_test.go | 9 +++++-- 4 files changed, 48 insertions(+), 18 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index fdcd2bbc9..a223f01bd 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru/v2" json "github.com/nspcc-dev/go-ordered-json" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/limits" @@ -61,6 +62,14 @@ const ( // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. HeaderVerificationGasLimit = 3_00000000 // 3 GAS defaultStateSyncInterval = 40000 + + // defaultBlockTimesCache should be sufficient for tryRunGC() to get in + // sync with storeBlock(). Most of the time they differ by some thousands of + // blocks and GC interval is more like 10K, so this is sufficient for 80K + // deviation and should be sufficient. If it's not, it's not a big issue + // either, the next cycle will still do the job (only transfers need this, + // MPT won't notice at all). + defaultBlockTimesCache = 8 ) // stateChangeStage denotes the stage of state modification process.
@@ -156,6 +165,11 @@ type Blockchain struct { // Current persisted block count. persistedHeight uint32 + // Index->Timestamp cache for garbage collector. Headers can be gone + // by the time it runs, so we use a tiny little cache to sync block + // removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC()) + gcBlockTimes *lru.Cache[uint32, uint64] + // Stop synchronization mechanisms. stopCh chan struct{} runToExitCh chan struct{} @@ -324,6 +338,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl contracts: *native.NewContracts(cfg.ProtocolConfiguration), } + bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store) bc.contracts.Designate.StateRootService = bc.stateRoot @@ -606,7 +621,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro // After current state is updated, we need to remove outdated state-related data if so. // The only outdated data we might have is genesis-related data, so check it. 
if p-bc.config.MaxTraceableBlocks > 0 { - err := cache.DeleteBlock(bc.GetHeaderHash(0), false) + _, err := cache.DeleteBlock(bc.GetHeaderHash(0), false) if err != nil { return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err) } @@ -800,7 +815,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { - err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false) + _, err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } @@ -1289,15 +1304,20 @@ func appendTokenTransferInfo(transferData *state.TokenTransferInfo, func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index)) - start := time.Now() - h, err := bc.GetHeader(bc.GetHeaderHash(index)) - if err != nil { + var ( + err error + kept int64 + removed int64 + start = time.Now() + ts, ok = bc.gcBlockTimes.Get(index) + ) + + if !ok { dur := time.Since(start) - bc.log.Error("failed to find block header for transfer GC", zap.Duration("time", dur), zap.Error(err)) + bc.log.Error("failed to get block timestamp transfer GC", zap.Duration("time", dur), zap.Uint32("index", index)) return dur } - var removed, kept int64 - var ts = h.Timestamp + prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)} for i := range prefixes { @@ -1623,7 +1643,10 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error stop = start + 1 } for index := start; index < stop; index++ { - err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders) + ts, err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders) + if bc.config.Ledger.RemoveUntraceableHeaders && index%bc.config.Ledger.GarbageCollectionPeriod == 0 { + _ = 
bc.gcBlockTimes.Add(index, ts) + } if err != nil { bc.log.Warn("error while removing old block", zap.Uint32("index", index), diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go index a35ee30f6..7f71a5790 100644 --- a/pkg/core/blockchain_core_test.go +++ b/pkg/core/blockchain_core_test.go @@ -106,6 +106,7 @@ func TestRemoveOldTransfers(t *testing.T) { _, err = bc.dao.Persist() require.NoError(t, err) + _ = bc.gcBlockTimes.Add(0, h.Timestamp) _ = bc.removeOldTransfers(0) for i := range uint32(2) { diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 8cf5826f7..9842c40d9 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -765,18 +765,19 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a } // DeleteBlock removes the block from dao. It's not atomic, so make sure you're -// using private MemCached instance here. -func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { +// using private MemCached instance here. It returns block timestamp for GC +// convenience. +func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) (uint64, error) { key := dao.makeExecutableKey(h) b, err := dao.getBlock(key) if err != nil { - return err + return 0, err } if !dropHeader { err = dao.storeHeader(key, &b.Header) if err != nil { - return err + return 0, err } } else { dao.Store.Delete(key) @@ -791,7 +792,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { v, err := dao.Store.Get(key) if err != nil { - return fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err) + return 0, fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err) } // It might be a block since we allow transactions to have block hash in the Conflicts attribute. 
if v[0] != storage.ExecTransaction { @@ -809,7 +810,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { sKey := append(key, s.Account.BytesBE()...) v, err := dao.Store.Get(sKey) if err != nil { - return fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err) + return 0, fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err) } index = binary.LittleEndian.Uint32(v[1:]) if index == b.Index { @@ -819,7 +820,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { } } - return nil + return b.Timestamp, nil } // PurgeHeader completely removes specified header from dao. It differs from diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 1d1667c85..64946ac59 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -74,6 +74,7 @@ func TestPutGetBlock(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false) b := &block.Block{ Header: block.Header{ + Timestamp: 42, Script: transaction.Witness{ VerificationScript: []byte{byte(opcode.PUSH1)}, InvocationScript: []byte{byte(opcode.NOP)}, @@ -108,12 +109,16 @@ func TestPutGetBlock(t *testing.T) { require.Equal(t, *appExecResult1, gotAppExecResult[0]) require.Equal(t, *appExecResult2, gotAppExecResult[1]) - require.NoError(t, dao.DeleteBlock(hash, false)) + ts, err := dao.DeleteBlock(hash, false) + require.NoError(t, err) + require.Equal(t, uint64(42), ts) gotBlock, err = dao.GetBlock(hash) // It's just a header, but it's still there. require.NoError(t, err) require.NotNil(t, gotBlock) - require.NoError(t, dao.DeleteBlock(hash, true)) + ts, err = dao.DeleteBlock(hash, true) + require.NoError(t, err) + require.Equal(t, uint64(42), ts) _, err = dao.GetBlock(hash) require.Error(t, err) }