From b94da149bbe3282401bf322d7ff55f2de5456c8b Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 2 Feb 2022 14:30:46 +0300 Subject: [PATCH] storage: btree-based MemoryStore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit name old time/op new time/op delta CachedSeek/MemPS_10TSItems_100PSItems-8 123µs ± 5% 36µs ±10% -70.80% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_100PSItems-8 191µs ± 3% 48µs ± 5% -74.78% (p=0.016 n=5+4) CachedSeek/MemPS_10TSItems_1000PSItems-8 1.59ms ± 2% 0.54ms ±13% -65.81% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_1000PSItems-8 1.67ms ± 3% 0.59ms ± 4% -64.93% (p=0.008 n=5+5) CachedSeek/MemPS_1000TSItems_1000PSItems-8 2.66ms ± 3% 0.96ms ± 3% -63.93% (p=0.008 n=5+5) CachedSeek/MemPS_10TSItems_10000PSItems-8 23.8ms ± 3% 10.3ms ± 8% -56.50% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_10000PSItems-8 23.8ms ± 1% 10.4ms ± 3% -56.38% (p=0.008 n=5+5) CachedSeek/MemPS_1000TSItems_10000PSItems-8 25.6ms ± 2% 10.6ms ± 2% -58.36% (p=0.008 n=5+5) CachedSeek/MemPS_10000TSItems_10000PSItems-8 39.9ms ± 1% 21.4ms ± 3% -46.39% (p=0.008 n=5+5) MemorySeek/10Elements-8 2.74µs ± 4% 1.44µs ± 1% -47.49% (p=0.008 n=5+5) MemorySeek/100Elements-8 31.5µs ± 3% 7.3µs ± 3% -76.71% (p=0.008 n=5+5) MemorySeek/1000Elements-8 395µs ± 2% 64µs ±16% -83.88% (p=0.008 n=5+5) MemorySeek/10000Elements-8 5.89ms ± 2% 2.34ms ±21% -60.26% (p=0.008 n=5+5) name old alloc/op new alloc/op delta CachedSeek/MemPS_10TSItems_100PSItems-8 64.4kB ± 0% 58.6kB ± 0% -9.02% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_100PSItems-8 96.0kB ± 0% 80.1kB ± 0% -16.52% (p=0.008 n=5+5) CachedSeek/MemPS_10TSItems_1000PSItems-8 785kB ± 0% 736kB ± 0% -6.24% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_1000PSItems-8 817kB ± 0% 758kB ± 0% -7.23% (p=0.000 n=5+4) CachedSeek/MemPS_1000TSItems_1000PSItems-8 1.27MB ± 0% 1.09MB ± 0% -14.64% (p=0.016 n=4+5) CachedSeek/MemPS_10TSItems_10000PSItems-8 9.56MB ± 0% 9.08MB ± 0% -5.03% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_10000PSItems-8 9.59MB ± 0% 9.10MB ± 0% -5.12% (p=0.008 n=5+5) CachedSeek/MemPS_1000TSItems_10000PSItems-8 10.0MB ± 0% 9.4MB ± 0% -6.15% (p=0.016 n=4+5) CachedSeek/MemPS_10000TSItems_10000PSItems-8 15.1MB ± 0% 14.4MB ± 0% -5.07% (p=0.016 n=4+5) MemorySeek/10Elements-8 1.77kB ± 0% 1.54kB ± 0% -13.01% (p=0.008 n=5+5) MemorySeek/100Elements-8 14.0kB ± 0% 12.3kB ± 0% -11.96% (p=0.008 n=5+5) MemorySeek/1000Elements-8 114kB ± 0% 98kB ± 0% -14.05% (p=0.008 n=5+5) MemorySeek/10000Elements-8 2.72MB ± 0% 2.56MB ± 0% -5.88% (p=0.008 n=5+5) name old allocs/op new allocs/op delta CachedSeek/MemPS_10TSItems_100PSItems-8 948 ± 0% 626 ± 0% -33.97% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_100PSItems-8 1.13k ± 0% 0.63k ± 0% -44.39% (p=0.008 n=5+5) CachedSeek/MemPS_10TSItems_1000PSItems-8 9.05k ± 0% 6.03k ± 0% -33.38% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_1000PSItems-8 9.24k ± 0% 6.04k ± 0% -34.66% (p=0.008 n=5+5) CachedSeek/MemPS_1000TSItems_1000PSItems-8 11.0k ± 0% 6.0k ± 0% -45.30% (p=0.008 n=5+5) CachedSeek/MemPS_10TSItems_10000PSItems-8 90.1k ± 0% 60.0k ± 0% -33.33% (p=0.008 n=5+5) CachedSeek/MemPS_100TSItems_10000PSItems-8 90.2k ± 0% 60.0k ± 0% -33.47% (p=0.008 n=5+5) CachedSeek/MemPS_1000TSItems_10000PSItems-8 92.1k ± 0% 60.0k ± 0% -34.77% (p=0.008 n=5+5) CachedSeek/MemPS_10000TSItems_10000PSItems-8 110k ± 0% 60k ± 0% -45.43% (p=0.008 n=5+5) MemorySeek/10Elements-8 18.0 ± 0% 7.0 ± 0% -61.11% (p=0.008 n=5+5) MemorySeek/100Elements-8 111 ± 0% 10 ± 0% -90.99% (p=0.008 n=5+5) MemorySeek/1000Elements-8 1.01k ± 0% 0.01k ± 0% -98.72% (p=0.008 n=5+5) MemorySeek/10000Elements-8 10.0k ± 0% 0.0k ± 0% -99.77% (p=0.008 n=5+5) --- go.mod | 1 + go.sum | 2 + pkg/core/blockchain_test.go | 14 +-- pkg/core/storage/boltdb_store.go | 22 ++-- pkg/core/storage/leveldb_store.go | 21 ++-- pkg/core/storage/memcached_store.go | 87 ++++++-------- pkg/core/storage/memcached_store_test.go | 7 +- pkg/core/storage/memory_store.go | 140 ++++++++++++++--------- pkg/core/storage/store.go | 4 +- 9 files changed, 152 insertions(+), 146 deletions(-) diff --git a/go.mod b/go.mod index ec84a1455..c8b133337 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ require ( github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db github.com/btcsuite/btcd v0.22.0-beta github.com/davecgh/go-spew v1.1.1 + github.com/google/btree v1.0.1 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/holiman/uint256 v1.2.0 diff --git a/go.sum b/go.sum index 3023f9c2b..251c02f67 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= +github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 42c81bd38..492c74c1d 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1373,22 +1373,14 @@ func TestGetClaimable(t *testing.T) { } func TestClose(t *testing.T) { - defer func() { - r := recover() - assert.NotNil(t, r) - }() bc := initTestChain(t, nil, nil) go bc.Run() + hash0 := bc.GetHeaderHash(0) _, err := bc.genBlocks(10) require.NoError(t, err) bc.Close() - // It's a hack, but we use internal knowledge of MemoryStore - // implementation which makes it completely unusable (up to panicing) - // after Close(). - _ = bc.dao.Store.Put([]byte{0}, []byte{1}) - - // This should never be executed. - assert.Nil(t, t) + _, err = bc.GetBlock(hash0) // DB is closed, so this will fail. + require.Error(t, err) } func TestSubscriptions(t *testing.T) { diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 8f60aa5ee..d3bc6ee3c 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -5,6 +5,7 @@ import ( "fmt" "os" + "github.com/google/btree" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/syndtr/goleveldb/leveldb/util" @@ -85,26 +86,25 @@ func (s *BoltDBStore) Delete(key []byte) error { // PutBatch implements the Store interface. func (s *BoltDBStore) PutBatch(batch Batch) error { memBatch := batch.(*MemoryBatch) - return s.PutChangeSet(memBatch.mem) + return s.PutChangeSet(&memBatch.mem) } // PutChangeSet implements the Store interface. -func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { +func (s *BoltDBStore) PutChangeSet(puts *btree.BTree) error { var err error return s.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(Bucket) - for k, v := range puts { - if v != nil { - err = b.Put([]byte(k), v) + puts.Ascend(func(i btree.Item) bool { + kv := i.(KeyValue) + if kv.Value != nil { + err = b.Put(kv.Key, kv.Value) } else { - err = b.Delete([]byte(k)) + err = b.Delete(kv.Key) } - if err != nil { - return err - } - } - return nil + return err == nil + }) + return err }) } diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index 6d5439711..814477edb 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -1,6 +1,7 @@ package storage import ( + "github.com/google/btree" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/opt" @@ -62,21 +63,23 @@ func (s *LevelDBStore) PutBatch(batch Batch) error { } // PutChangeSet implements the Store interface. -func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error { +func (s *LevelDBStore) PutChangeSet(puts *btree.BTree) error { tx, err := s.db.OpenTransaction() if err != nil { return err } - for k := range puts { - if puts[k] != nil { - err = tx.Put([]byte(k), puts[k], nil) + puts.Ascend(func(i btree.Item) bool { + kv := i.(KeyValue) + if kv.Value != nil { + err = tx.Put(kv.Key, kv.Value, nil) } else { - err = tx.Delete([]byte(k), nil) - } - if err != nil { - tx.Discard() - return err + err = tx.Delete(kv.Key, nil) } + return err == nil + }) + if err != nil { + tx.Discard() + return err } return tx.Commit() } diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 76b21c2a2..e922fc297 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -3,10 +3,9 @@ package storage import ( "bytes" "context" - "sort" - "strings" "sync" + "github.com/google/btree" "github.com/nspcc-dev/neo-go/pkg/util/slice" ) @@ -43,6 +42,11 @@ type ( } ) +// Less implements btree.Item interface. +func (kv KeyValue) Less(other btree.Item) bool { + return bytes.Compare(kv.Key, other.(KeyValue).Key) < 0 +} + // NewMemCachedStore creates a new MemCachedStore object. func NewMemCachedStore(lower Store) *MemCachedStore { return &MemCachedStore{ @@ -55,12 +59,13 @@ func NewMemCachedStore(lower Store) *MemCachedStore { func (s *MemCachedStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - k := string(key) - if val, ok := s.mem[k]; ok { - if val == nil { + itm := s.mem.Get(KeyValue{Key: key}) + if itm != nil { + kv := itm.(KeyValue) + if kv.Value == nil { return nil, ErrKeyNotFound } - return val, nil + return kv.Value, nil } return s.ps.Get(key) } @@ -72,17 +77,18 @@ func (s *MemCachedStore) GetBatch() *MemBatch { var b MemBatch - b.Put = make([]KeyValueExists, 0, len(s.mem)) + b.Put = make([]KeyValueExists, 0, s.mem.Len()) b.Deleted = make([]KeyValueExists, 0) - for k, v := range s.mem { - key := []byte(k) - _, err := s.ps.Get(key) - if v == nil { - b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) + s.mem.Ascend(func(i btree.Item) bool { + kv := i.(KeyValue) + _, err := s.ps.Get(kv.Key) + if kv.Value == nil { + b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: kv, Exists: err == nil}) } else { - b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) + b.Put = append(b.Put, KeyValueExists{KeyValue: kv, Exists: err == nil}) } - } + return true + }) return &b } @@ -116,32 +122,10 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix // and point to start seeking from. Backwards seeking from some point is supported // with corresponding `rng` field set. func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) { - // Create memory store `mem` and `del` snapshot not to hold the lock. - var memRes []KeyValueExists - sPrefix := string(rng.Prefix) - lPrefix := len(sPrefix) - sStart := string(rng.Start) - lStart := len(sStart) - isKeyOK := func(key string) bool { - return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0) - } - if rng.Backwards { - isKeyOK = func(key string) bool { - return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0) - } - } + lPrefix := len(rng.Prefix) + s.mut.RLock() - for k, v := range s.MemoryStore.mem { - if isKeyOK(k) { - memRes = append(memRes, KeyValueExists{ - KeyValue: KeyValue{ - Key: []byte(k), - Value: v, - }, - Exists: v != nil, - }) - } - } + var memRes = s.getSeekPairs(rng) ps := s.ps s.mut.RUnlock() @@ -149,15 +133,11 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool res := bytes.Compare(k1, k2) return res != 0 && rng.Backwards == (res > 0) } - // Sort memRes items for further comparison with ps items. - sort.Slice(memRes, func(i, j int) bool { - return less(memRes[i].Key, memRes[j].Key) - }) var ( done bool iMem int - kvMem KeyValueExists + kvMem KeyValue haveMem bool ) if iMem < len(memRes) { @@ -183,7 +163,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool default: var isMem = haveMem && less(kvMem.Key, kvPs.Key) if isMem { - if kvMem.Exists { + if kvMem.Value != nil { if cutPrefix { kvMem.Key = kvMem.Key[lPrefix:] } @@ -224,7 +204,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool break loop default: kvMem = memRes[i] - if kvMem.Exists { + if kvMem.Value != nil { if cutPrefix { kvMem.Key = kvMem.Key[lPrefix:] } @@ -259,7 +239,7 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { defer s.plock.Unlock() s.mut.Lock() - keys = len(s.mem) + keys = s.mem.Len() if keys == 0 { s.mut.Unlock() return 0, nil @@ -271,12 +251,12 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { // unprotected while writes are handled by s proper. var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps} s.ps = tempstore - s.mem = make(map[string][]byte, len(s.mem)) + s.mem = *btree.New(8) if !isSync { s.mut.Unlock() } - err = tempstore.ps.PutChangeSet(tempstore.mem) + err = tempstore.ps.PutChangeSet(&tempstore.mem) if !isSync { s.mut.Lock() @@ -289,11 +269,10 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { } else { // We're toast. We'll try to still keep proper state, but OOM // killer will get to us eventually. - for k := range s.mem { - tempstore.put(k, s.mem[k]) - } - s.ps = tempstore.ps - s.mem = tempstore.mem + s.mem.Ascend(func(i btree.Item) bool { + tempstore.put(i.(KeyValue)) + return true + }) } s.mut.Unlock() return keys, err diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index 100942d7e..8bb86cc33 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -6,6 +6,7 @@ import ( "sort" "testing" + "github.com/google/btree" "github.com/nspcc-dev/neo-go/internal/random" "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/stretchr/testify/assert" @@ -243,8 +244,8 @@ func BenchmarkCachedSeek(t *testing.B) { "MemPS": func(t testing.TB) Store { return NewMemoryStore() }, - "BoltPS": newBoltStoreForTesting, - "LevelPS": newLevelDBForTesting, + // "BoltPS": newBoltStoreForTesting, + // "LevelPS": newLevelDBForTesting, } for psName, newPS := range stores { for psCount := 100; psCount <= 10000; psCount *= 10 { @@ -287,7 +288,7 @@ func (b *BadStore) Put(k, v []byte) error { func (b *BadStore) PutBatch(Batch) error { return nil } -func (b *BadStore) PutChangeSet(_ map[string][]byte) error { +func (b *BadStore) PutChangeSet(_ *btree.BTree) error { b.onPutBatch() return ErrKeyNotFound } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 1a5589aa6..9a27165c5 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -2,10 +2,9 @@ package storage import ( "bytes" - "sort" - "strings" "sync" + "github.com/google/btree" "github.com/nspcc-dev/neo-go/pkg/util/slice" ) @@ -13,7 +12,7 @@ import ( // used for testing. Do not use MemoryStore in production. type MemoryStore struct { mut sync.RWMutex - mem map[string][]byte + mem btree.BTree } // MemoryBatch is an in-memory batch compatible with MemoryStore. @@ -23,18 +22,18 @@ type MemoryBatch struct { // Put implements the Batch interface. func (b *MemoryBatch) Put(k, v []byte) { - b.MemoryStore.put(string(k), slice.Copy(v)) + b.MemoryStore.put(dupKV(k, v)) } // Delete implements Batch interface. func (b *MemoryBatch) Delete(k []byte) { - b.MemoryStore.drop(string(k)) + b.MemoryStore.drop(slice.Copy(k)) } // NewMemoryStore creates a new MemoryStore object. func NewMemoryStore() *MemoryStore { return &MemoryStore{ - mem: make(map[string][]byte), + mem: *btree.New(8), } } @@ -42,37 +41,40 @@ func NewMemoryStore() *MemoryStore { func (s *MemoryStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - if val, ok := s.mem[string(key)]; ok && val != nil { - return val, nil + itm := s.mem.Get(KeyValue{Key: key}) + if itm != nil { + kv := itm.(KeyValue) + if kv.Value != nil { + return kv.Value, nil + } } return nil, ErrKeyNotFound } // put puts a key-value pair into the store, it's supposed to be called // with mutex locked. -func (s *MemoryStore) put(key string, value []byte) { - s.mem[key] = value +func (s *MemoryStore) put(kv KeyValue) { + _ = s.mem.ReplaceOrInsert(kv) } // Put implements the Store interface. Never returns an error. func (s *MemoryStore) Put(key, value []byte) error { - newKey := string(key) - vcopy := slice.Copy(value) + kv := dupKV(key, value) s.mut.Lock() - s.put(newKey, vcopy) + s.put(kv) s.mut.Unlock() return nil } // drop deletes a key-value pair from the store, it's supposed to be called // with mutex locked. -func (s *MemoryStore) drop(key string) { - s.mem[key] = nil +func (s *MemoryStore) drop(key []byte) { + s.put(KeyValue{Key: key}) } // Delete implements Store interface. Never returns an error. func (s *MemoryStore) Delete(key []byte) error { - newKey := string(key) + newKey := slice.Copy(key) s.mut.Lock() s.drop(newKey) s.mut.Unlock() @@ -82,15 +84,16 @@ func (s *MemoryStore) Delete(key []byte) error { // PutBatch implements the Store interface. Never returns an error. func (s *MemoryStore) PutBatch(batch Batch) error { b := batch.(*MemoryBatch) - return s.PutChangeSet(b.mem) + return s.PutChangeSet(&b.mem) } // PutChangeSet implements the Store interface. Never returns an error. -func (s *MemoryStore) PutChangeSet(puts map[string][]byte) error { +func (s *MemoryStore) PutChangeSet(puts *btree.BTree) error { s.mut.Lock() - for k := range puts { - s.put(k, puts[k]) - } + puts.Ascend(func(i btree.Item) bool { + s.put(i.(KeyValue)) + return true + }) s.mut.Unlock() return nil } @@ -106,49 +109,59 @@ func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) { func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { s.mut.RLock() defer s.mut.RUnlock() - sk := string(key) - for k, v := range s.mem { - if strings.HasPrefix(k, sk) { - f([]byte(k), v) + s.mem.AscendGreaterOrEqual(KeyValue{Key: key}, func(i btree.Item) bool { + kv := i.(KeyValue) + if !bytes.HasPrefix(kv.Key, key) { + return false + } + f(kv.Key, kv.Value) + return true + }) +} + +// getSeekPairs returns KV pairs for current Seek. +func (s *MemoryStore) getSeekPairs(rng SeekRange) []KeyValue { + lPrefix := len(rng.Prefix) + lStart := len(rng.Start) + + var pivot KeyValue + pivot.Key = make([]byte, lPrefix+lStart, lPrefix+lStart+1) + copy(pivot.Key, rng.Prefix) + if lStart != 0 { + copy(pivot.Key[lPrefix:], rng.Start) + } + + var memList []KeyValue + var appender = func(i btree.Item) bool { + kv := i.(KeyValue) + if !bytes.HasPrefix(kv.Key, rng.Prefix) { + return false + } + memList = append(memList, kv) + return true + } + + if !rng.Backwards { + s.mem.AscendGreaterOrEqual(pivot, appender) + } else { + if lStart != 0 { + pivot.Key = append(pivot.Key, 0) // Right after the start key. + s.mem.AscendRange(KeyValue{Key: rng.Prefix}, pivot, appender) + } else { + s.mem.AscendGreaterOrEqual(KeyValue{Key: rng.Prefix}, appender) + } + for i, j := 0, len(memList)-1; i <= j; i, j = i+1, j-1 { + memList[i], memList[j] = memList[j], memList[i] } } + return memList } // seek is an internal unlocked implementation of Seek. `start` denotes whether // seeking starting from the provided prefix should be performed. Backwards // seeking from some point is supported with corresponding SeekRange field set. func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) { - sPrefix := string(rng.Prefix) - lPrefix := len(sPrefix) - sStart := string(rng.Start) - lStart := len(sStart) - var memList []KeyValue - - isKeyOK := func(key string) bool { - return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0) - } - if rng.Backwards { - isKeyOK = func(key string) bool { - return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0) - } - } - less := func(k1, k2 []byte) bool { - res := bytes.Compare(k1, k2) - return res != 0 && rng.Backwards == (res > 0) - } - - for k, v := range s.mem { - if v != nil && isKeyOK(k) { - memList = append(memList, KeyValue{ - Key: []byte(k), - Value: v, - }) - } - } - sort.Slice(memList, func(i, j int) bool { - return less(memList[i].Key, memList[j].Key) - }) - for _, kv := range memList { + for _, kv := range s.getSeekPairs(rng) { if !f(kv.Key, kv.Value) { break } @@ -169,7 +182,20 @@ func newMemoryBatch() *MemoryBatch { // error. func (s *MemoryStore) Close() error { s.mut.Lock() - s.mem = nil + s.mem.Clear(false) s.mut.Unlock() return nil } + +func dupKV(key []byte, value []byte) KeyValue { + var res KeyValue + + s := make([]byte, len(key)+len(value)) + copy(s, key) + res.Key = s[:len(key)] + if value != nil { + copy(s[len(key):], value) + res.Value = s[len(key):] + } + return res +} diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index ff1997234..b97ab26ca 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "errors" "fmt" + + "github.com/google/btree" ) // KeyPrefix constants. @@ -88,7 +90,7 @@ type ( Put(k, v []byte) error PutBatch(Batch) error // PutChangeSet allows to push prepared changeset to the Store. - PutChangeSet(puts map[string][]byte) error + PutChangeSet(puts *btree.BTree) error // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Seek continues iteration until false is returned from f. // Key and value slices should not be modified.