Compare commits
6 commits
master...btree-mems
| Author | SHA1 | Date |
|---|---|---|
| | b224957677 | |
| | 85cc5f4247 | |
| | afe08fde3e | |
| | b94da149bb | |
| | 4058cb30ba | |
| | cd5810d6cf | |
17 changed files with 208 additions and 207 deletions
go.mod (1 addition)

```diff
@@ -5,6 +5,7 @@ require (
 	github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db
 	github.com/btcsuite/btcd v0.22.0-beta
 	github.com/davecgh/go-spew v1.1.1
+	github.com/google/btree v1.0.1
 	github.com/gorilla/websocket v1.4.2
 	github.com/hashicorp/golang-lru v0.5.4
 	github.com/holiman/uint256 v1.2.0
```
go.sum (2 additions)

```diff
@@ -111,6 +111,8 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
+github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
+github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
```
```diff
@@ -1116,7 +1116,7 @@ func TestVerifyHashAgainstScript(t *testing.T) {
 	bc := newTestChain(t)

 	cs, csInvalid := getTestContractState(t, 4, 5, random.Uint160()) // sender and IDs are not important for the test
-	ic := bc.newInteropContext(trigger.Verification, bc.dao, nil, nil)
+	ic := bc.newInteropContext(trigger.Verification, bc.dao.GetWrapped(), nil, nil)
 	require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs))
 	require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, csInvalid))
```
```diff
@@ -1373,22 +1373,14 @@ func TestGetClaimable(t *testing.T) {
 }

 func TestClose(t *testing.T) {
-	defer func() {
-		r := recover()
-		assert.NotNil(t, r)
-	}()
 	bc := initTestChain(t, nil, nil)
 	go bc.Run()
+	hash0 := bc.GetHeaderHash(0)
 	_, err := bc.genBlocks(10)
 	require.NoError(t, err)
 	bc.Close()
-	// It's a hack, but we use internal knowledge of MemoryStore
-	// implementation which makes it completely unusable (up to panicing)
-	// after Close().
-	_ = bc.dao.Store.Put([]byte{0}, []byte{1})
-
-	// This should never be executed.
-	assert.Nil(t, t)
+	_, err = bc.GetBlock(hash0) // DB is closed, so this will fail.
+	require.Error(t, err)
 }

 func TestSubscriptions(t *testing.T) {
```
```diff
@@ -50,6 +50,7 @@ type DAO interface {
 	GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error)
 	GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error)
 	GetVersion() (Version, error)
+	GetCloned() DAO
 	GetWrapped() DAO
 	HasTransaction(hash util.Uint256) error
 	Persist() (int, error)
@@ -102,6 +103,15 @@ func (dao *Simple) GetWrapped() DAO {
 	return d
 }

+// GetCloned returns new DAO instance with shared trie of MemCachedStore, use it for
+// the latest layer that either doesn't need to Persist, or Persists to another well-known
+// non-shared (!) layer.
+func (dao *Simple) GetCloned() DAO {
+	d := *dao
+	d.Store = dao.Store.Clone()
+	return &d
+}
+
 // GetAndDecode performs get operation and decoding with serializable structures.
 func (dao *Simple) GetAndDecode(entity io.Serializable, key []byte) error {
 	entityBytes, err := dao.Store.Get(key)
```
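Note for reviewers: GetWrapped layers a fresh MemCachedStore on top of the current one, while the new GetCloned stays on the same level and snapshots the store through google/btree's copy-on-write Clone (see MemCachedStore.Clone further down). A minimal standalone sketch of the COW property this relies on; the pair type is illustrative, not from this change:

```go
package main

import (
	"fmt"

	"github.com/google/btree"
)

type pair struct{ k, v string }

func (p pair) Less(o btree.Item) bool { return p.k < o.(pair).k }

func main() {
	base := btree.New(8)
	base.ReplaceOrInsert(pair{"a", "1"})

	// Clone is copy-on-write: both trees share nodes until either side writes.
	snap := base.Clone()
	base.ReplaceOrInsert(pair{"b", "2"})

	fmt.Println(base.Len(), snap.Len()) // 2 1
	fmt.Println(snap.Get(pair{k: "b"})) // <nil>: the snapshot is isolated
}
```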
```diff
@@ -71,7 +71,7 @@ func NewContext(trigger trigger.Type, bc Ledger, d dao.DAO,
 	getContract func(dao.DAO, util.Uint160) (*state.Contract, error), natives []Contract,
 	block *block.Block, tx *transaction.Transaction, log *zap.Logger) *Context {
 	baseExecFee := int64(DefaultBaseExecFee)
-	dao := d.GetWrapped()
+	dao := d.GetCloned()

 	if bc != nil && (block == nil || block.Index != 0) {
 		baseExecFee = bc.GetBaseExecFee()
```
```diff
@@ -274,7 +274,7 @@ func TestTrie_PutBatchHash(t *testing.T) {
 	}
 	tr1.Collapse(1)
 	tr2.Collapse(1)
-	key := makeStorageKey(tr1.root.(*BranchNode).Children[2].Hash().BytesBE())
+	key := makeStorageKey(tr1.root.(*BranchNode).Children[2].Hash())
 	require.NoError(t, tr1.Store.Delete(key))
 	require.NoError(t, tr2.Store.Delete(key))
 	testIncompletePut(t, ps, 1, tr1, tr2)
```
```diff
@@ -177,7 +177,7 @@ func (b *Billet) putIntoHash(curr *HashNode, path []byte, val Node) (Node, error
 }

 func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) {
-	key := makeStorageKey(h.BytesBE())
+	key := makeStorageKey(h)
 	if b.refcountEnabled {
 		var (
 			err error
@@ -325,7 +325,7 @@ func (b *Billet) tryCollapseBranch(curr *BranchNode) Node {

 // GetFromStore returns MPT node from the storage.
 func (b *Billet) GetFromStore(h util.Uint256) (Node, error) {
-	data, err := b.Store.Get(makeStorageKey(h.BytesBE()))
+	data, err := b.Store.Get(makeStorageKey(h))
 	if err != nil {
 		return nil, err
 	}
```
```diff
@@ -16,7 +16,7 @@ func TestBillet_RestoreHashNode(t *testing.T) {
 	_ = expectedRoot.Hash()
 	_ = tr.root.Hash()
 	require.Equal(t, expectedRoot, tr.root)
-	expectedBytes, err := tr.Store.Get(makeStorageKey(expectedNode.Hash().BytesBE()))
+	expectedBytes, err := tr.Store.Get(makeStorageKey(expectedNode.Hash()))
 	if expectedRefCount != 0 {
 		require.NoError(t, err)
 		require.Equal(t, expectedRefCount, binary.LittleEndian.Uint32(expectedBytes[len(expectedBytes)-4:]))
```
```diff
@@ -70,7 +70,7 @@ func VerifyProof(rh util.Uint256, key []byte, proofs [][]byte) ([]byte, bool) {
 	for i := range proofs {
 		h := hash.DoubleSha256(proofs[i])
 		// no errors in Put to memory store
-		_ = tr.Store.Put(makeStorageKey(h[:]), proofs[i])
+		_ = tr.Store.Put(makeStorageKey(h), proofs[i])
 	}
 	_, leaf, _, err := tr.getWithPath(tr.root, path, true)
 	if err != nil {
```
```diff
@@ -372,8 +372,8 @@ func (t *Trie) StateRoot() util.Uint256 {
 	return t.root.Hash()
 }

-func makeStorageKey(mptKey []byte) []byte {
-	return append([]byte{byte(storage.DataMPT)}, mptKey...)
+func makeStorageKey(mptKey util.Uint256) []byte {
+	return append([]byte{byte(storage.DataMPT)}, mptKey[:]...)
 }

 // Flush puts every node in the trie except Hash ones to the storage.
```
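The new signature is an allocation win: callers previously went through Hash().BytesBE(), which copies the 32-byte hash into a fresh slice before append copies it again. Passing util.Uint256 (a 32-byte array) by value lets mptKey[:] view the array directly. A sketch of the difference, with a stand-in Uint256 type and 0x03 standing in for storage.DataMPT:

```go
package main

import "fmt"

// Uint256 stands in for util.Uint256 (a 32-byte array).
type Uint256 [32]byte

// Old shape: the caller allocates a slice via BytesBE() first,
// then append copies it a second time.
func makeStorageKeyOld(mptKey []byte) []byte {
	return append([]byte{0x03}, mptKey...)
}

// New shape: the array is passed by value and mptKey[:] is a view
// into it, so only the final append allocates.
func makeStorageKeyNew(mptKey Uint256) []byte {
	return append([]byte{0x03}, mptKey[:]...)
}

func main() {
	var h Uint256
	fmt.Println(len(makeStorageKeyOld(h[:])), len(makeStorageKeyNew(h))) // 33 33
}
```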
```diff
@@ -392,7 +392,7 @@ func (t *Trie) Flush() {
 				delete(t.refcount, h)
 			}
 		} else if node.refcount > 0 {
-			_ = t.Store.Put(makeStorageKey(h.BytesBE()), node.bytes)
+			_ = t.Store.Put(makeStorageKey(h), node.bytes)
 		}
 		node.refcount = 0
 	} else {
```
```diff
@@ -407,7 +407,7 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 {
 		panic("`updateRefCount` is called, but GC is disabled")
 	}
 	var data []byte
-	key := makeStorageKey(h.BytesBE())
+	key := makeStorageKey(h)
 	node := t.refcount[h]
 	cnt := node.initial
 	if cnt == 0 {
```
```diff
@@ -466,7 +466,7 @@ func (t *Trie) removeRef(h util.Uint256, bs []byte) {
 }

 func (t *Trie) getFromStore(h util.Uint256) (Node, error) {
-	data, err := t.Store.Get(makeStorageKey(h.BytesBE()))
+	data, err := t.Store.Get(makeStorageKey(h))
 	if err != nil {
 		return nil, err
 	}
```
```diff
@@ -251,7 +251,7 @@ func (tr *Trie) putToStore(n Node) {
 		}
 		tr.updateRefCount(n.Hash())
 	} else {
-		_ = tr.Store.Put(makeStorageKey(n.Hash().BytesBE()), n.Bytes())
+		_ = tr.Store.Put(makeStorageKey(n.Hash()), n.Bytes())
 	}
 }
```
```diff
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"

+	"github.com/google/btree"
 	"github.com/nspcc-dev/neo-go/pkg/io"
 	"github.com/nspcc-dev/neo-go/pkg/util/slice"
 	"github.com/syndtr/goleveldb/leveldb/util"
@@ -85,26 +86,25 @@ func (s *BoltDBStore) Delete(key []byte) error {
 // PutBatch implements the Store interface.
 func (s *BoltDBStore) PutBatch(batch Batch) error {
 	memBatch := batch.(*MemoryBatch)
-	return s.PutChangeSet(memBatch.mem, memBatch.del)
+	return s.PutChangeSet(&memBatch.mem)
 }

 // PutChangeSet implements the Store interface.
-func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
+func (s *BoltDBStore) PutChangeSet(puts *btree.BTree) error {
+	var err error
+
 	return s.db.Batch(func(tx *bbolt.Tx) error {
 		b := tx.Bucket(Bucket)
-		for k, v := range puts {
-			err := b.Put([]byte(k), v)
-			if err != nil {
-				return err
-			}
-		}
-		for k := range dels {
-			err := b.Delete([]byte(k))
-			if err != nil {
-				return err
-			}
-		}
-		return nil
+		puts.Ascend(func(i btree.Item) bool {
+			kv := i.(KeyValue)
+			if kv.Value != nil {
+				err = b.Put(kv.Key, kv.Value)
+			} else {
+				err = b.Delete(kv.Key)
+			}
+			return err == nil
+		})
+		return err
 	})
 }
```
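Both persistent backends (Bolt here, LevelDB next) now take a single ordered tree instead of separate put/delete maps: a KeyValue whose Value is nil acts as a deletion tombstone, and returning false from the Ascend callback stops the walk on the first backend error. A standalone sketch of that convention; applyChangeSet is a hypothetical stand-in for a backend, while KeyValue mirrors the type from this change:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/google/btree"
)

type KeyValue struct{ Key, Value []byte }

func (kv KeyValue) Less(other btree.Item) bool {
	return bytes.Compare(kv.Key, other.(KeyValue).Key) < 0
}

// applyChangeSet walks the change set in key order; nil Value means delete.
func applyChangeSet(puts *btree.BTree) {
	puts.Ascend(func(i btree.Item) bool {
		kv := i.(KeyValue)
		if kv.Value != nil {
			fmt.Printf("put    %q = %q\n", kv.Key, kv.Value)
		} else {
			fmt.Printf("delete %q\n", kv.Key)
		}
		return true // false would stop the walk early, as on a backend error
	})
}

func main() {
	cs := btree.New(8)
	cs.ReplaceOrInsert(KeyValue{Key: []byte("a"), Value: []byte("1")})
	cs.ReplaceOrInsert(KeyValue{Key: []byte("b")}) // tombstone
	applyChangeSet(cs)
}
```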
```diff
@@ -1,6 +1,7 @@
 package storage

 import (
+	"github.com/google/btree"
 	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/filter"
 	"github.com/syndtr/goleveldb/leveldb/opt"
@@ -62,24 +63,23 @@ func (s *LevelDBStore) PutBatch(batch Batch) error {
 }

 // PutChangeSet implements the Store interface.
-func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
+func (s *LevelDBStore) PutChangeSet(puts *btree.BTree) error {
 	tx, err := s.db.OpenTransaction()
 	if err != nil {
 		return err
 	}
-	for k := range puts {
-		err = tx.Put([]byte(k), puts[k], nil)
-		if err != nil {
-			tx.Discard()
-			return err
-		}
-	}
-	for k := range dels {
-		err = tx.Delete([]byte(k), nil)
-		if err != nil {
-			tx.Discard()
-			return err
-		}
-	}
+	puts.Ascend(func(i btree.Item) bool {
+		kv := i.(KeyValue)
+		if kv.Value != nil {
+			err = tx.Put(kv.Key, kv.Value, nil)
+		} else {
+			err = tx.Delete(kv.Key, nil)
+		}
+		return err == nil
+	})
+	if err != nil {
+		tx.Discard()
+		return err
+	}
 	return tx.Commit()
 }
```
```diff
@@ -3,10 +3,9 @@ package storage
 import (
 	"bytes"
 	"context"
-	"sort"
-	"strings"
 	"sync"

+	"github.com/google/btree"
 	"github.com/nspcc-dev/neo-go/pkg/util/slice"
 )
```
```diff
@@ -15,6 +14,9 @@ import (
 type MemCachedStore struct {
 	MemoryStore

+	// lowerTrie stores lower level MemCachedStore trie for cloned MemCachedStore,
+	// which allows for much more efficient Persist.
+	lowerTrie *btree.BTree
 	// plock protects Persist from double entrance.
 	plock sync.Mutex
 	// Persistent Store.
```
```diff
@@ -43,6 +45,11 @@ type (
 	}
 )

+// Less implements btree.Item interface.
+func (kv KeyValue) Less(other btree.Item) bool {
+	return bytes.Compare(kv.Key, other.(KeyValue).Key) < 0
+}
+
 // NewMemCachedStore creates a new MemCachedStore object.
 func NewMemCachedStore(lower Store) *MemCachedStore {
 	return &MemCachedStore{
```
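Since Less compares keys only, a KeyValue carrying just a Key works as a probe item for Get and the Ascend* calls, and ReplaceOrInsert behaves as an upsert on the key. A quick self-contained illustration (duplicating the KeyValue type so it runs on its own):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/google/btree"
)

type KeyValue struct{ Key, Value []byte }

func (kv KeyValue) Less(other btree.Item) bool {
	return bytes.Compare(kv.Key, other.(KeyValue).Key) < 0
}

func main() {
	tr := btree.New(8)
	tr.ReplaceOrInsert(KeyValue{Key: []byte("key"), Value: []byte("old")})

	// The probe carries only the key; Less never looks at Value, so two
	// items with equal keys are "equal" as far as the tree is concerned.
	fmt.Printf("%s\n", tr.Get(KeyValue{Key: []byte("key")}).(KeyValue).Value) // old

	tr.ReplaceOrInsert(KeyValue{Key: []byte("key"), Value: []byte("new")})
	fmt.Println(tr.Len()) // 1: replaced, not duplicated
}
```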
```diff
@@ -51,16 +58,27 @@ func NewMemCachedStore(lower Store) *MemCachedStore {
 	}
 }

+// Clone creates a cloned MemCachedStore which shares the trie
+// with another MemCachedStore (until you write into it).
+func (s *MemCachedStore) Clone() *MemCachedStore {
+	return &MemCachedStore{
+		MemoryStore: MemoryStore{mem: *s.mem.Clone()}, // Shared COW trie.
+		lowerTrie:   &s.mem,
+		ps:          s.ps, // But the same PS.
+	}
+}
+
 // Get implements the Store interface.
 func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
 	s.mut.RLock()
 	defer s.mut.RUnlock()
-	k := string(key)
-	if val, ok := s.mem[k]; ok {
-		return val, nil
-	}
-	if _, ok := s.del[k]; ok {
-		return nil, ErrKeyNotFound
+	itm := s.mem.Get(KeyValue{Key: key})
+	if itm != nil {
+		kv := itm.(KeyValue)
+		if kv.Value == nil {
+			return nil, ErrKeyNotFound
+		}
+		return kv.Value, nil
 	}
 	return s.ps.Get(key)
 }
```
```diff
@@ -72,20 +90,18 @@ func (s *MemCachedStore) GetBatch() *MemBatch {

 	var b MemBatch

-	b.Put = make([]KeyValueExists, 0, len(s.mem))
-	for k, v := range s.mem {
-		key := []byte(k)
-		_, err := s.ps.Get(key)
-		b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil})
-	}
-
-	b.Deleted = make([]KeyValueExists, 0, len(s.del))
-	for k := range s.del {
-		key := []byte(k)
-		_, err := s.ps.Get(key)
-		b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil})
-	}
-
+	b.Put = make([]KeyValueExists, 0, s.mem.Len())
+	b.Deleted = make([]KeyValueExists, 0)
+	s.mem.Ascend(func(i btree.Item) bool {
+		kv := i.(KeyValue)
+		_, err := s.ps.Get(kv.Key)
+		if kv.Value == nil {
+			b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: kv, Exists: err == nil})
+		} else {
+			b.Put = append(b.Put, KeyValueExists{KeyValue: kv, Exists: err == nil})
+		}
+		return true
+	})
 	return &b
 }
```
```diff
@@ -119,41 +135,10 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
 // and point to start seeking from. Backwards seeking from some point is supported
 // with corresponding `rng` field set.
 func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
-	// Create memory store `mem` and `del` snapshot not to hold the lock.
-	var memRes []KeyValueExists
-	sPrefix := string(rng.Prefix)
-	lPrefix := len(sPrefix)
-	sStart := string(rng.Start)
-	lStart := len(sStart)
-	isKeyOK := func(key string) bool {
-		return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
-	}
-	if rng.Backwards {
-		isKeyOK = func(key string) bool {
-			return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
-		}
-	}
+	lPrefix := len(rng.Prefix)

 	s.mut.RLock()
-	for k, v := range s.MemoryStore.mem {
-		if isKeyOK(k) {
-			memRes = append(memRes, KeyValueExists{
-				KeyValue: KeyValue{
-					Key:   []byte(k),
-					Value: v,
-				},
-				Exists: true,
-			})
-		}
-	}
-	for k := range s.MemoryStore.del {
-		if isKeyOK(k) {
-			memRes = append(memRes, KeyValueExists{
-				KeyValue: KeyValue{
-					Key: []byte(k),
-				},
-			})
-		}
-	}
+	var memRes = s.getSeekPairs(rng)
 	ps := s.ps
 	s.mut.RUnlock()
```
```diff
@@ -161,15 +146,11 @@
 		res := bytes.Compare(k1, k2)
 		return res != 0 && rng.Backwards == (res > 0)
 	}
-	// Sort memRes items for further comparison with ps items.
-	sort.Slice(memRes, func(i, j int) bool {
-		return less(memRes[i].Key, memRes[j].Key)
-	})
-
 	var (
 		done    bool
 		iMem    int
-		kvMem   KeyValueExists
+		kvMem   KeyValue
 		haveMem bool
 	)
 	if iMem < len(memRes) {
```
```diff
@@ -195,7 +176,7 @@
 		default:
 			var isMem = haveMem && less(kvMem.Key, kvPs.Key)
 			if isMem {
-				if kvMem.Exists {
+				if kvMem.Value != nil {
 					if cutPrefix {
 						kvMem.Key = kvMem.Key[lPrefix:]
 					}
@@ -236,7 +217,7 @@
 			break loop
 		default:
 			kvMem = memRes[i]
-			if kvMem.Exists {
+			if kvMem.Value != nil {
 				if cutPrefix {
 					kvMem.Key = kvMem.Key[lPrefix:]
 				}
```
```diff
@@ -265,15 +246,21 @@ func (s *MemCachedStore) PersistSync() (int, error) {

 func (s *MemCachedStore) persist(isSync bool) (int, error) {
 	var err error
-	var keys, dkeys int
+	var keys int

 	s.plock.Lock()
 	defer s.plock.Unlock()
 	s.mut.Lock()

-	keys = len(s.mem)
-	dkeys = len(s.del)
-	if keys == 0 && dkeys == 0 {
+	if s.lowerTrie != nil {
+		keys = s.mem.Len() - s.lowerTrie.Len()
+		*s.lowerTrie = s.mem
+		s.mut.Unlock()
+		return keys, nil
+	}
+
+	keys = s.mem.Len()
+	if keys == 0 {
 		s.mut.Unlock()
 		return 0, nil
 	}
```
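This is the payoff of Clone: a cloned store persists by swapping its whole tree into the parent through the lowerTrie pointer, O(1) instead of replaying every key, and keys is estimated as the difference of the two tree lengths. A toy model of that dance; the toy type is hypothetical and skips the locking the real code does:

```go
package main

import (
	"fmt"

	"github.com/google/btree"
)

type pair struct{ k, v string }

func (p pair) Less(o btree.Item) bool { return p.k < o.(pair).k }

// toy mirrors the clone/persist dance from this diff: the clone keeps a
// pointer to the parent's tree and swaps its own tree in on "persist".
type toy struct {
	mem   btree.BTree
	lower *btree.BTree
}

func (t *toy) clone() *toy {
	return &toy{mem: *t.mem.Clone(), lower: &t.mem}
}

func (t *toy) persist() {
	*t.lower = t.mem // the parent now sees everything the clone wrote
}

func main() {
	parent := &toy{mem: *btree.New(8)}
	parent.mem.ReplaceOrInsert(pair{"a", "1"})

	child := parent.clone()
	child.mem.ReplaceOrInsert(pair{"b", "2"}) // COW: parent unaffected

	fmt.Println(parent.mem.Len()) // 1
	child.persist()
	fmt.Println(parent.mem.Len()) // 2
}
```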
```diff
@@ -282,15 +269,14 @@
 	// starts using fresh new maps. This tempstore is only known here and
 	// nothing ever changes it, therefore accesses to it (reads) can go
 	// unprotected while writes are handled by s proper.
-	var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps}
+	var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps}
 	s.ps = tempstore
-	s.mem = make(map[string][]byte, len(s.mem))
-	s.del = make(map[string]bool, len(s.del))
+	s.mem = *btree.New(8)
 	if !isSync {
 		s.mut.Unlock()
 	}

-	err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del)
+	err = tempstore.ps.PutChangeSet(&tempstore.mem)

 	if !isSync {
 		s.mut.Lock()
```
```diff
@@ -303,15 +289,10 @@
 	} else {
 		// We're toast. We'll try to still keep proper state, but OOM
 		// killer will get to us eventually.
-		for k := range s.mem {
-			tempstore.put(k, s.mem[k])
-		}
-		for k := range s.del {
-			tempstore.drop(k)
-		}
+		s.mem.Ascend(func(i btree.Item) bool {
+			tempstore.put(i.(KeyValue))
+			return true
+		})
 		s.ps = tempstore.ps
 		s.mem = tempstore.mem
-		s.del = tempstore.del
 	}
 	s.mut.Unlock()
 	return keys, err
```
```diff
@@ -6,6 +6,7 @@ import (
 	"sort"
 	"testing"

+	"github.com/google/btree"
 	"github.com/nspcc-dev/neo-go/internal/random"
 	"github.com/nspcc-dev/neo-go/pkg/util/slice"
 	"github.com/stretchr/testify/assert"
@@ -74,7 +75,7 @@ func testMemCachedStorePersist(t *testing.T, ps Store) {
 	c, err = ts.Persist()
 	checkBatch(t, ts, nil, nil)
 	assert.Equal(t, nil, err)
-	assert.Equal(t, 0, c)
+	assert.Equal(t, 1, c)
 	v, err = ps.Get([]byte("key"))
 	assert.Equal(t, ErrKeyNotFound, err)
 	assert.Equal(t, []byte(nil), v)
@@ -243,8 +244,8 @@ func BenchmarkCachedSeek(t *testing.B) {
 		"MemPS": func(t testing.TB) Store {
 			return NewMemoryStore()
 		},
-		"BoltPS":  newBoltStoreForTesting,
-		"LevelPS": newLevelDBForTesting,
+		// "BoltPS":  newBoltStoreForTesting,
+		// "LevelPS": newLevelDBForTesting,
 	}
 	for psName, newPS := range stores {
 		for psCount := 100; psCount <= 10000; psCount *= 10 {
@@ -287,7 +288,7 @@ func (b *BadStore) Put(k, v []byte) error {
 func (b *BadStore) PutBatch(Batch) error {
 	return nil
 }
-func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
+func (b *BadStore) PutChangeSet(_ *btree.BTree) error {
 	b.onPutBatch()
 	return ErrKeyNotFound
 }
```
```diff
@@ -2,10 +2,9 @@ package storage

 import (
 	"bytes"
-	"sort"
-	"strings"
 	"sync"

+	"github.com/google/btree"
 	"github.com/nspcc-dev/neo-go/pkg/util/slice"
 )
@@ -13,9 +12,7 @@
 // used for testing. Do not use MemoryStore in production.
 type MemoryStore struct {
 	mut sync.RWMutex
-	mem map[string][]byte
-	// A map, not a slice, to avoid duplicates.
-	del map[string]bool
+	mem btree.BTree
 }

 // MemoryBatch is an in-memory batch compatible with MemoryStore.
```
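The separate del map disappears because deletions now live in the same ordered tree as puts: a nil Value marks a key as known-deleted, which Get has to distinguish from a key that is simply absent (in MemCachedStore, absent falls through to the lower store). A short self-contained illustration of the three states:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/google/btree"
)

type KeyValue struct{ Key, Value []byte }

func (kv KeyValue) Less(o btree.Item) bool {
	return bytes.Compare(kv.Key, o.(KeyValue).Key) < 0
}

func main() {
	mem := btree.New(16)
	mem.ReplaceOrInsert(KeyValue{Key: []byte("k"), Value: []byte("v")})
	mem.ReplaceOrInsert(KeyValue{Key: []byte("k")}) // delete: overwrite with a tombstone

	// Three states: present (non-nil Value), known-deleted (nil Value),
	// and absent (no item at all).
	if itm := mem.Get(KeyValue{Key: []byte("k")}); itm != nil {
		fmt.Println(itm.(KeyValue).Value == nil) // true: known-deleted
	}
	fmt.Println(mem.Get(KeyValue{Key: []byte("x")}) == nil) // true: absent
}
```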
```diff
@@ -25,19 +22,18 @@ type MemoryBatch struct {

 // Put implements the Batch interface.
 func (b *MemoryBatch) Put(k, v []byte) {
-	b.MemoryStore.put(string(k), slice.Copy(v))
+	b.MemoryStore.put(dupKV(k, v))
 }

 // Delete implements Batch interface.
 func (b *MemoryBatch) Delete(k []byte) {
-	b.MemoryStore.drop(string(k))
+	b.MemoryStore.drop(slice.Copy(k))
 }

 // NewMemoryStore creates a new MemoryStore object.
 func NewMemoryStore() *MemoryStore {
 	return &MemoryStore{
-		mem: make(map[string][]byte),
-		del: make(map[string]bool),
+		mem: *btree.New(16),
 	}
 }
```
```diff
@@ -45,39 +41,40 @@ func NewMemoryStore() *MemoryStore {
 func (s *MemoryStore) Get(key []byte) ([]byte, error) {
 	s.mut.RLock()
 	defer s.mut.RUnlock()
-	if val, ok := s.mem[string(key)]; ok {
-		return val, nil
+	itm := s.mem.Get(KeyValue{Key: key})
+	if itm != nil {
+		kv := itm.(KeyValue)
+		if kv.Value != nil {
+			return kv.Value, nil
+		}
 	}
 	return nil, ErrKeyNotFound
 }

 // put puts a key-value pair into the store, it's supposed to be called
 // with mutex locked.
-func (s *MemoryStore) put(key string, value []byte) {
-	s.mem[key] = value
-	delete(s.del, key)
+func (s *MemoryStore) put(kv KeyValue) {
+	_ = s.mem.ReplaceOrInsert(kv)
 }

 // Put implements the Store interface. Never returns an error.
 func (s *MemoryStore) Put(key, value []byte) error {
-	newKey := string(key)
-	vcopy := slice.Copy(value)
+	kv := dupKV(key, value)
 	s.mut.Lock()
-	s.put(newKey, vcopy)
+	s.put(kv)
 	s.mut.Unlock()
 	return nil
 }

 // drop deletes a key-value pair from the store, it's supposed to be called
 // with mutex locked.
-func (s *MemoryStore) drop(key string) {
-	s.del[key] = true
-	delete(s.mem, key)
+func (s *MemoryStore) drop(key []byte) {
+	s.put(KeyValue{Key: key})
 }

 // Delete implements Store interface. Never returns an error.
 func (s *MemoryStore) Delete(key []byte) error {
-	newKey := string(key)
+	newKey := slice.Copy(key)
 	s.mut.Lock()
 	s.drop(newKey)
 	s.mut.Unlock()
```
```diff
@@ -87,18 +84,16 @@ func (s *MemoryStore) Delete(key []byte) error {
 // PutBatch implements the Store interface. Never returns an error.
 func (s *MemoryStore) PutBatch(batch Batch) error {
 	b := batch.(*MemoryBatch)
-	return s.PutChangeSet(b.mem, b.del)
+	return s.PutChangeSet(&b.mem)
 }

 // PutChangeSet implements the Store interface. Never returns an error.
-func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
+func (s *MemoryStore) PutChangeSet(puts *btree.BTree) error {
 	s.mut.Lock()
-	for k := range puts {
-		s.put(k, puts[k])
-	}
-	for k := range dels {
-		s.drop(k)
-	}
+	puts.Ascend(func(i btree.Item) bool {
+		s.put(i.(KeyValue))
+		return true
+	})
 	s.mut.Unlock()
 	return nil
 }
```
```diff
@@ -114,54 +109,59 @@ func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
 func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
 	s.mut.RLock()
 	defer s.mut.RUnlock()
-	sk := string(key)
-	for k, v := range s.mem {
-		if strings.HasPrefix(k, sk) {
-			f([]byte(k), v)
-		}
-	}
-	for k := range s.del {
-		if strings.HasPrefix(k, sk) {
-			f([]byte(k), nil)
+	s.mem.AscendGreaterOrEqual(KeyValue{Key: key}, func(i btree.Item) bool {
+		kv := i.(KeyValue)
+		if !bytes.HasPrefix(kv.Key, key) {
+			return false
 		}
-	}
+		f(kv.Key, kv.Value)
+		return true
+	})
 }

+// getSeekPairs returns KV pairs for current Seek.
+func (s *MemoryStore) getSeekPairs(rng SeekRange) []KeyValue {
+	lPrefix := len(rng.Prefix)
+	lStart := len(rng.Start)
+
+	var pivot KeyValue
+	pivot.Key = make([]byte, lPrefix+lStart, lPrefix+lStart+1)
+	copy(pivot.Key, rng.Prefix)
+	if lStart != 0 {
+		copy(pivot.Key[lPrefix:], rng.Start)
+	}
+
+	var memList []KeyValue
+	var appender = func(i btree.Item) bool {
+		kv := i.(KeyValue)
+		if !bytes.HasPrefix(kv.Key, rng.Prefix) {
+			return false
+		}
+		memList = append(memList, kv)
+		return true
+	}
+
+	if !rng.Backwards {
+		s.mem.AscendGreaterOrEqual(pivot, appender)
+	} else {
+		if lStart != 0 {
+			pivot.Key = append(pivot.Key, 0) // Right after the start key.
+			s.mem.AscendRange(KeyValue{Key: rng.Prefix}, pivot, appender)
+		} else {
+			s.mem.AscendGreaterOrEqual(KeyValue{Key: rng.Prefix}, appender)
+		}
+		for i, j := 0, len(memList)-1; i <= j; i, j = i+1, j-1 {
+			memList[i], memList[j] = memList[j], memList[i]
+		}
+	}
+	return memList
+}
+
 // seek is an internal unlocked implementation of Seek. `start` denotes whether
 // seeking starting from the provided prefix should be performed. Backwards
 // seeking from some point is supported with corresponding SeekRange field set.
 func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) {
-	sPrefix := string(rng.Prefix)
-	lPrefix := len(sPrefix)
-	sStart := string(rng.Start)
-	lStart := len(sStart)
-	var memList []KeyValue
-
-	isKeyOK := func(key string) bool {
-		return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
-	}
-	if rng.Backwards {
-		isKeyOK = func(key string) bool {
-			return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
-		}
-	}
-	less := func(k1, k2 []byte) bool {
-		res := bytes.Compare(k1, k2)
-		return res != 0 && rng.Backwards == (res > 0)
-	}
-
-	for k, v := range s.mem {
-		if isKeyOK(k) {
-			memList = append(memList, KeyValue{
-				Key:   []byte(k),
-				Value: v,
-			})
-		}
-	}
-	sort.Slice(memList, func(i, j int) bool {
-		return less(memList[i].Key, memList[j].Key)
-	})
-	for _, kv := range memList {
+	for _, kv := range s.getSeekPairs(rng) {
 		if !f(kv.Key, kv.Value) {
 			break
 		}
```
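getSeekPairs replaces the old scan-everything-then-sort approach: the tree is already key-ordered, so a forward seek is AscendGreaterOrEqual from the prefix+start pivot until the prefix stops matching, and a backwards seek collects [prefix, prefix+start] with AscendRange (appending a zero byte makes the upper bound inclusive) and then reverses. A self-contained sketch of both directions, with illustrative keys:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/google/btree"
)

type KeyValue struct{ Key, Value []byte }

func (kv KeyValue) Less(o btree.Item) bool {
	return bytes.Compare(kv.Key, o.(KeyValue).Key) < 0
}

func main() {
	mem := btree.New(16)
	for _, k := range []string{"aa", "ab", "ac", "b"} {
		mem.ReplaceOrInsert(KeyValue{Key: []byte(k), Value: []byte{1}})
	}

	prefix := []byte("a")
	start := []byte("b") // relative to the prefix, i.e. seek from "ab"

	// Forward: start at the prefix+start pivot, stop when the prefix breaks.
	pivot := KeyValue{Key: append(append([]byte{}, prefix...), start...)}
	mem.AscendGreaterOrEqual(pivot, func(i btree.Item) bool {
		kv := i.(KeyValue)
		if !bytes.HasPrefix(kv.Key, prefix) {
			return false
		}
		fmt.Printf("%s ", kv.Key)
		return true
	}) // ab ac

	// Backwards: collect [prefix, prefix+start] and reverse; the trailing
	// zero byte makes the upper bound of AscendRange inclusive of "ab".
	upper := KeyValue{Key: append(append(append([]byte{}, prefix...), start...), 0)}
	var got []KeyValue
	mem.AscendRange(KeyValue{Key: prefix}, upper, func(i btree.Item) bool {
		got = append(got, i.(KeyValue))
		return true
	})
	for i := len(got) - 1; i >= 0; i-- {
		fmt.Printf("%s ", got[i].Key)
	} // ab aa
}
```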
```diff
@@ -182,8 +182,20 @@ func newMemoryBatch() *MemoryBatch {
 // error.
 func (s *MemoryStore) Close() error {
 	s.mut.Lock()
-	s.del = nil
-	s.mem = nil
+	s.mem.Clear(false)
 	s.mut.Unlock()
 	return nil
 }
+
+func dupKV(key []byte, value []byte) KeyValue {
+	var res KeyValue
+
+	s := make([]byte, len(key)+len(value))
+	copy(s, key)
+	res.Key = s[:len(key)]
+	if value != nil {
+		copy(s[len(key):], value)
+		res.Value = s[len(key):]
+	}
+	return res
+}
```
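dupKV packs the key and value copies into one backing array, so storing a pair costs a single allocation and the tree never aliases caller-owned slices; a nil value stays nil, preserving tombstones. A standalone copy of it with a quick check:

```go
package main

import "fmt"

type KeyValue struct{ Key, Value []byte }

// dupKV as in this diff: one allocation backs both copies, and a nil
// value stays nil so tombstones survive the copy.
func dupKV(key []byte, value []byte) KeyValue {
	var res KeyValue

	s := make([]byte, len(key)+len(value))
	copy(s, key)
	res.Key = s[:len(key)]
	if value != nil {
		copy(s[len(key):], value)
		res.Value = s[len(key):]
	}
	return res
}

func main() {
	k := []byte("key")
	kv := dupKV(k, []byte("value"))
	k[0] = 'X' // mutating the caller's slice...
	fmt.Printf("%s %s\n", kv.Key, kv.Value) // ...leaves the stored copy intact: key value

	tomb := dupKV([]byte("gone"), nil)
	fmt.Println(tomb.Value == nil) // true
}
```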
```diff
@@ -4,6 +4,8 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+
+	"github.com/google/btree"
 )

 // KeyPrefix constants.
@@ -88,7 +90,7 @@ type (
 		Put(k, v []byte) error
 		PutBatch(Batch) error
 		// PutChangeSet allows to push prepared changeset to the Store.
-		PutChangeSet(puts map[string][]byte, dels map[string]bool) error
+		PutChangeSet(puts *btree.BTree) error
 		// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
 		// Seek continues iteration until false is returned from f.
 		// Key and value slices should not be modified.
```