core: implement basic GC for value-based storage scheme

The key idea here is that even though we can't ensure MPT code won't make the
node active again we can order the changes made to the persistent store in
such a way that it practically doesn't matter. What happens is:
 * after persist if it's time to collect our garbage we do it synchronously
   right in the same thread working the underlying persistent store directly
 * all the other node code doesn't see much of it, it works with bc.dao or
   layers above it
 * if MPT doesn't find some stale deactivated node in the storage it's OK,
   it'll recreate it in bc.dao
 * if MPT finds it and activates it, it's OK too, bc.dao will store it
 * while GC is being performed nothing else changes the persistent store
 * all subsequent bc.dao persists only happen after the GC is completed which
   means that any changes to the (potentially) deleted nodes have a priority,
   it's OK for GC to delete something that'll be recreated with the next
   persist cycle

Otherwise it's a simple scheme with node status/last active height stored in
the value. Preliminary tests show that it works ~18% worse than the simple
KeepOnlyLatest scheme, but this seems to be the best result so far.

Fixes #2095.
This commit is contained in:
Roman Khimov 2022-01-29 11:28:29 +03:00
parent c4ee310e85
commit 423c7883b8
5 changed files with 131 additions and 13 deletions

View file

@ -198,7 +198,9 @@ protocol-related settings described in the table below.
| Section | Type | Default value | Description | Notes |
| --- | --- | --- | --- | --- |
| CommitteeHistory | map[uint32]int | none | Number of committee members after given height, for example `{0: 1, 20: 4}` sets up a chain with one committee member since the genesis and then changes the setting to 4 committee members at the height of 20. `StandbyCommittee` committee setting must have the number of keys equal or exceeding the highest value in this option. Blocks numbers where the change happens must be divisble by the old and by the new values simultaneously. If not set, committee size is derived from the `StandbyCommittee` setting and never changes. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store latest state. If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. |
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store latest state. If true, DB size will be smaller, but older roots won't be accessible. This value should remain th
e same for the same database. |
| Magic | `uint32` | `0` | Magic number which uniquely identifies NEO network. |
| MaxBlockSize | `uint32` | `262144` | Maximum block size in bytes. |
| MaxBlockSystemFee | `int64` | `900000000000` | Maximum overall transactions system fee per block. |
@ -209,7 +211,7 @@ protocol-related settings described in the table below.
| P2PNotaryRequestPayloadPoolSize | `int` | `1000` | Size of the node's P2P Notary request payloads memory pool where P2P Notary requests are stored before main or fallback transaction is completed and added to the chain.<br>This option is valid only if `P2PSigExtensions` are enabled. | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| P2PSigExtensions | `bool` | `false` | Enables following additional Notary service related logic:<br>• Transaction attributes `NotValidBefore`, `Conflicts` and `NotaryAssisted`<br>• Network payload of the `P2PNotaryRequest` type<br>• Native `Notary` contract<br>• Notary node module | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| P2PStateExchangeExtensions | `bool` | `false` | Enables following P2P MPT state data exchange logic: <br>`StateSyncInterval` protocol setting <br>• P2P commands `GetMPTDataCMD` and `MPTDataCMD` | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. |
| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. |
| ReservedAttributes | `bool` | `false` | Allows to have reserved attributes range for experimental or private purposes. |
| SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. |
| SecondsPerBlock | `int` | `15` | Minimal time that should pass before next block is accepted. |

View file

@ -15,8 +15,13 @@ type (
ProtocolConfiguration struct {
// CommitteeHistory stores committee size change history (height: size).
CommitteeHistory map[uint32]int `yaml:"CommitteeHistory"`
Magic netmode.Magic `yaml:"Magic"`
MemPoolSize int `yaml:"MemPoolSize"`
// GarbageCollectionPeriod sets the number of blocks to wait before
// starting the next MPT garbage collection cycle when RemoveUntraceableBlocks
// option is used.
GarbageCollectionPeriod uint32 `yaml:"GarbageCollectionPeriod"`
Magic netmode.Magic `yaml:"Magic"`
MemPoolSize int `yaml:"MemPoolSize"`
// InitialGASSupply is the amount of GAS generated in the genesis block.
InitialGASSupply fixedn.Fixed8 `yaml:"InitialGASSupply"`
@ -27,7 +32,7 @@ type (
// If true, DB size will be smaller, but older roots won't be accessible.
// This value should remain the same for the same database.
KeepOnlyLatestState bool `yaml:"KeepOnlyLatestState"`
// RemoveUntraceableBlocks specifies if old blocks should be removed.
// RemoveUntraceableBlocks specifies if old data should be removed.
RemoveUntraceableBlocks bool `yaml:"RemoveUntraceableBlocks"`
// MaxBlockSize is the maximum block size in bytes.
MaxBlockSize uint32 `yaml:"MaxBlockSize"`

View file

@ -47,6 +47,7 @@ const (
version = "0.2.2"
defaultInitialGAS = 52000000_00000000
defaultGCPeriod = 10000
defaultMemPoolSize = 50000
defaultP2PNotaryRequestPayloadPoolSize = 1000
defaultMaxBlockSize = 262144
@ -124,6 +125,9 @@ type Blockchain struct {
// are directly from underlying persistent store.
persistent *dao.Simple
// Underlying persistent store.
store storage.Store
// Current index/height of the highest block.
// Read access should always be called by BlockHeight().
// Write access should only happen in storeBlock().
@ -245,6 +249,10 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.RemoveUntraceableBlocks && cfg.GarbageCollectionPeriod == 0 {
cfg.GarbageCollectionPeriod = defaultGCPeriod
log.Info("GarbageCollectionPeriod is not set or wrong, using default value", zap.Uint32("GarbageCollectionPeriod", cfg.GarbageCollectionPeriod))
}
if len(cfg.NativeUpdateHistories) == 0 {
cfg.NativeUpdateHistories = map[string][]uint32{}
log.Info("NativeActivations are not set, using default values")
@ -253,6 +261,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
config: cfg,
dao: dao.NewSimple(s, cfg.StateRootInHeader, cfg.P2PSigExtensions),
persistent: dao.NewSimple(s, cfg.StateRootInHeader, cfg.P2PSigExtensions),
store: s,
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize, 0, false),
@ -647,12 +656,21 @@ func (bc *Blockchain) Run() {
case <-bc.stopCh:
return
case <-persistTimer.C:
var oldPersisted uint32
var gcDur time.Duration
if bc.config.RemoveUntraceableBlocks {
oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
}
dur, err := bc.persist(nextSync)
if err != nil {
bc.log.Warn("failed to persist blockchain", zap.Error(err))
}
if bc.config.RemoveUntraceableBlocks {
gcDur = bc.tryRunGC(oldPersisted)
}
nextSync = dur > persistInterval*2
interval := persistInterval - dur
interval := persistInterval - dur - gcDur
if interval <= 0 {
interval = time.Microsecond // Reset doesn't work with zero value
}
@ -661,6 +679,35 @@ func (bc *Blockchain) Run() {
}
}
func (bc *Blockchain) tryRunGC(old uint32) time.Duration {
var dur time.Duration
new := atomic.LoadUint32(&bc.persistedHeight)
var tgtBlock = int64(new)
tgtBlock -= int64(bc.config.MaxTraceableBlocks)
if bc.config.P2PStateExchangeExtensions {
syncP := new / uint32(bc.config.StateSyncInterval)
syncP--
syncP *= uint32(bc.config.StateSyncInterval)
if tgtBlock > int64(syncP) {
tgtBlock = int64(syncP)
}
}
// Always round to the GCP.
tgtBlock /= int64(bc.config.GarbageCollectionPeriod)
tgtBlock *= int64(bc.config.GarbageCollectionPeriod)
// Count periods.
old /= bc.config.GarbageCollectionPeriod
new /= bc.config.GarbageCollectionPeriod
if tgtBlock > int64(bc.config.GarbageCollectionPeriod) && new != old {
tgtBlock /= int64(bc.config.GarbageCollectionPeriod)
tgtBlock *= int64(bc.config.GarbageCollectionPeriod)
dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store)
}
return dur
}
// notificationDispatcher manages subscription to events and broadcasts new events.
func (bc *Blockchain) notificationDispatcher() {
var (

View file

@ -1589,7 +1589,7 @@ func TestDumpAndRestore(t *testing.T) {
}
func TestRemoveUntraceable(t *testing.T) {
check := func(t *testing.T, bc *Blockchain, tHash, bHash util.Uint256, errorExpected bool) {
check := func(t *testing.T, bc *Blockchain, tHash, bHash, sHash util.Uint256, errorExpected bool) {
_, _, err := bc.GetTransaction(tHash)
if errorExpected {
require.Error(t, err)
@ -1610,10 +1610,20 @@ func TestRemoveUntraceable(t *testing.T) {
}
_, err = bc.GetHeader(bHash)
require.NoError(t, err)
if !sHash.Equals(util.Uint256{}) {
sm := bc.GetStateModule()
_, err = sm.GetState(sHash, []byte{0xfb, 0xff, 0xff, 0xff, 0x0e}) // NEO committee key.
if errorExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
}
t.Run("P2PStateExchangeExtensions off", func(t *testing.T) {
bc := newTestChainWithCustomCfg(t, func(c *config.Config) {
c.ProtocolConfiguration.MaxTraceableBlocks = 2
c.ProtocolConfiguration.GarbageCollectionPeriod = 2
c.ProtocolConfiguration.RemoveUntraceableBlocks = true
})
@ -1622,6 +1632,8 @@ func TestRemoveUntraceable(t *testing.T) {
b1 := bc.newBlock(tx1)
require.NoError(t, bc.AddBlock(b1))
tx1Height := bc.BlockHeight()
sRoot, err := bc.GetStateModule().GetStateRoot(tx1Height)
require.NoError(t, err)
tx2, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1)
require.NoError(t, err)
@ -1631,13 +1643,21 @@ func TestRemoveUntraceable(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tx1Height, h1)
check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), true)
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
// Don't wait for Run().
_, err = bc.persist(true)
require.NoError(t, err)
bc.tryRunGC(0)
check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, true)
})
t.Run("P2PStateExchangeExtensions on", func(t *testing.T) {
bc := newTestChainWithCustomCfg(t, func(c *config.Config) {
c.ProtocolConfiguration.MaxTraceableBlocks = 2
c.ProtocolConfiguration.GarbageCollectionPeriod = 2
c.ProtocolConfiguration.RemoveUntraceableBlocks = true
c.ProtocolConfiguration.P2PStateExchangeExtensions = true
c.ProtocolConfiguration.StateSyncInterval = 2
@ -1649,6 +1669,8 @@ func TestRemoveUntraceable(t *testing.T) {
b1 := bc.newBlock(tx1)
require.NoError(t, bc.AddBlock(b1))
tx1Height := bc.BlockHeight()
sRoot, err := bc.GetStateModule().GetStateRoot(tx1Height)
require.NoError(t, err)
tx2, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1)
require.NoError(t, err)
@ -1664,13 +1686,13 @@ func TestRemoveUntraceable(t *testing.T) {
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), false)
check(t, bc, tx2.Hash(), b2.Hash(), false)
check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, false)
check(t, bc, tx2.Hash(), b2.Hash(), sRoot.Root, false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), true)
check(t, bc, tx2.Hash(), b2.Hash(), false)
check(t, bc, tx1.Hash(), b1.Hash(), util.Uint256{}, true)
check(t, bc, tx2.Hash(), b2.Hash(), util.Uint256{}, false)
_, h2, err := bc.GetTransaction(tx2.Hash())
require.NoError(t, err)
require.Equal(t, tx2Height, h2)

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
@ -57,6 +58,9 @@ func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Lo
if cfg.KeepOnlyLatestState {
mode |= mpt.ModeLatest
}
if cfg.RemoveUntraceableBlocks {
mode |= mpt.ModeGC
}
return &Module{
network: cfg.Magic,
srInHead: cfg.StateRootInHeader,
@ -184,6 +188,44 @@ func (s *Module) JumpToState(sr *state.MPTRoot) error {
return nil
}
// GC performs garbage collection.
func (s *Module) GC(index uint32, store storage.Store) time.Duration {
if !s.mode.GC() {
panic("stateroot: GC invoked, but not enabled")
}
var removed int
var stored int64
s.log.Info("starting MPT garbage collection", zap.Uint32("index", index))
start := time.Now()
b := store.Batch()
store.Seek(storage.SeekRange{
Prefix: []byte{byte(storage.DataMPT)},
}, func(k, v []byte) bool {
stored++
if !mpt.IsActiveValue(v) {
h := binary.LittleEndian.Uint32(v[len(v)-4:])
if h > index {
return true
}
b.Delete(k)
removed++
stored--
}
return true
})
err := store.PutBatch(b)
dur := time.Since(start)
if err != nil {
s.log.Error("failed to flush MPT GC changeset", zap.Duration("time", dur), zap.Error(err))
} else {
s.log.Info("finished MPT garbage collection",
zap.Int("removed", removed),
zap.Int64("stored", stored),
zap.Duration("time", dur))
}
return dur
}
// AddMPTBatch updates using provided batch.
func (s *Module) AddMPTBatch(index uint32, b mpt.Batch, cache *storage.MemCachedStore) (*mpt.Trie, *state.MPTRoot, error) {
mpt := *s.mpt