From c7f5f173ae232425b82014923521e7c4daff5800 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 12 Dec 2024 18:27:07 +0300 Subject: [PATCH 1/5] core: introduce RemoveUntraceableHeaders With blocks available from NeoFS we can drop them from the local DB. Signed-off-by: Roman Khimov --- docs/node-configuration.md | 1 + pkg/config/ledger_config.go | 3 +++ pkg/core/blockchain.go | 9 +++++--- pkg/core/blockchain_core_test.go | 35 ++++++++++++++++++++------------ pkg/core/dao/dao.go | 12 +++++++---- pkg/core/dao/dao_test.go | 9 ++++++++ 6 files changed, 49 insertions(+), 20 deletions(-) diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 73211a8a6..7d1921910 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -30,6 +30,7 @@ node-related settings described in the table below. | Relay | `bool` | `true` | Determines whether the server is forwarding its inventory. | | Consensus | [Consensus Configuration](#Consensus-Configuration) | | Describes consensus (dBFT) configuration. See the [Consensus Configuration](#Consensus-Configuration) for details. | | RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only the last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. If enabled along with `P2PStateExchangeExtensions` protocol extension, then old blocks and MPT states will be removed up to the second latest state synchronisation point (see `StateSyncInterval`). | +| RemoveUntraceableHeaders | `bool`| `false` | Used only with RemoveUntraceableBlocks and makes node delete untraceable block headers as well. Notice that this is an experimental option, not recommended for production use. | | RPC | [RPC Configuration](#RPC-Configuration) | | Describes [RPC subsystem](rpc.md) configuration. See the [RPC Configuration](#RPC-Configuration) for details. | | SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. | | SkipBlockVerification | `bool` | `false` | Allows to disable verification of received/processed blocks (including cryptographic checks). | diff --git a/pkg/config/ledger_config.go b/pkg/config/ledger_config.go index 529a0d017..ff6d59948 100644 --- a/pkg/config/ledger_config.go +++ b/pkg/config/ledger_config.go @@ -14,6 +14,9 @@ type Ledger struct { KeepOnlyLatestState bool `yaml:"KeepOnlyLatestState"` // RemoveUntraceableBlocks specifies if old data should be removed. RemoveUntraceableBlocks bool `yaml:"RemoveUntraceableBlocks"` + // RemoveUntraceableHeaders is used in addition to RemoveUntraceableBlocks + // when headers need to be removed as well. + RemoveUntraceableHeaders bool `yaml:"RemoveUntraceableHeaders"` // SaveStorageBatch enables storage batch saving before every persist. SaveStorageBatch bool `yaml:"SaveStorageBatch"` // SkipBlockVerification allows to disable verification of received diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 2db55ba82..5061c3c42 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -283,6 +283,9 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl zap.Int("StateSyncInterval", cfg.StateSyncInterval)) } } + if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks { + return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not") + } if cfg.Hardforks == nil { cfg.Hardforks = map[string]uint32{} for _, hf := range config.StableHardforks { @@ -603,7 +606,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro // After current state is updated, we need to remove outdated state-related data if so. // The only outdated data we might have is genesis-related data, so check it. if p-bc.config.MaxTraceableBlocks > 0 { - err := cache.DeleteBlock(bc.GetHeaderHash(0)) + err := cache.DeleteBlock(bc.GetHeaderHash(0), false) if err != nil { return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err) } @@ -797,7 +800,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { - err := upperCache.DeleteBlock(bc.GetHeaderHash(i)) + err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } @@ -1622,7 +1625,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error stop = start + 1 } for index := start; index < stop; index++ { - err := kvcache.DeleteBlock(bc.GetHeaderHash(index)) + err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders) if err != nil { bc.log.Warn("error while removing old block", zap.Uint32("index", index), diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go index 6293d797f..443c3a5d5 100644 --- a/pkg/core/blockchain_core_test.go +++ b/pkg/core/blockchain_core_test.go @@ -139,6 +139,28 @@ func TestRemoveOldTransfers(t *testing.T) { } } +func checkNewBlockchainErr(t *testing.T, cfg func(c *config.Config), store storage.Store, errText string) { + unitTestNetCfg, err := config.Load("../../config", testchain.Network()) + require.NoError(t, err) + cfg(&unitTestNetCfg) + log := zaptest.NewLogger(t) + _, err = NewBlockchain(store, unitTestNetCfg.Blockchain(), log) + if len(errText) != 0 { + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), errText)) + } else { + require.NoError(t, err) + } +} + +func TestNewBlockchainIncosistencies(t *testing.T) { + t.Run("untraceable blocks/headers", func(t *testing.T) { + checkNewBlockchainErr(t, func(c *config.Config) { + c.ApplicationConfiguration.RemoveUntraceableHeaders = true + }, storage.NewMemoryStore(), "RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not") + }) +} + func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { var ( stateSyncInterval = 4 @@ -186,19 +208,6 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { _, err := batch.Persist() require.NoError(t, err) - checkNewBlockchainErr := func(t *testing.T, cfg func(c *config.Config), store storage.Store, errText string) { - unitTestNetCfg, err := config.Load("../../config", testchain.Network()) - require.NoError(t, err) - cfg(&unitTestNetCfg) - log := zaptest.NewLogger(t) - _, err = NewBlockchain(store, unitTestNetCfg.Blockchain(), log) - if len(errText) != 0 { - require.Error(t, err) - require.True(t, strings.Contains(err.Error(), errText)) - } else { - require.NoError(t, err) - } - } boltCfg := func(c *config.Config) { spountCfg(c) c.ApplicationConfiguration.KeepOnlyLatestState = true diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 579d35548..8cf5826f7 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -766,16 +766,20 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a // DeleteBlock removes the block from dao. It's not atomic, so make sure you're // using private MemCached instance here. -func (dao *Simple) DeleteBlock(h util.Uint256) error { +func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { key := dao.makeExecutableKey(h) b, err := dao.getBlock(key) if err != nil { return err } - err = dao.storeHeader(key, &b.Header) - if err != nil { - return err + if !dropHeader { + err = dao.storeHeader(key, &b.Header) + if err != nil { + return err + } + } else { + dao.Store.Delete(key) } for _, tx := range b.Transactions { diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 2652c422c..1d1667c85 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -107,6 +107,15 @@ func TestPutGetBlock(t *testing.T) { require.Equal(t, 2, len(gotAppExecResult)) require.Equal(t, *appExecResult1, gotAppExecResult[0]) require.Equal(t, *appExecResult2, gotAppExecResult[1]) + + require.NoError(t, dao.DeleteBlock(hash, false)) + gotBlock, err = dao.GetBlock(hash) // It's just a header, but it's still there. + require.NoError(t, err) + require.NotNil(t, gotBlock) + + require.NoError(t, dao.DeleteBlock(hash, true)) + _, err = dao.GetBlock(hash) + require.Error(t, err) } func TestGetVersion_NoVersion(t *testing.T) { From c53b0645bbbca1646c4e6474b5aca6ba8fa93dc8 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 12 Dec 2024 18:51:52 +0300 Subject: [PATCH 2/5] core: extend NewBlockchain coverage a bit Signed-off-by: Roman Khimov --- pkg/core/blockchain_core_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go index 443c3a5d5..a35ee30f6 100644 --- a/pkg/core/blockchain_core_test.go +++ b/pkg/core/blockchain_core_test.go @@ -159,6 +159,11 @@ func TestNewBlockchainIncosistencies(t *testing.T) { c.ApplicationConfiguration.RemoveUntraceableHeaders = true }, storage.NewMemoryStore(), "RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not") }) + t.Run("state exchange without state root", func(t *testing.T) { + checkNewBlockchainErr(t, func(c *config.Config) { + c.ProtocolConfiguration.P2PStateExchangeExtensions = true + }, storage.NewMemoryStore(), "P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") + }) } func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { From 7d89a530430fc44ea786abab83d54a6f0ebb8dc0 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 13 Dec 2024 12:37:18 +0300 Subject: [PATCH 3/5] core: drop redundant tgtBlock normalization It's there since 423c7883b81ce14cc37a91d1d3588bcbd1ccb34c, but looks like it never changed anything, the same thing is done six lines above and tgtBlock is not changed since. Signed-off-by: Roman Khimov --- pkg/core/blockchain.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 5061c3c42..fdcd2bbc9 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1144,8 +1144,6 @@ func (bc *Blockchain) tryRunGC(oldHeight uint32) time.Duration { oldHeight /= bc.config.Ledger.GarbageCollectionPeriod newHeight /= bc.config.Ledger.GarbageCollectionPeriod if tgtBlock > int64(bc.config.Ledger.GarbageCollectionPeriod) && newHeight != oldHeight { - tgtBlock /= int64(bc.config.Ledger.GarbageCollectionPeriod) - tgtBlock *= int64(bc.config.Ledger.GarbageCollectionPeriod) dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store) dur += bc.removeOldTransfers(uint32(tgtBlock)) } From 9599fba24f4f37537e5602eb37c48107ac3da5c9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 13 Dec 2024 13:39:36 +0300 Subject: [PATCH 4/5] core: fix removing old transfer data with RemoveUntraceableHeaders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Transfer data is timestamp-based, previously it always had and used headers, no we can go via a small cache (we don't want it to grow or be stored forever). Otherwise it's unable to do the job: 2024-12-13T12:55:15.056+0300 ERROR failed to find block header for transfer GC {"time": "19.066µs", "error": "key not found"} Signed-off-by: Roman Khimov --- pkg/core/blockchain.go | 41 +++++++++++++++++++++++++------- pkg/core/blockchain_core_test.go | 1 + pkg/core/dao/dao.go | 15 ++++++------ pkg/core/dao/dao_test.go | 9 +++++-- 4 files changed, 48 insertions(+), 18 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index fdcd2bbc9..a223f01bd 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru/v2" json "github.com/nspcc-dev/go-ordered-json" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/limits" @@ -61,6 +62,14 @@ const ( // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. HeaderVerificationGasLimit = 3_00000000 // 3 GAS defaultStateSyncInterval = 40000 + + // defaultBlockTimesCache should be sufficient for tryRunGC() to get in + // sync with storeBlock(). Most of the time they differ by some thousands of + // blocks and GC interval is more like 10K, so this is sufficient for 80K + // deviation and should be sufficient. If it's not, it's not a big issue + // either, the next cycle will still do the job (only transfers need this, + // MPT won't notice at all). + defaultBlockTimesCache = 8 ) // stateChangeStage denotes the stage of state modification process. @@ -156,6 +165,11 @@ type Blockchain struct { // Current persisted block count. persistedHeight uint32 + // Index->Timestamp cache for garbage collector. Headers can be gone + // by the time it runs, so we use a tiny little cache to sync block + // removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC()) + gcBlockTimes *lru.Cache[uint32, uint64] + // Stop synchronization mechanisms. stopCh chan struct{} runToExitCh chan struct{} @@ -324,6 +338,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl contracts: *native.NewContracts(cfg.ProtocolConfiguration), } + bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store) bc.contracts.Designate.StateRootService = bc.stateRoot @@ -606,7 +621,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro // After current state is updated, we need to remove outdated state-related data if so. // The only outdated data we might have is genesis-related data, so check it. if p-bc.config.MaxTraceableBlocks > 0 { - err := cache.DeleteBlock(bc.GetHeaderHash(0), false) + _, err := cache.DeleteBlock(bc.GetHeaderHash(0), false) if err != nil { return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err) } @@ -800,7 +815,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { - err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false) + _, err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } @@ -1289,15 +1304,20 @@ func appendTokenTransferInfo(transferData *state.TokenTransferInfo, func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index)) - start := time.Now() - h, err := bc.GetHeader(bc.GetHeaderHash(index)) - if err != nil { + var ( + err error + kept int64 + removed int64 + start = time.Now() + ts, ok = bc.gcBlockTimes.Get(index) + ) + + if !ok { dur := time.Since(start) - bc.log.Error("failed to find block header for transfer GC", zap.Duration("time", dur), zap.Error(err)) + bc.log.Error("failed to get block timestamp transfer GC", zap.Duration("time", dur), zap.Uint32("index", index)) return dur } - var removed, kept int64 - var ts = h.Timestamp + prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)} for i := range prefixes { @@ -1623,7 +1643,10 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error stop = start + 1 } for index := start; index < stop; index++ { - err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders) + ts, err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders) + if bc.config.Ledger.RemoveUntraceableHeaders && index%bc.config.Ledger.GarbageCollectionPeriod == 0 { + _ = bc.gcBlockTimes.Add(index, ts) + } if err != nil { bc.log.Warn("error while removing old block", zap.Uint32("index", index), diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go index a35ee30f6..7f71a5790 100644 --- a/pkg/core/blockchain_core_test.go +++ b/pkg/core/blockchain_core_test.go @@ -106,6 +106,7 @@ func TestRemoveOldTransfers(t *testing.T) { _, err = bc.dao.Persist() require.NoError(t, err) + _ = bc.gcBlockTimes.Add(0, h.Timestamp) _ = bc.removeOldTransfers(0) for i := range uint32(2) { diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 8cf5826f7..9842c40d9 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -765,18 +765,19 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a } // DeleteBlock removes the block from dao. It's not atomic, so make sure you're -// using private MemCached instance here. -func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { +// using private MemCached instance here. It returns block timestamp for GC +// convenience. +func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) (uint64, error) { key := dao.makeExecutableKey(h) b, err := dao.getBlock(key) if err != nil { - return err + return 0, err } if !dropHeader { err = dao.storeHeader(key, &b.Header) if err != nil { - return err + return 0, err } } else { dao.Store.Delete(key) @@ -791,7 +792,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { v, err := dao.Store.Get(key) if err != nil { - return fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err) + return 0, fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err) } // It might be a block since we allow transactions to have block hash in the Conflicts attribute. if v[0] != storage.ExecTransaction { @@ -809,7 +810,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { sKey := append(key, s.Account.BytesBE()...) v, err := dao.Store.Get(sKey) if err != nil { - return fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err) + return 0, fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err) } index = binary.LittleEndian.Uint32(v[1:]) if index == b.Index { @@ -819,7 +820,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error { } } - return nil + return b.Timestamp, nil } // PurgeHeader completely removes specified header from dao. It differs from diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 1d1667c85..64946ac59 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -74,6 +74,7 @@ func TestPutGetBlock(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false) b := &block.Block{ Header: block.Header{ + Timestamp: 42, Script: transaction.Witness{ VerificationScript: []byte{byte(opcode.PUSH1)}, InvocationScript: []byte{byte(opcode.NOP)}, @@ -108,12 +109,16 @@ func TestPutGetBlock(t *testing.T) { require.Equal(t, *appExecResult1, gotAppExecResult[0]) require.Equal(t, *appExecResult2, gotAppExecResult[1]) - require.NoError(t, dao.DeleteBlock(hash, false)) + ts, err := dao.DeleteBlock(hash, false) + require.NoError(t, err) + require.Equal(t, uint64(42), ts) gotBlock, err = dao.GetBlock(hash) // It's just a header, but it's still there. require.NoError(t, err) require.NotNil(t, gotBlock) - require.NoError(t, dao.DeleteBlock(hash, true)) + ts, err = dao.DeleteBlock(hash, true) + require.NoError(t, err) + require.Equal(t, uint64(42), ts) _, err = dao.GetBlock(hash) require.Error(t, err) } From 90b6a423313c9041a6d1ec5ac550ccfee40481a1 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 13 Dec 2024 16:05:43 +0300 Subject: [PATCH 5/5] network: make GetHeaders best-effort Be a nice neighbor, try to reply with whatever headers we have, don't fail completely because of a single missing header. Signed-off-by: Roman Khimov --- pkg/network/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 6b043544a..d5ad7f2f9 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1120,7 +1120,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error } header, err := s.chain.GetHeader(hash) if err != nil { - break + continue } resp.Hdrs = append(resp.Hdrs, header) }