Merge pull request #2354 from nspcc-dev/mpt-value-based-gc

Value-based MPT GC
This commit is contained in:
Roman Khimov 2022-02-11 16:25:08 +03:00 committed by GitHub
commit 6380647770
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 415 additions and 253 deletions

View file

@ -198,7 +198,9 @@ protocol-related settings described in the table below.
| Section | Type | Default value | Description | Notes |
| --- | --- | --- | --- | --- |
| CommitteeHistory | map[uint32]int | none | Number of committee members after given height, for example `{0: 1, 20: 4}` sets up a chain with one committee member since the genesis and then changes the setting to 4 committee members at the height of 20. `StandbyCommittee` committee setting must have the number of keys equal or exceeding the highest value in this option. Blocks numbers where the change happens must be divisble by the old and by the new values simultaneously. If not set, committee size is derived from the `StandbyCommittee` setting and never changes. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store latest state. If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. |
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store latest state. If true, DB size will be smaller, but older roots won't be accessible. This value should remain th
e same for the same database. | Conflicts with `P2PStateExchangeExtensions`. |
| Magic | `uint32` | `0` | Magic number which uniquely identifies NEO network. |
| MaxBlockSize | `uint32` | `262144` | Maximum block size in bytes. |
| MaxBlockSystemFee | `int64` | `900000000000` | Maximum overall transactions system fee per block. |
@ -208,8 +210,8 @@ protocol-related settings described in the table below.
| NativeActivations | `map[string][]uint32` | ContractManagement: [0]<br>StdLib: [0]<br>CryptoLib: [0]<br>LedgerContract: [0]<br>NeoToken: [0]<br>GasToken: [0]<br>PolicyContract: [0]<br>RoleManagement: [0]<br>OracleContract: [0] | The list of histories of native contracts updates. Each list item shod be presented as a known native contract name with the corresponding list of chain's heights. The contract is not active until chain reaches the first height value specified in the list. | `Notary` is supported. |
| P2PNotaryRequestPayloadPoolSize | `int` | `1000` | Size of the node's P2P Notary request payloads memory pool where P2P Notary requests are stored before main or fallback transaction is completed and added to the chain.<br>This option is valid only if `P2PSigExtensions` are enabled. | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| P2PSigExtensions | `bool` | `false` | Enables following additional Notary service related logic:<br>• Transaction attributes `NotValidBefore`, `Conflicts` and `NotaryAssisted`<br>• Network payload of the `P2PNotaryRequest` type<br>• Native `Notary` contract<br>• Notary node module | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| P2PStateExchangeExtensions | `bool` | `false` | Enables following P2P MPT state data exchange logic: <br>`StateSyncInterval` protocol setting <br>• P2P commands `GetMPTDataCMD` and `MPTDataCMD` | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. |
| P2PStateExchangeExtensions | `bool` | `false` | Enables following P2P MPT state data exchange logic: <br>`StateSyncInterval` protocol setting <br>• P2P commands `GetMPTDataCMD` and `MPTDataCMD` | Not supported by the C# node, thus may affect heterogeneous networks functionality. Conflicts with `KeepOnlyLatestState`. |
| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. |
| ReservedAttributes | `bool` | `false` | Allows to have reserved attributes range for experimental or private purposes. |
| SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. |
| SecondsPerBlock | `int` | `15` | Minimal time that should pass before next block is accepted. |

View file

@ -15,6 +15,11 @@ type (
ProtocolConfiguration struct {
// CommitteeHistory stores committee size change history (height: size).
CommitteeHistory map[uint32]int `yaml:"CommitteeHistory"`
// GarbageCollectionPeriod sets the number of blocks to wait before
// starting the next MPT garbage collection cycle when RemoveUntraceableBlocks
// option is used.
GarbageCollectionPeriod uint32 `yaml:"GarbageCollectionPeriod"`
Magic netmode.Magic `yaml:"Magic"`
MemPoolSize int `yaml:"MemPoolSize"`
@ -27,7 +32,7 @@ type (
// If true, DB size will be smaller, but older roots won't be accessible.
// This value should remain the same for the same database.
KeepOnlyLatestState bool `yaml:"KeepOnlyLatestState"`
// RemoveUntraceableBlocks specifies if old blocks should be removed.
// RemoveUntraceableBlocks specifies if old data should be removed.
RemoveUntraceableBlocks bool `yaml:"RemoveUntraceableBlocks"`
// MaxBlockSize is the maximum block size in bytes.
MaxBlockSize uint32 `yaml:"MaxBlockSize"`
@ -81,6 +86,9 @@ type heightNumber struct {
func (p *ProtocolConfiguration) Validate() error {
var err error
if p.KeepOnlyLatestState && p.P2PStateExchangeExtensions {
return errors.New("can't have both KeepOnlyLatestState and P2PStateExchangeExtensions")
}
for name := range p.NativeUpdateHistories {
if !nativenames.IsValid(name) {
return fmt.Errorf("NativeActivations configuration section contains unexpected native contract name: %s", name)

View file

@ -8,6 +8,15 @@ import (
func TestProtocolConfigurationValidation(t *testing.T) {
p := &ProtocolConfiguration{
StandbyCommittee: []string{
"02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2",
},
ValidatorsCount: 1,
KeepOnlyLatestState: true,
P2PStateExchangeExtensions: true,
}
require.Error(t, p.Validate())
p = &ProtocolConfiguration{
ValidatorsCount: 1,
}
require.Error(t, p.Validate())

View file

@ -44,9 +44,10 @@ import (
// Tuning parameters.
const (
headerBatchCount = 2000
version = "0.2.1"
version = "0.2.2"
defaultInitialGAS = 52000000_00000000
defaultGCPeriod = 10000
defaultMemPoolSize = 50000
defaultP2PNotaryRequestPayloadPoolSize = 1000
defaultMaxBlockSize = 262144
@ -124,6 +125,9 @@ type Blockchain struct {
// are directly from underlying persistent store.
persistent *dao.Simple
// Underlying persistent store.
store storage.Store
// Current index/height of the highest block.
// Read access should always be called by BlockHeight().
// Write access should only happen in storeBlock().
@ -245,6 +249,10 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.RemoveUntraceableBlocks && cfg.GarbageCollectionPeriod == 0 {
cfg.GarbageCollectionPeriod = defaultGCPeriod
log.Info("GarbageCollectionPeriod is not set or wrong, using default value", zap.Uint32("GarbageCollectionPeriod", cfg.GarbageCollectionPeriod))
}
if len(cfg.NativeUpdateHistories) == 0 {
cfg.NativeUpdateHistories = map[string][]uint32{}
log.Info("NativeActivations are not set, using default values")
@ -253,6 +261,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
config: cfg,
dao: dao.NewSimple(s, cfg.StateRootInHeader, cfg.P2PSigExtensions),
persistent: dao.NewSimple(s, cfg.StateRootInHeader, cfg.P2PSigExtensions),
store: s,
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize, 0, false),
@ -320,7 +329,7 @@ func (bc *Blockchain) init() error {
if err != nil {
return err
}
if err := bc.stateRoot.Init(0, bc.config.KeepOnlyLatestState); err != nil {
if err := bc.stateRoot.Init(0); err != nil {
return fmt.Errorf("can't init MPT: %w", err)
}
return bc.storeBlock(genesisBlock, nil)
@ -426,7 +435,7 @@ func (bc *Blockchain) init() error {
}
bc.blockHeight = bHeight
bc.persistedHeight = bHeight
if err = bc.stateRoot.Init(bHeight, bc.config.KeepOnlyLatestState); err != nil {
if err = bc.stateRoot.Init(bHeight); err != nil {
return fmt.Errorf("can't init MPT at height %d: %w", bHeight, err)
}
@ -599,7 +608,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
if err = bc.stateRoot.JumpToState(&state.MPTRoot{
Index: p,
Root: block.PrevStateRoot,
}, bc.config.KeepOnlyLatestState); err != nil {
}); err != nil {
return fmt.Errorf("can't perform MPT jump to height %d: %w", p, err)
}
@ -647,12 +656,21 @@ func (bc *Blockchain) Run() {
case <-bc.stopCh:
return
case <-persistTimer.C:
var oldPersisted uint32
var gcDur time.Duration
if bc.config.RemoveUntraceableBlocks {
oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
}
dur, err := bc.persist(nextSync)
if err != nil {
bc.log.Warn("failed to persist blockchain", zap.Error(err))
}
if bc.config.RemoveUntraceableBlocks {
gcDur = bc.tryRunGC(oldPersisted)
}
nextSync = dur > persistInterval*2
interval := persistInterval - dur
interval := persistInterval - dur - gcDur
if interval <= 0 {
interval = time.Microsecond // Reset doesn't work with zero value
}
@ -661,6 +679,35 @@ func (bc *Blockchain) Run() {
}
}
func (bc *Blockchain) tryRunGC(old uint32) time.Duration {
var dur time.Duration
new := atomic.LoadUint32(&bc.persistedHeight)
var tgtBlock = int64(new)
tgtBlock -= int64(bc.config.MaxTraceableBlocks)
if bc.config.P2PStateExchangeExtensions {
syncP := new / uint32(bc.config.StateSyncInterval)
syncP--
syncP *= uint32(bc.config.StateSyncInterval)
if tgtBlock > int64(syncP) {
tgtBlock = int64(syncP)
}
}
// Always round to the GCP.
tgtBlock /= int64(bc.config.GarbageCollectionPeriod)
tgtBlock *= int64(bc.config.GarbageCollectionPeriod)
// Count periods.
old /= bc.config.GarbageCollectionPeriod
new /= bc.config.GarbageCollectionPeriod
if tgtBlock > int64(bc.config.GarbageCollectionPeriod) && new != old {
tgtBlock /= int64(bc.config.GarbageCollectionPeriod)
tgtBlock *= int64(bc.config.GarbageCollectionPeriod)
dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store)
}
return dur
}
// notificationDispatcher manages subscription to events and broadcasts new events.
func (bc *Blockchain) notificationDispatcher() {
var (

View file

@ -1589,7 +1589,7 @@ func TestDumpAndRestore(t *testing.T) {
}
func TestRemoveUntraceable(t *testing.T) {
check := func(t *testing.T, bc *Blockchain, tHash, bHash util.Uint256, errorExpected bool) {
check := func(t *testing.T, bc *Blockchain, tHash, bHash, sHash util.Uint256, errorExpected bool) {
_, _, err := bc.GetTransaction(tHash)
if errorExpected {
require.Error(t, err)
@ -1610,10 +1610,20 @@ func TestRemoveUntraceable(t *testing.T) {
}
_, err = bc.GetHeader(bHash)
require.NoError(t, err)
if !sHash.Equals(util.Uint256{}) {
sm := bc.GetStateModule()
_, err = sm.GetState(sHash, []byte{0xfb, 0xff, 0xff, 0xff, 0x0e}) // NEO committee key.
if errorExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
}
t.Run("P2PStateExchangeExtensions off", func(t *testing.T) {
bc := newTestChainWithCustomCfg(t, func(c *config.Config) {
c.ProtocolConfiguration.MaxTraceableBlocks = 2
c.ProtocolConfiguration.GarbageCollectionPeriod = 2
c.ProtocolConfiguration.RemoveUntraceableBlocks = true
})
@ -1622,6 +1632,8 @@ func TestRemoveUntraceable(t *testing.T) {
b1 := bc.newBlock(tx1)
require.NoError(t, bc.AddBlock(b1))
tx1Height := bc.BlockHeight()
sRoot, err := bc.GetStateModule().GetStateRoot(tx1Height)
require.NoError(t, err)
tx2, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1)
require.NoError(t, err)
@ -1631,13 +1643,21 @@ func TestRemoveUntraceable(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tx1Height, h1)
check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), true)
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
// Don't wait for Run().
_, err = bc.persist(true)
require.NoError(t, err)
bc.tryRunGC(0)
check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, true)
})
t.Run("P2PStateExchangeExtensions on", func(t *testing.T) {
bc := newTestChainWithCustomCfg(t, func(c *config.Config) {
c.ProtocolConfiguration.MaxTraceableBlocks = 2
c.ProtocolConfiguration.GarbageCollectionPeriod = 2
c.ProtocolConfiguration.RemoveUntraceableBlocks = true
c.ProtocolConfiguration.P2PStateExchangeExtensions = true
c.ProtocolConfiguration.StateSyncInterval = 2
@ -1649,6 +1669,8 @@ func TestRemoveUntraceable(t *testing.T) {
b1 := bc.newBlock(tx1)
require.NoError(t, bc.AddBlock(b1))
tx1Height := bc.BlockHeight()
sRoot, err := bc.GetStateModule().GetStateRoot(tx1Height)
require.NoError(t, err)
tx2, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1)
require.NoError(t, err)
@ -1664,13 +1686,13 @@ func TestRemoveUntraceable(t *testing.T) {
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), false)
check(t, bc, tx2.Hash(), b2.Hash(), false)
check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, false)
check(t, bc, tx2.Hash(), b2.Hash(), sRoot.Root, false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), true)
check(t, bc, tx2.Hash(), b2.Hash(), false)
check(t, bc, tx1.Hash(), b1.Hash(), util.Uint256{}, true)
check(t, bc, tx2.Hash(), b2.Hash(), util.Uint256{}, false)
_, h2, err := bc.GetTransaction(tx2.Hash())
require.NoError(t, err)
require.Equal(t, tx2Height, h2)

View file

@ -56,8 +56,8 @@ func testIncompletePut(t *testing.T, ps pairs, n int, tr1, tr2 *Trie) {
require.Equal(t, tr1.StateRoot(), tr2.StateRoot())
t.Run("test restore", func(t *testing.T) {
tr2.Flush()
tr3 := NewTrie(NewHashNode(tr2.StateRoot()), false, storage.NewMemCachedStore(tr2.Store))
tr2.Flush(0)
tr3 := NewTrie(NewHashNode(tr2.StateRoot()), ModeAll, storage.NewMemCachedStore(tr2.Store))
for _, p := range ps[:n] {
val, err := tr3.Get(p[0])
if p[1] == nil {
@ -76,8 +76,8 @@ func testPut(t *testing.T, ps pairs, tr1, tr2 *Trie) {
func TestTrie_PutBatchLeaf(t *testing.T) {
prepareLeaf := func(t *testing.T) (*Trie, *Trie) {
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
require.NoError(t, tr1.Put([]byte{0}, []byte("value")))
require.NoError(t, tr2.Put([]byte{0}, []byte("value")))
return tr1, tr2
@ -118,8 +118,8 @@ func TestTrie_PutBatchLeaf(t *testing.T) {
func TestTrie_PutBatchExtension(t *testing.T) {
prepareExtension := func(t *testing.T) (*Trie, *Trie) {
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
require.NoError(t, tr1.Put([]byte{1, 2}, []byte("value1")))
require.NoError(t, tr2.Put([]byte{1, 2}, []byte("value1")))
return tr1, tr2
@ -170,8 +170,8 @@ func TestTrie_PutBatchExtension(t *testing.T) {
func TestTrie_PutBatchBranch(t *testing.T) {
prepareBranch := func(t *testing.T) (*Trie, *Trie) {
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
require.NoError(t, tr1.Put([]byte{0x00, 2}, []byte("value1")))
require.NoError(t, tr2.Put([]byte{0x00, 2}, []byte("value1")))
require.NoError(t, tr1.Put([]byte{0x10, 3}, []byte("value2")))
@ -191,9 +191,9 @@ func TestTrie_PutBatchBranch(t *testing.T) {
t.Run("non-empty child is hash node", func(t *testing.T) {
tr1, tr2 := prepareBranch(t)
tr1.Flush()
tr1.Flush(0)
tr1.Collapse(1)
tr2.Flush()
tr2.Flush(0)
tr2.Collapse(1)
var ps = pairs{{[]byte{0x00, 2}, nil}}
@ -201,16 +201,16 @@ func TestTrie_PutBatchBranch(t *testing.T) {
require.IsType(t, (*ExtensionNode)(nil), tr1.root)
})
t.Run("non-empty child is last node", func(t *testing.T) {
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
require.NoError(t, tr1.Put([]byte{0x00, 2}, []byte("value1")))
require.NoError(t, tr2.Put([]byte{0x00, 2}, []byte("value1")))
require.NoError(t, tr1.Put([]byte{0x00}, []byte("value2")))
require.NoError(t, tr2.Put([]byte{0x00}, []byte("value2")))
tr1.Flush()
tr1.Flush(0)
tr1.Collapse(1)
tr2.Flush()
tr2.Flush(0)
tr2.Collapse(1)
var ps = pairs{{[]byte{0x00, 2}, nil}}
@ -248,14 +248,14 @@ func TestTrie_PutBatchBranch(t *testing.T) {
func TestTrie_PutBatchHash(t *testing.T) {
prepareHash := func(t *testing.T) (*Trie, *Trie) {
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
require.NoError(t, tr1.Put([]byte{0x10}, []byte("value1")))
require.NoError(t, tr2.Put([]byte{0x10}, []byte("value1")))
require.NoError(t, tr1.Put([]byte{0x20}, []byte("value2")))
require.NoError(t, tr2.Put([]byte{0x20}, []byte("value2")))
tr1.Flush()
tr2.Flush()
tr1.Flush(0)
tr2.Flush(0)
return tr1, tr2
}
@ -274,7 +274,7 @@ func TestTrie_PutBatchHash(t *testing.T) {
}
tr1.Collapse(1)
tr2.Collapse(1)
key := makeStorageKey(tr1.root.(*BranchNode).Children[2].Hash().BytesBE())
key := makeStorageKey(tr1.root.(*BranchNode).Children[2].Hash())
require.NoError(t, tr1.Store.Delete(key))
require.NoError(t, tr2.Store.Delete(key))
testIncompletePut(t, ps, 1, tr1, tr2)
@ -283,8 +283,8 @@ func TestTrie_PutBatchHash(t *testing.T) {
func TestTrie_PutBatchEmpty(t *testing.T) {
t.Run("good", func(t *testing.T) {
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
var ps = pairs{
{[]byte{0}, []byte("value0")},
{[]byte{1}, []byte("value1")},
@ -299,15 +299,15 @@ func TestTrie_PutBatchEmpty(t *testing.T) {
{[]byte{2}, nil},
{[]byte{3}, []byte("replace3")},
}
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
testIncompletePut(t, ps, 4, tr1, tr2)
})
}
// For the sake of coverage.
func TestTrie_InvalidNodeType(t *testing.T) {
tr := NewTrie(EmptyNode{}, false, newTestStore())
tr := NewTrie(EmptyNode{}, ModeAll, newTestStore())
var b Batch
b.Add([]byte{1}, []byte("value"))
tr.root = Node(nil)
@ -315,8 +315,8 @@ func TestTrie_InvalidNodeType(t *testing.T) {
}
func TestTrie_PutBatch(t *testing.T) {
tr1 := NewTrie(EmptyNode{}, false, newTestStore())
tr2 := NewTrie(EmptyNode{}, false, newTestStore())
tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore())
var ps = pairs{
{[]byte{1}, []byte{1}},
{[]byte{2}, []byte{3}},

View file

@ -32,19 +32,19 @@ type Billet struct {
Store *storage.MemCachedStore
root Node
refcountEnabled bool
mode TrieMode
}
// NewBillet returns new billet for MPT trie restoring. It accepts a MemCachedStore
// to decouple storage errors from logic errors so that all storage errors are
// processed during `store.Persist()` at the caller. This also has the benefit,
// that every `Put` can be considered an atomic operation.
func NewBillet(rootHash util.Uint256, enableRefCount bool, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet {
func NewBillet(rootHash util.Uint256, mode TrieMode, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet {
return &Billet{
TempStoragePrefix: prefix,
Store: store,
root: NewHashNode(rootHash),
refcountEnabled: enableRefCount,
mode: mode,
}
}
@ -177,8 +177,8 @@ func (b *Billet) putIntoHash(curr *HashNode, path []byte, val Node) (Node, error
}
func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) {
key := makeStorageKey(h.BytesBE())
if b.refcountEnabled {
key := makeStorageKey(h)
if b.mode.RC() {
var (
err error
data []byte
@ -191,7 +191,7 @@ func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) {
}
cnt++
if len(data) == 0 {
data = append(bs, 0, 0, 0, 0)
data = append(bs, 1, 0, 0, 0, 0)
}
binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt))
_ = b.Store.Put(key, data)
@ -325,7 +325,7 @@ func (b *Billet) tryCollapseBranch(curr *BranchNode) Node {
// GetFromStore returns MPT node from the storage.
func (b *Billet) GetFromStore(h util.Uint256) (Node, error) {
data, err := b.Store.Get(makeStorageKey(h.BytesBE()))
data, err := b.Store.Get(makeStorageKey(h))
if err != nil {
return nil, err
}
@ -337,8 +337,8 @@ func (b *Billet) GetFromStore(h util.Uint256) (Node, error) {
return nil, r.Err
}
if b.refcountEnabled {
data = data[:len(data)-4]
if b.mode.RC() {
data = data[:len(data)-5]
}
n.Node.(flushedNode).setCache(data, h)
return n.Node, nil

View file

@ -16,7 +16,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
_ = expectedRoot.Hash()
_ = tr.root.Hash()
require.Equal(t, expectedRoot, tr.root)
expectedBytes, err := tr.Store.Get(makeStorageKey(expectedNode.Hash().BytesBE()))
expectedBytes, err := tr.Store.Get(makeStorageKey(expectedNode.Hash()))
if expectedRefCount != 0 {
require.NoError(t, err)
require.Equal(t, expectedRefCount, binary.LittleEndian.Uint32(expectedBytes[len(expectedBytes)-4:]))
@ -32,7 +32,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b.Children[5] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0xAB, 0xDE}))
path := toNibbles([]byte{0xAC})
e := NewExtensionNode(path, NewHashNode(b.Hash()))
tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(e.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
tr.root = e
// OK
@ -61,7 +61,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
l := NewLeafNode([]byte{0xAB, 0xCD})
path := toNibbles([]byte{0xAC})
e := NewExtensionNode(path, NewHashNode(l.Hash()))
tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(e.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
tr.root = e
// OK
@ -87,7 +87,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
h := NewHashNode(util.Uint256{1, 2, 3})
path := toNibbles([]byte{0xAC})
e := NewExtensionNode(path, h)
tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(e.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
tr.root = e
// no-op
@ -99,7 +99,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
t.Run("parent is Leaf", func(t *testing.T) {
l := NewLeafNode([]byte{0xAB, 0xCD})
path := []byte{}
tr := NewBillet(l.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(l.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
tr.root = l
// Already restored => panic expected
@ -121,7 +121,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b := NewBranchNode()
b.Children[5] = NewHashNode(l1.Hash())
b.Children[lastChild] = NewHashNode(l2.Hash())
tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
tr.root = b
// OK
@ -152,7 +152,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b := NewBranchNode()
b.Children[5] = NewHashNode(l1.Hash())
b.Children[lastChild] = NewHashNode(l2.Hash())
tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
tr.root = b
// OK
@ -179,7 +179,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
// two same hashnodes => leaf's refcount expected to be 2 in the end.
b.Children[3] = NewHashNode(l.Hash())
b.Children[4] = NewHashNode(l.Hash())
tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
tr.root = b
// OK
@ -202,7 +202,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
b := NewBranchNode()
b.Children[3] = NewHashNode(l.Hash())
b.Children[4] = NewHashNode(l.Hash())
tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore())
tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore())
// Should fail, because if it's a hash node with non-empty path, then the node
// has already been collapsed.

View file

@ -1,7 +1,6 @@
package mpt
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
@ -23,7 +22,7 @@ func prepareMPTCompat() *Trie {
b.Children[16] = v2
b.Children[15] = NewHashNode(e4.Hash())
tr := NewTrie(r, true, newTestStore())
tr := NewTrie(r, ModeLatest, newTestStore())
tr.putToStore(r)
tr.putToStore(b)
tr.putToStore(e1)
@ -113,12 +112,12 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0xac, 0x00}, []byte{0xab, 0xcd},
[]byte{0xac, 0x10}, []byte{0xab, 0xcd})
tr.Flush()
tr.Flush(0)
tr2 := copyTrie(tr)
require.NoError(t, tr2.Delete([]byte{0xac, 0x00}))
tr2.Flush()
tr2.Flush(0)
require.NoError(t, tr2.Delete([]byte{0xac, 0x10}))
})
@ -132,7 +131,7 @@ func TestCompatibility(t *testing.T) {
b.Children[0] = e1
b.Children[15] = NewHashNode(e4.Hash())
tr := NewTrie(NewHashNode(r.Hash()), false, newTestStore())
tr := NewTrie(NewHashNode(r.Hash()), ModeAll, newTestStore())
tr.putToStore(r)
tr.putToStore(b)
tr.putToStore(e1)
@ -150,9 +149,9 @@ func TestCompatibility(t *testing.T) {
require.NoError(t, tr.Delete([]byte{0xac, 0x01}))
tr.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd})
tr.Flush()
tr.Flush(0)
tr2 := NewTrie(NewHashNode(tr.root.Hash()), false, tr.Store)
tr2 := NewTrie(NewHashNode(tr.root.Hash()), ModeAll, tr.Store)
tr2.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd})
})
@ -161,15 +160,15 @@ func TestCompatibility(t *testing.T) {
[]byte{0xac, 0x11}, []byte{0xac, 0x11},
[]byte{0xac, 0x22}, []byte{0xac, 0x22},
[]byte{0xac}, []byte{0xac})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 7)
require.NoError(t, tr.Delete([]byte{0xac, 0x11}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 5)
require.NoError(t, tr.Delete([]byte{0xac, 0x22}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 2)
})
@ -189,15 +188,14 @@ func TestCompatibility(t *testing.T) {
b.Children[16] = v2
b.Children[15] = NewHashNode(e4.Hash())
tr := NewTrie(NewHashNode(r.Hash()), true, mainTrie.Store)
tr := NewTrie(NewHashNode(r.Hash()), ModeLatest, mainTrie.Store)
require.Equal(t, r.Hash(), tr.root.Hash())
// Tail bytes contain reference counter thus check for prefix.
proof := testGetProof(t, tr, []byte{0xac, 0x01}, 4)
require.True(t, bytes.HasPrefix(r.Bytes(), proof[0]))
require.True(t, bytes.HasPrefix(b.Bytes(), proof[1]))
require.True(t, bytes.HasPrefix(e1.Bytes(), proof[2]))
require.True(t, bytes.HasPrefix(v1.Bytes(), proof[3]))
require.Equal(t, r.Bytes(), proof[0])
require.Equal(t, b.Bytes(), proof[1])
require.Equal(t, e1.Bytes(), proof[2])
require.Equal(t, v1.Bytes(), proof[3])
testGetProof(t, tr, []byte{0xac}, 3)
testGetProof(t, tr, []byte{0xac, 0x10}, 0)
@ -242,11 +240,11 @@ func TestCompatibility(t *testing.T) {
[]byte{0xa1, 0x01}, []byte{0x01},
[]byte{0xa2, 0x01}, []byte{0x01},
[]byte{0xa3, 0x01}, []byte{0x01})
tr.Flush()
tr.Flush(0)
tr2 := copyTrie(tr)
require.NoError(t, tr2.Delete([]byte{0xa3, 0x01}))
tr2.Flush()
tr2.Flush(0)
tr3 := copyTrie(tr2)
require.NoError(t, tr3.Delete([]byte{0xa2, 0x01}))
@ -258,15 +256,15 @@ func TestCompatibility(t *testing.T) {
[]byte{0xa1, 0x01}, []byte{0x01},
[]byte{0xa2, 0x01}, []byte{0x01},
[]byte{0xa3, 0x01}, []byte{0x01})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
require.NoError(t, tr.Delete([]byte{0xa3, 0x01}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
require.NoError(t, tr.Delete([]byte{0xa2, 0x01}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 2)
tr.testHas(t, []byte{0xa1, 0x01}, []byte{0x01})
})
@ -275,17 +273,17 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0xa1}, []byte{0x01},
[]byte{0xa2}, []byte{0x02})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Delete([]byte{0xa1}))
tr1.Flush()
tr1.Flush(0)
require.Equal(t, 2, len(tr1.Store.GetBatch().Put))
tr2 := copyTrie(tr1)
require.NoError(t, tr2.Delete([]byte{0xa2}))
tr2.Flush()
tr2.Flush(0)
require.Equal(t, 0, len(tr2.Store.GetBatch().Put))
})
@ -294,21 +292,21 @@ func TestCompatibility(t *testing.T) {
[]byte{0x10}, []byte{0x01},
[]byte{0x20}, []byte{0x02},
[]byte{0x30}, []byte{0x03})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 7)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Delete([]byte{0x10}))
tr1.Flush()
tr1.Flush(0)
tr2 := copyTrie(tr1)
require.NoError(t, tr2.Delete([]byte{0x20}))
tr2.Flush()
tr2.Flush(0)
require.Equal(t, 2, len(tr2.Store.GetBatch().Put))
tr3 := copyTrie(tr2)
require.NoError(t, tr3.Delete([]byte{0x30}))
tr3.Flush()
tr3.Flush(0)
require.Equal(t, 0, len(tr3.Store.GetBatch().Put))
})
@ -316,12 +314,12 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0xa1}, []byte{0x01},
[]byte{0xa2}, []byte{0x02})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Put([]byte{0xa3}, []byte{0x03}))
tr1.Flush()
tr1.Flush(0)
require.Equal(t, 5, len(tr1.Store.GetBatch().Put))
})
@ -329,19 +327,19 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0x10}, []byte{0x01},
[]byte{0x20}, []byte{0x02})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 5)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Put([]byte{0x30}, []byte{0x03}))
tr1.Flush()
tr1.Flush(0)
checkBatchSize(t, tr1, 7)
})
t.Run("EmptyValueIssue633", func(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0x01}, []byte{})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 2)
proof := testGetProof(t, tr, []byte{0x01}, 2)
@ -352,7 +350,7 @@ func TestCompatibility(t *testing.T) {
}
func copyTrie(t *Trie) *Trie {
return NewTrie(NewHashNode(t.root.Hash()), t.refcountEnabled, t.Store)
return NewTrie(NewHashNode(t.root.Hash()), t.mode, t.Store)
}
func checkBatchSize(t *testing.T, tr *Trie, n int) {
@ -372,7 +370,7 @@ func testGetProof(t *testing.T, tr *Trie, key []byte, size int) [][]byte {
}
func newFilledTrie(t *testing.T, args ...[]byte) *Trie {
tr := NewTrie(nil, true, newTestStore())
tr := NewTrie(nil, ModeLatest, newTestStore())
for i := 0; i < len(args); i += 2 {
require.NoError(t, tr.Put(args[i], args[i+1]))
}
@ -381,7 +379,7 @@ func newFilledTrie(t *testing.T, args ...[]byte) *Trie {
func TestCompatibility_Find(t *testing.T) {
check := func(t *testing.T, from []byte, expectedResLen int) {
tr := NewTrie(nil, false, newTestStore())
tr := NewTrie(nil, ModeAll, newTestStore())
require.NoError(t, tr.Put([]byte("aa"), []byte("02")))
require.NoError(t, tr.Put([]byte("aa10"), []byte("03")))
require.NoError(t, tr.Put([]byte("aa50"), []byte("04")))
@ -407,11 +405,11 @@ func TestCompatibility_Find(t *testing.T) {
check(t, []byte{}, 2) // without `from` key
})
t.Run("TestFindStatesIssue652", func(t *testing.T) {
tr := NewTrie(nil, false, newTestStore())
tr := NewTrie(nil, ModeAll, newTestStore())
// root is an extension node with key=abc; next=branch
require.NoError(t, tr.Put([]byte("abc1"), []byte("01")))
require.NoError(t, tr.Put([]byte("abc3"), []byte("02")))
tr.Flush()
tr.Flush(0)
// find items with extension's key prefix
t.Run("from > start", func(t *testing.T) {
res, err := tr.Find([]byte("ab"), []byte("d2"), 100)

View file

@ -93,7 +93,7 @@ func TestNode_Serializable(t *testing.T) {
// https://github.com/neo-project/neo/blob/neox-2.x/neo.UnitTests/UT_MPTTrie.cs#L198
func TestJSONSharp(t *testing.T) {
tr := NewTrie(nil, false, newTestStore())
tr := NewTrie(nil, ModeAll, newTestStore())
require.NoError(t, tr.Put([]byte{0xac, 0x11}, []byte{0xac, 0x11}))
require.NoError(t, tr.Put([]byte{0xac, 0x22}, []byte{0xac, 0x22}))
require.NoError(t, tr.Put([]byte{0xac}, []byte{0xac}))

View file

@ -66,11 +66,11 @@ func (t *Trie) getProof(curr Node, path []byte, proofs *[][]byte) (Node, error)
// It also returns value for the key.
func VerifyProof(rh util.Uint256, key []byte, proofs [][]byte) ([]byte, bool) {
path := toNibbles(key)
tr := NewTrie(NewHashNode(rh), false, storage.NewMemCachedStore(storage.NewMemoryStore()))
tr := NewTrie(NewHashNode(rh), ModeAll, storage.NewMemCachedStore(storage.NewMemoryStore()))
for i := range proofs {
h := hash.DoubleSha256(proofs[i])
// no errors in Put to memory store
_ = tr.Store.Put(makeStorageKey(h[:]), proofs[i])
_ = tr.Store.Put(makeStorageKey(h), proofs[i])
}
_, leaf, _, err := tr.getWithPath(tr.root, path, true)
if err != nil {

View file

@ -15,7 +15,7 @@ func newProofTrie(t *testing.T, missingHashNode bool) *Trie {
b.Children[4] = NewHashNode(e.Hash())
b.Children[5] = e2
tr := NewTrie(b, false, newTestStore())
tr := NewTrie(b, ModeAll, newTestStore())
require.NoError(t, tr.Put([]byte{0x12, 0x31}, []byte("value1")))
require.NoError(t, tr.Put([]byte{0x12, 0x32}, []byte("value2")))
tr.putToStore(l)

View file

@ -12,12 +12,28 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util/slice"
)
// TrieMode is the storage mode of trie, it affects the DB scheme.
type TrieMode byte
// TrieMode is the storage mode of trie.
const (
// ModeAll is used to store everything.
ModeAll TrieMode = 0
// ModeLatest is used to only store the latest root.
ModeLatest TrieMode = 0x01
// ModeGCFlag is a flag for GC.
ModeGCFlag TrieMode = 0x02
// ModeGC is used to store a set of roots with GC possible, it combines
// GCFlag and Latest (because it needs RC, but it has GC enabled).
ModeGC TrieMode = 0x03
)
// Trie is an MPT trie storing all key-value pairs.
type Trie struct {
Store *storage.MemCachedStore
root Node
refcountEnabled bool
mode TrieMode
refcount map[util.Uint256]*cachedNode
}
@ -30,10 +46,20 @@ type cachedNode struct {
// ErrNotFound is returned when requested trie item is missing.
var ErrNotFound = errors.New("item not found")
// RC returns true when reference counting is enabled.
func (m TrieMode) RC() bool {
return m&ModeLatest != 0
}
// GC returns true when garbage collection is enabled.
func (m TrieMode) GC() bool {
return m&ModeGCFlag != 0
}
// NewTrie returns new MPT trie. It accepts a MemCachedStore to decouple storage errors from logic errors
// so that all storage errors are processed during `store.Persist()` at the caller.
// This also has the benefit, that every `Put` can be considered an atomic operation.
func NewTrie(root Node, enableRefCount bool, store *storage.MemCachedStore) *Trie {
func NewTrie(root Node, mode TrieMode, store *storage.MemCachedStore) *Trie {
if root == nil {
root = EmptyNode{}
}
@ -42,7 +68,7 @@ func NewTrie(root Node, enableRefCount bool, store *storage.MemCachedStore) *Tri
Store: store,
root: root,
refcountEnabled: enableRefCount,
mode: mode,
refcount: make(map[util.Uint256]*cachedNode),
}
}
@ -372,27 +398,29 @@ func (t *Trie) StateRoot() util.Uint256 {
return t.root.Hash()
}
func makeStorageKey(mptKey []byte) []byte {
return append([]byte{byte(storage.DataMPT)}, mptKey...)
func makeStorageKey(mptKey util.Uint256) []byte {
return append([]byte{byte(storage.DataMPT)}, mptKey[:]...)
}
// Flush puts every node in the trie except Hash ones to the storage.
// Because we care only about block-level changes, there is no need to put every
// new node to storage. Normally, flush should be called with every StateRoot persist, i.e.
// after every block.
func (t *Trie) Flush() {
func (t *Trie) Flush(index uint32) {
key := makeStorageKey(util.Uint256{})
for h, node := range t.refcount {
if node.refcount != 0 {
copy(key[1:], h[:])
if node.bytes == nil {
panic("item not in trie")
}
if t.refcountEnabled {
node.initial = t.updateRefCount(h)
if t.mode.RC() {
node.initial = t.updateRefCount(h, key, index)
if node.initial == 0 {
delete(t.refcount, h)
}
} else if node.refcount > 0 {
_ = t.Store.Put(makeStorageKey(h.BytesBE()), node.bytes)
_ = t.Store.Put(key, node.bytes)
}
node.refcount = 0
} else {
@ -401,25 +429,36 @@ func (t *Trie) Flush() {
}
}
func IsActiveValue(v []byte) bool {
return len(v) > 4 && v[len(v)-5] == 1
}
func getFromStore(key []byte, mode TrieMode, store *storage.MemCachedStore) ([]byte, error) {
data, err := store.Get(key)
if err == nil && mode.GC() && !IsActiveValue(data) {
return nil, storage.ErrKeyNotFound
}
return data, err
}
// updateRefCount should be called only when refcounting is enabled.
func (t *Trie) updateRefCount(h util.Uint256) int32 {
if !t.refcountEnabled {
func (t *Trie) updateRefCount(h util.Uint256, key []byte, index uint32) int32 {
if !t.mode.RC() {
panic("`updateRefCount` is called, but GC is disabled")
}
var data []byte
key := makeStorageKey(h.BytesBE())
node := t.refcount[h]
cnt := node.initial
if cnt == 0 {
// A newly created item which may be in store.
var err error
data, err = t.Store.Get(key)
data, err = getFromStore(key, t.mode, t.Store)
if err == nil {
cnt = int32(binary.LittleEndian.Uint32(data[len(data)-4:]))
}
}
if len(data) == 0 {
data = append(node.bytes, 0, 0, 0, 0)
data = append(node.bytes, 1, 0, 0, 0, 0)
}
cnt += node.refcount
switch {
@ -427,7 +466,13 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 {
// BUG: negative reference count
panic(fmt.Sprintf("negative reference count: %s new %d, upd %d", h.StringBE(), cnt, t.refcount[h]))
case cnt == 0:
if !t.mode.GC() {
_ = t.Store.Delete(key)
} else {
data[len(data)-5] = 0
binary.LittleEndian.PutUint32(data[len(data)-4:], index)
_ = t.Store.Put(key, data)
}
default:
binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt))
_ = t.Store.Put(key, data)
@ -466,7 +511,7 @@ func (t *Trie) removeRef(h util.Uint256, bs []byte) {
}
func (t *Trie) getFromStore(h util.Uint256) (Node, error) {
data, err := t.Store.Get(makeStorageKey(h.BytesBE()))
data, err := getFromStore(makeStorageKey(h), t.mode, t.Store)
if err != nil {
return nil, err
}
@ -478,11 +523,12 @@ func (t *Trie) getFromStore(h util.Uint256) (Node, error) {
return nil, r.Err
}
if t.refcountEnabled {
data = data[:len(data)-4]
if t.mode.RC() {
data = data[:len(data)-5]
node := t.refcount[h]
if node != nil {
node.bytes = data
_ = r.ReadB()
node.initial = int32(r.ReadU32LE())
}
}
@ -566,7 +612,7 @@ func (t *Trie) Find(prefix, from []byte, max int) ([]storage.KeyValue, error) {
res []storage.KeyValue
count int
)
b := NewBillet(t.root.Hash(), false, 0, t.Store)
b := NewBillet(t.root.Hash(), t.mode, 0, t.Store)
process := func(pathToNode []byte, node Node, _ []byte) bool {
if leaf, ok := node.(*LeafNode); ok {
if from == nil || !bytes.Equal(pathToNode, from) { // (*Billet).traverse includes `from` path into result if so. Need to filter out manually.

View file

@ -29,7 +29,7 @@ func newTestTrie(t *testing.T) *Trie {
b.Children[10] = NewExtensionNode([]byte{0x0e}, h)
e := NewExtensionNode(toNibbles([]byte{0xAC}), b)
tr := NewTrie(e, false, newTestStore())
tr := NewTrie(e, ModeAll, newTestStore())
tr.putToStore(e)
tr.putToStore(b)
@ -46,30 +46,30 @@ func newTestTrie(t *testing.T) *Trie {
}
func testTrieRefcount(t *testing.T, key1, key2 []byte) {
tr := NewTrie(nil, true, storage.NewMemCachedStore(storage.NewMemoryStore()))
tr := NewTrie(nil, ModeLatest, storage.NewMemCachedStore(storage.NewMemoryStore()))
require.NoError(t, tr.Put(key1, []byte{1}))
tr.Flush()
tr.Flush(0)
require.NoError(t, tr.Put(key2, []byte{1}))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, []byte{1})
tr.testHas(t, key2, []byte{1})
// remove first, keep second
require.NoError(t, tr.Delete(key1))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, nil)
tr.testHas(t, key2, []byte{1})
// no-op
require.NoError(t, tr.Put(key1, []byte{1}))
require.NoError(t, tr.Delete(key1))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, nil)
tr.testHas(t, key2, []byte{1})
// delete non-existent, refcount should not be updated
require.NoError(t, tr.Delete(key1))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, nil)
tr.testHas(t, key2, []byte{1})
}
@ -89,7 +89,7 @@ func TestTrie_PutIntoBranchNode(t *testing.T) {
l := NewLeafNode([]byte{0x8})
b.Children[0x7] = NewHashNode(l.Hash())
b.Children[0x8] = NewHashNode(random.Uint256())
tr := NewTrie(b, false, newTestStore())
tr := NewTrie(b, ModeAll, newTestStore())
// empty hash node child
require.NoError(t, tr.Put([]byte{0x66}, value))
@ -119,7 +119,7 @@ func TestTrie_PutIntoExtensionNode(t *testing.T) {
l := NewLeafNode([]byte{0x11})
key := []byte{0x12}
e := NewExtensionNode(toNibbles(key), NewHashNode(l.Hash()))
tr := NewTrie(e, false, newTestStore())
tr := NewTrie(e, ModeAll, newTestStore())
// missing hash
require.Error(t, tr.Put(key, value))
@ -145,7 +145,7 @@ func TestTrie_PutIntoHashNode(t *testing.T) {
e := NewExtensionNode([]byte{0x02}, l)
b.Children[1] = NewHashNode(e.Hash())
b.Children[9] = NewHashNode(random.Uint256())
tr := NewTrie(b, false, newTestStore())
tr := NewTrie(b, ModeAll, newTestStore())
tr.putToStore(e)
@ -174,7 +174,7 @@ func TestTrie_PutIntoHashNode(t *testing.T) {
func TestTrie_Put(t *testing.T) {
trExp := newTestTrie(t)
trAct := NewTrie(nil, false, newTestStore())
trAct := NewTrie(nil, ModeAll, newTestStore())
require.NoError(t, trAct.Put([]byte{0xAC, 0x01}, []byte{0xAB, 0xCD}))
require.NoError(t, trAct.Put([]byte{0xAC, 0x13}, []byte{}))
require.NoError(t, trAct.Put([]byte{0xAC, 0x99}, []byte{0x22, 0x22}))
@ -186,7 +186,7 @@ func TestTrie_Put(t *testing.T) {
}
func TestTrie_PutInvalid(t *testing.T) {
tr := NewTrie(nil, false, newTestStore())
tr := NewTrie(nil, ModeAll, newTestStore())
key, value := []byte("key"), []byte("value")
// empty key
@ -204,7 +204,7 @@ func TestTrie_PutInvalid(t *testing.T) {
}
func TestTrie_BigPut(t *testing.T) {
tr := NewTrie(nil, false, newTestStore())
tr := NewTrie(nil, ModeAll, newTestStore())
items := []struct{ k, v string }{
{"item with long key", "value1"},
{"item with matching prefix", "value2"},
@ -244,14 +244,14 @@ func (tr *Trie) putToStore(n Node) {
if n.Type() == HashT {
panic("can't put hash node in trie")
}
if tr.refcountEnabled {
if tr.mode.RC() {
tr.refcount[n.Hash()] = &cachedNode{
bytes: n.Bytes(),
refcount: 1,
}
tr.updateRefCount(n.Hash())
tr.updateRefCount(n.Hash(), makeStorageKey(n.Hash()), 0)
} else {
_ = tr.Store.Put(makeStorageKey(n.Hash().BytesBE()), n.Bytes())
_ = tr.Store.Put(makeStorageKey(n.Hash()), n.Bytes())
}
}
@ -298,7 +298,7 @@ func TestTrie_Get(t *testing.T) {
})
t.Run("UnfoldRoot", func(t *testing.T) {
tr := newTestTrie(t)
single := NewTrie(NewHashNode(tr.root.Hash()), false, tr.Store)
single := NewTrie(NewHashNode(tr.root.Hash()), ModeAll, tr.Store)
single.testHas(t, []byte{0xAC}, nil)
single.testHas(t, []byte{0xAC, 0x01}, []byte{0xAB, 0xCD})
single.testHas(t, []byte{0xAC, 0x99}, []byte{0x22, 0x22})
@ -313,13 +313,13 @@ func TestTrie_Flush(t *testing.T) {
"key2": []byte("value2"),
}
tr := NewTrie(nil, false, newTestStore())
tr := NewTrie(nil, ModeAll, newTestStore())
for k, v := range pairs {
require.NoError(t, tr.Put([]byte(k), v))
}
tr.Flush()
tr = NewTrie(NewHashNode(tr.StateRoot()), false, tr.Store)
tr.Flush(0)
tr = NewTrie(NewHashNode(tr.StateRoot()), ModeAll, tr.Store)
for k, v := range pairs {
actual, err := tr.Get([]byte(k))
require.NoError(t, err)
@ -337,10 +337,14 @@ func TestTrie_Delete(t *testing.T) {
}
func testTrieDelete(t *testing.T, enableGC bool) {
var mode TrieMode
if enableGC {
mode = ModeLatest
}
t.Run("Hash", func(t *testing.T) {
t.Run("FromStore", func(t *testing.T) {
l := NewLeafNode([]byte{0x12})
tr := NewTrie(NewHashNode(l.Hash()), enableGC, newTestStore())
tr := NewTrie(NewHashNode(l.Hash()), mode, newTestStore())
t.Run("NotInStore", func(t *testing.T) {
require.Error(t, tr.Delete([]byte{}))
})
@ -352,7 +356,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
})
t.Run("Empty", func(t *testing.T) {
tr := NewTrie(nil, enableGC, newTestStore())
tr := NewTrie(nil, mode, newTestStore())
require.NoError(t, tr.Delete([]byte{}))
})
})
@ -360,7 +364,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
t.Run("Leaf", func(t *testing.T) {
check := func(t *testing.T, value []byte) {
l := NewLeafNode(value)
tr := NewTrie(l, enableGC, newTestStore())
tr := NewTrie(l, mode, newTestStore())
t.Run("NonExistentKey", func(t *testing.T) {
require.NoError(t, tr.Delete([]byte{0x12}))
tr.testHas(t, []byte{}, value)
@ -381,7 +385,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
check := func(t *testing.T, value []byte) {
l := NewLeafNode(value)
e := NewExtensionNode([]byte{0x0A, 0x0B}, l)
tr := NewTrie(e, enableGC, newTestStore())
tr := NewTrie(e, mode, newTestStore())
t.Run("NonExistentKey", func(t *testing.T) {
require.NoError(t, tr.Delete([]byte{}))
@ -405,7 +409,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
b.Children[0] = NewExtensionNode([]byte{0x01}, NewLeafNode(value))
b.Children[6] = NewExtensionNode([]byte{0x07}, NewLeafNode([]byte{0x56, 0x78}))
e := NewExtensionNode([]byte{0x01, 0x02}, b)
tr := NewTrie(e, enableGC, newTestStore())
tr := NewTrie(e, mode, newTestStore())
h := e.Hash()
require.NoError(t, tr.Delete([]byte{0x12, 0x01}))
@ -432,7 +436,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
b.Children[lastChild] = NewLeafNode([]byte{0x12})
b.Children[0] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0x34}))
b.Children[1] = NewExtensionNode([]byte{0x06}, NewLeafNode(value))
tr := NewTrie(b, enableGC, newTestStore())
tr := NewTrie(b, mode, newTestStore())
require.NoError(t, tr.Delete([]byte{0x16}))
tr.testHas(t, []byte{}, []byte{0x12})
tr.testHas(t, []byte{0x01}, []byte{0x34})
@ -454,7 +458,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
l := NewLeafNode([]byte{0x34})
e := NewExtensionNode([]byte{0x06}, l)
b.Children[5] = NewHashNode(e.Hash())
tr := NewTrie(b, enableGC, newTestStore())
tr := NewTrie(b, mode, newTestStore())
tr.putToStore(l)
tr.putToStore(e)
require.NoError(t, tr.Delete([]byte{}))
@ -478,7 +482,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
b.Children[3] = NewExtensionNode([]byte{4}, NewLeafNode(value))
b.Children[lastChild] = NewHashNode(h)
tr := NewTrie(NewExtensionNode([]byte{1, 2}, b), enableGC, newTestStore())
tr := NewTrie(NewExtensionNode([]byte{1, 2}, b), mode, newTestStore())
tr.putToStore(ch)
require.NoError(t, tr.Delete([]byte{0x12, 0x34}))
@ -505,7 +509,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
b := NewBranchNode()
b.Children[lastChild] = NewLeafNode(value)
b.Children[5] = c
tr := NewTrie(b, enableGC, newTestStore())
tr := NewTrie(b, mode, newTestStore())
require.NoError(t, tr.Delete([]byte{}))
tr.testHas(t, []byte{}, nil)
@ -529,7 +533,7 @@ func testTrieDelete(t *testing.T, enableGC bool) {
l := NewLeafNode(value)
e := NewExtensionNode([]byte{0x06}, l)
b.Children[5] = NewHashNode(e.Hash())
tr := NewTrie(b, enableGC, newTestStore())
tr := NewTrie(b, mode, newTestStore())
tr.putToStore(l)
tr.putToStore(e)
require.NoError(t, tr.Delete([]byte{0x56}))
@ -579,7 +583,7 @@ func TestTrie_Collapse(t *testing.T) {
b.Children[0] = e
hb := b.Hash()
tr := NewTrie(b, false, newTestStore())
tr := NewTrie(b, ModeAll, newTestStore())
tr.Collapse(1)
newb, ok := tr.root.(*BranchNode)
@ -593,7 +597,7 @@ func TestTrie_Collapse(t *testing.T) {
hl := l.Hash()
e := NewExtensionNode([]byte{0x01}, l)
h := e.Hash()
tr := NewTrie(e, false, newTestStore())
tr := NewTrie(e, ModeAll, newTestStore())
tr.Collapse(1)
newe, ok := tr.root.(*ExtensionNode)
@ -604,19 +608,19 @@ func TestTrie_Collapse(t *testing.T) {
})
t.Run("Leaf", func(t *testing.T) {
l := NewLeafNode([]byte("value"))
tr := NewTrie(l, false, newTestStore())
tr := NewTrie(l, ModeAll, newTestStore())
tr.Collapse(10)
require.Equal(t, NewLeafNode([]byte("value")), tr.root)
})
t.Run("Empty Leaf", func(t *testing.T) {
l := NewLeafNode([]byte{})
tr := NewTrie(l, false, newTestStore())
tr := NewTrie(l, ModeAll, newTestStore())
tr.Collapse(10)
require.Equal(t, NewLeafNode([]byte{}), tr.root)
})
t.Run("Hash", func(t *testing.T) {
t.Run("EmptyNode", func(t *testing.T) {
tr := NewTrie(EmptyNode{}, false, newTestStore())
tr := NewTrie(EmptyNode{}, ModeAll, newTestStore())
require.NotPanics(t, func() { tr.Collapse(1) })
_, ok := tr.root.(EmptyNode)
require.True(t, ok)
@ -624,7 +628,7 @@ func TestTrie_Collapse(t *testing.T) {
h := random.Uint256()
hn := NewHashNode(h)
tr := NewTrie(hn, false, newTestStore())
tr := NewTrie(hn, ModeAll, newTestStore())
tr.Collapse(10)
newRoot, ok := tr.root.(*HashNode)

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
@ -28,6 +29,7 @@ type (
Store *storage.MemCachedStore
network netmode.Magic
srInHead bool
mode mpt.TrieMode
mpt *mpt.Trie
verifier VerifierFunc
log *zap.Logger
@ -52,9 +54,17 @@ type (
// NewModule returns new instance of stateroot module.
func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Logger, s *storage.MemCachedStore) *Module {
var mode mpt.TrieMode
if cfg.KeepOnlyLatestState {
mode |= mpt.ModeLatest
}
if cfg.RemoveUntraceableBlocks {
mode |= mpt.ModeGC
}
return &Module{
network: cfg.Magic,
srInHead: cfg.StateRootInHeader,
mode: mode,
verifier: verif,
log: log,
Store: s,
@ -63,7 +73,8 @@ func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Lo
// GetState returns value at the specified key fom the MPT with the specified root.
func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) {
tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(s.Store))
// Allow accessing old values, it's RO thing.
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store))
return tr.Get(key)
}
@ -73,13 +84,15 @@ func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) {
// item with key equals to prefix is included into result; if empty `start` specified,
// then item with key equals to prefix is not included into result.
func (s *Module) FindStates(root util.Uint256, prefix, start []byte, max int) ([]storage.KeyValue, error) {
tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(s.Store))
// Allow accessing old values, it's RO thing.
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store))
return tr.Find(prefix, start, max)
}
// GetStateProof returns proof of having key in the MPT with the specified root.
func (s *Module) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) {
tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(s.Store))
// Allow accessing old values, it's RO thing.
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store))
return tr.GetProof(key)
}
@ -104,14 +117,14 @@ func (s *Module) CurrentValidatedHeight() uint32 {
}
// Init initializes state root module at the given height.
func (s *Module) Init(height uint32, enableRefCount bool) error {
func (s *Module) Init(height uint32) error {
data, err := s.Store.Get([]byte{byte(storage.DataMPT), prefixValidated})
if err == nil {
s.validatedHeight.Store(binary.LittleEndian.Uint32(data))
}
if height == 0 {
s.mpt = mpt.NewTrie(nil, enableRefCount, s.Store)
s.mpt = mpt.NewTrie(nil, s.mode, s.Store)
s.currentLocal.Store(util.Uint256{})
return nil
}
@ -121,7 +134,7 @@ func (s *Module) Init(height uint32, enableRefCount bool) error {
}
s.currentLocal.Store(r.Root)
s.localHeight.Store(r.Index)
s.mpt = mpt.NewTrie(mpt.NewHashNode(r.Root), enableRefCount, s.Store)
s.mpt = mpt.NewTrie(mpt.NewHashNode(r.Root), s.mode, s.Store)
return nil
}
@ -157,7 +170,7 @@ func (s *Module) CleanStorage() error {
}
// JumpToState performs jump to the state specified by given stateroot index.
func (s *Module) JumpToState(sr *state.MPTRoot, enableRefCount bool) error {
func (s *Module) JumpToState(sr *state.MPTRoot) error {
if err := s.addLocalStateRoot(s.Store, sr); err != nil {
return fmt.Errorf("failed to store local state root: %w", err)
}
@ -171,10 +184,48 @@ func (s *Module) JumpToState(sr *state.MPTRoot, enableRefCount bool) error {
s.currentLocal.Store(sr.Root)
s.localHeight.Store(sr.Index)
s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), enableRefCount, s.Store)
s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store)
return nil
}
// GC performs garbage collection.
func (s *Module) GC(index uint32, store storage.Store) time.Duration {
if !s.mode.GC() {
panic("stateroot: GC invoked, but not enabled")
}
var removed int
var stored int64
s.log.Info("starting MPT garbage collection", zap.Uint32("index", index))
start := time.Now()
b := store.Batch()
store.Seek(storage.SeekRange{
Prefix: []byte{byte(storage.DataMPT)},
}, func(k, v []byte) bool {
stored++
if !mpt.IsActiveValue(v) {
h := binary.LittleEndian.Uint32(v[len(v)-4:])
if h > index {
return true
}
b.Delete(k)
removed++
stored--
}
return true
})
err := store.PutBatch(b)
dur := time.Since(start)
if err != nil {
s.log.Error("failed to flush MPT GC changeset", zap.Duration("time", dur), zap.Error(err))
} else {
s.log.Info("finished MPT garbage collection",
zap.Int("removed", removed),
zap.Int64("stored", stored),
zap.Duration("time", dur))
}
return dur
}
// AddMPTBatch updates using provided batch.
func (s *Module) AddMPTBatch(index uint32, b mpt.Batch, cache *storage.MemCachedStore) (*mpt.Trie, *state.MPTRoot, error) {
mpt := *s.mpt
@ -182,7 +233,7 @@ func (s *Module) AddMPTBatch(index uint32, b mpt.Batch, cache *storage.MemCached
if _, err := mpt.PutBatch(b); err != nil {
return nil, nil, err
}
mpt.Flush()
mpt.Flush(index)
sr := &state.MPTRoot{
Index: index,
Root: mpt.StateRoot(),

View file

@ -221,7 +221,12 @@ func (s *Module) defineSyncStage() error {
if err != nil {
return fmt.Errorf("failed to get header to initialize MPT billet: %w", err)
}
s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState,
var mode mpt.TrieMode
// No need to enable GC here, it only has latest things.
if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks {
mode |= mpt.ModeLatest
}
s.billet = mpt.NewBillet(header.PrevStateRoot, mode,
TemporaryPrefix(s.dao.Version.StoragePrefix), s.dao.Store)
s.log.Info("MPT billet initialized",
zap.Uint32("height", s.syncPoint),
@ -494,7 +499,12 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt
s.lock.RLock()
defer s.lock.RUnlock()
b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, 0, storage.NewMemCachedStore(s.dao.Store))
var mode mpt.TrieMode
// GC must be turned off here to allow access to the archived nodes.
if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks {
mode |= mpt.ModeLatest
}
b := mpt.NewBillet(root, mode, 0, storage.NewMemCachedStore(s.dao.Store))
return b.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
return process(node, nodeBytes)
}, false)

View file

@ -15,7 +15,7 @@ import (
func TestModule_PR2019_discussion_r689629704(t *testing.T) {
expectedStorage := storage.NewMemCachedStore(storage.NewMemoryStore())
tr := mpt.NewTrie(nil, true, expectedStorage)
tr := mpt.NewTrie(nil, mpt.ModeLatest, expectedStorage)
require.NoError(t, tr.Put([]byte{0x03}, []byte("leaf1")))
require.NoError(t, tr.Put([]byte{0x01, 0xab, 0x02}, []byte("leaf2")))
require.NoError(t, tr.Put([]byte{0x01, 0xab, 0x04}, []byte("leaf3")))
@ -24,7 +24,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
require.NoError(t, tr.Put([]byte{0x06, 0x03}, []byte("leaf4")))
sr := tr.StateRoot()
tr.Flush()
tr.Flush(0)
// Keep MPT nodes in a map in order not to repeat them. We'll use `nodes` map to ask
// state sync module to restore the nodes.
@ -57,7 +57,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
dao: dao.NewSimple(actualStorage, true, false),
mptpool: NewPool(),
}
stateSync.billet = mpt.NewBillet(sr, true,
stateSync.billet = mpt.NewBillet(sr, mpt.ModeLatest,
TemporaryPrefix(stateSync.dao.Version.StoragePrefix), actualStorage)
stateSync.mptpool.Add(sr, []byte{})

View file

@ -85,21 +85,21 @@ func (s *BoltDBStore) Delete(key []byte) error {
// PutBatch implements the Store interface.
func (s *BoltDBStore) PutBatch(batch Batch) error {
memBatch := batch.(*MemoryBatch)
return s.PutChangeSet(memBatch.mem, memBatch.del)
return s.PutChangeSet(memBatch.mem)
}
// PutChangeSet implements the Store interface.
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error {
var err error
return s.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(Bucket)
for k, v := range puts {
err := b.Put([]byte(k), v)
if err != nil {
return err
if v != nil {
err = b.Put([]byte(k), v)
} else {
err = b.Delete([]byte(k))
}
}
for k := range dels {
err := b.Delete([]byte(k))
if err != nil {
return err
}

View file

@ -62,20 +62,17 @@ func (s *LevelDBStore) PutBatch(batch Batch) error {
}
// PutChangeSet implements the Store interface.
func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error {
tx, err := s.db.OpenTransaction()
if err != nil {
return err
}
for k := range puts {
if puts[k] != nil {
err = tx.Put([]byte(k), puts[k], nil)
if err != nil {
tx.Discard()
return err
}
}
for k := range dels {
} else {
err = tx.Delete([]byte(k), nil)
}
if err != nil {
tx.Discard()
return err

View file

@ -55,13 +55,12 @@ func NewMemCachedStore(lower Store) *MemCachedStore {
func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
s.mut.RLock()
defer s.mut.RUnlock()
k := string(key)
if val, ok := s.mem[k]; ok {
return val, nil
}
if _, ok := s.del[k]; ok {
if val, ok := s.mem[string(key)]; ok {
if val == nil {
return nil, ErrKeyNotFound
}
return val, nil
}
return s.ps.Get(key)
}
@ -73,19 +72,16 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
var b MemBatch
b.Put = make([]KeyValueExists, 0, len(s.mem))
b.Deleted = make([]KeyValueExists, 0)
for k, v := range s.mem {
key := []byte(k)
_, err := s.ps.Get(key)
if v == nil {
b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil})
} else {
b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil})
}
b.Deleted = make([]KeyValueExists, 0, len(s.del))
for k := range s.del {
key := []byte(k)
_, err := s.ps.Get(key)
b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil})
}
return &b
}
@ -101,11 +97,12 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
res := make(chan KeyValue)
go func() {
s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool {
res <- KeyValue{
Key: k,
Value: v,
select {
case <-ctx.Done():
return false
case res <- KeyValue{Key: k, Value: v}:
return true
}
return true // always continue, we have context for early stop.
})
close(res)
}()
@ -141,16 +138,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
Key: []byte(k),
Value: v,
},
Exists: true,
})
}
}
for k := range s.MemoryStore.del {
if isKeyOK(k) {
memRes = append(memRes, KeyValueExists{
KeyValue: KeyValue{
Key: []byte(k),
},
Exists: v != nil,
})
}
}
@ -265,15 +253,14 @@ func (s *MemCachedStore) PersistSync() (int, error) {
func (s *MemCachedStore) persist(isSync bool) (int, error) {
var err error
var keys, dkeys int
var keys int
s.plock.Lock()
defer s.plock.Unlock()
s.mut.Lock()
keys = len(s.mem)
dkeys = len(s.del)
if keys == 0 && dkeys == 0 {
if keys == 0 {
s.mut.Unlock()
return 0, nil
}
@ -282,15 +269,14 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
// starts using fresh new maps. This tempstore is only known here and
// nothing ever changes it, therefore accesses to it (reads) can go
// unprotected while writes are handled by s proper.
var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps}
var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps}
s.ps = tempstore
s.mem = make(map[string][]byte, len(s.mem))
s.del = make(map[string]bool, len(s.del))
if !isSync {
s.mut.Unlock()
}
err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del)
err = tempstore.ps.PutChangeSet(tempstore.mem)
if !isSync {
s.mut.Lock()
@ -306,12 +292,8 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
for k := range s.mem {
tempstore.put(k, s.mem[k])
}
for k := range s.del {
tempstore.drop(k)
}
s.ps = tempstore.ps
s.mem = tempstore.mem
s.del = tempstore.del
}
s.mut.Unlock()
return keys, err

View file

@ -74,7 +74,7 @@ func testMemCachedStorePersist(t *testing.T, ps Store) {
c, err = ts.Persist()
checkBatch(t, ts, nil, nil)
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
assert.Equal(t, 1, c)
v, err = ps.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
@ -287,7 +287,7 @@ func (b *BadStore) Put(k, v []byte) error {
func (b *BadStore) PutBatch(Batch) error {
return nil
}
func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
func (b *BadStore) PutChangeSet(_ map[string][]byte) error {
b.onPutBatch()
return ErrKeyNotFound
}

View file

@ -14,8 +14,6 @@ import (
type MemoryStore struct {
mut sync.RWMutex
mem map[string][]byte
// A map, not a slice, to avoid duplicates.
del map[string]bool
}
// MemoryBatch is an in-memory batch compatible with MemoryStore.
@ -37,7 +35,6 @@ func (b *MemoryBatch) Delete(k []byte) {
func NewMemoryStore() *MemoryStore {
return &MemoryStore{
mem: make(map[string][]byte),
del: make(map[string]bool),
}
}
@ -45,7 +42,7 @@ func NewMemoryStore() *MemoryStore {
func (s *MemoryStore) Get(key []byte) ([]byte, error) {
s.mut.RLock()
defer s.mut.RUnlock()
if val, ok := s.mem[string(key)]; ok {
if val, ok := s.mem[string(key)]; ok && val != nil {
return val, nil
}
return nil, ErrKeyNotFound
@ -55,7 +52,6 @@ func (s *MemoryStore) Get(key []byte) ([]byte, error) {
// with mutex locked.
func (s *MemoryStore) put(key string, value []byte) {
s.mem[key] = value
delete(s.del, key)
}
// Put implements the Store interface. Never returns an error.
@ -71,8 +67,7 @@ func (s *MemoryStore) Put(key, value []byte) error {
// drop deletes a key-value pair from the store, it's supposed to be called
// with mutex locked.
func (s *MemoryStore) drop(key string) {
s.del[key] = true
delete(s.mem, key)
s.mem[key] = nil
}
// Delete implements Store interface. Never returns an error.
@ -87,18 +82,15 @@ func (s *MemoryStore) Delete(key []byte) error {
// PutBatch implements the Store interface. Never returns an error.
func (s *MemoryStore) PutBatch(batch Batch) error {
b := batch.(*MemoryBatch)
return s.PutChangeSet(b.mem, b.del)
return s.PutChangeSet(b.mem)
}
// PutChangeSet implements the Store interface. Never returns an error.
func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
func (s *MemoryStore) PutChangeSet(puts map[string][]byte) error {
s.mut.Lock()
for k := range puts {
s.put(k, puts[k])
}
for k := range dels {
s.drop(k)
}
s.mut.Unlock()
return nil
}
@ -120,11 +112,6 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
f([]byte(k), v)
}
}
for k := range s.del {
if strings.HasPrefix(k, sk) {
f([]byte(k), nil)
}
}
}
// seek is an internal unlocked implementation of Seek. `start` denotes whether
@ -151,7 +138,7 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) {
}
for k, v := range s.mem {
if isKeyOK(k) {
if v != nil && isKeyOK(k) {
memList = append(memList, KeyValue{
Key: []byte(k),
Value: v,
@ -182,7 +169,6 @@ func newMemoryBatch() *MemoryBatch {
// error.
func (s *MemoryStore) Close() error {
s.mut.Lock()
s.del = nil
s.mem = nil
s.mut.Unlock()
return nil

View file

@ -88,7 +88,7 @@ type (
Put(k, v []byte) error
PutBatch(Batch) error
// PutChangeSet allows to push prepared changeset to the Store.
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
PutChangeSet(puts map[string][]byte) error
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
// Seek continues iteration until false is returned from f.
// Key and value slices should not be modified.