storage: btree-based MemoryStore

name                                          old time/op    new time/op    delta
CachedSeek/MemPS_10TSItems_100PSItems-8          123µs ± 5%      36µs ±10%  -70.80%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_100PSItems-8         191µs ± 3%      48µs ± 5%  -74.78%  (p=0.016 n=5+4)
CachedSeek/MemPS_10TSItems_1000PSItems-8        1.59ms ± 2%    0.54ms ±13%  -65.81%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_1000PSItems-8       1.67ms ± 3%    0.59ms ± 4%  -64.93%  (p=0.008 n=5+5)
CachedSeek/MemPS_1000TSItems_1000PSItems-8      2.66ms ± 3%    0.96ms ± 3%  -63.93%  (p=0.008 n=5+5)
CachedSeek/MemPS_10TSItems_10000PSItems-8       23.8ms ± 3%    10.3ms ± 8%  -56.50%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_10000PSItems-8      23.8ms ± 1%    10.4ms ± 3%  -56.38%  (p=0.008 n=5+5)
CachedSeek/MemPS_1000TSItems_10000PSItems-8     25.6ms ± 2%    10.6ms ± 2%  -58.36%  (p=0.008 n=5+5)
CachedSeek/MemPS_10000TSItems_10000PSItems-8    39.9ms ± 1%    21.4ms ± 3%  -46.39%  (p=0.008 n=5+5)
MemorySeek/10Elements-8                         2.74µs ± 4%    1.44µs ± 1%  -47.49%  (p=0.008 n=5+5)
MemorySeek/100Elements-8                        31.5µs ± 3%     7.3µs ± 3%  -76.71%  (p=0.008 n=5+5)
MemorySeek/1000Elements-8                        395µs ± 2%      64µs ±16%  -83.88%  (p=0.008 n=5+5)
MemorySeek/10000Elements-8                      5.89ms ± 2%    2.34ms ±21%  -60.26%  (p=0.008 n=5+5)

name                                          old alloc/op   new alloc/op   delta
CachedSeek/MemPS_10TSItems_100PSItems-8         64.4kB ± 0%    58.6kB ± 0%   -9.02%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_100PSItems-8        96.0kB ± 0%    80.1kB ± 0%  -16.52%  (p=0.008 n=5+5)
CachedSeek/MemPS_10TSItems_1000PSItems-8         785kB ± 0%     736kB ± 0%   -6.24%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_1000PSItems-8        817kB ± 0%     758kB ± 0%   -7.23%  (p=0.000 n=5+4)
CachedSeek/MemPS_1000TSItems_1000PSItems-8      1.27MB ± 0%    1.09MB ± 0%  -14.64%  (p=0.016 n=4+5)
CachedSeek/MemPS_10TSItems_10000PSItems-8       9.56MB ± 0%    9.08MB ± 0%   -5.03%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_10000PSItems-8      9.59MB ± 0%    9.10MB ± 0%   -5.12%  (p=0.008 n=5+5)
CachedSeek/MemPS_1000TSItems_10000PSItems-8     10.0MB ± 0%     9.4MB ± 0%   -6.15%  (p=0.016 n=4+5)
CachedSeek/MemPS_10000TSItems_10000PSItems-8    15.1MB ± 0%    14.4MB ± 0%   -5.07%  (p=0.016 n=4+5)
MemorySeek/10Elements-8                         1.77kB ± 0%    1.54kB ± 0%  -13.01%  (p=0.008 n=5+5)
MemorySeek/100Elements-8                        14.0kB ± 0%    12.3kB ± 0%  -11.96%  (p=0.008 n=5+5)
MemorySeek/1000Elements-8                        114kB ± 0%      98kB ± 0%  -14.05%  (p=0.008 n=5+5)
MemorySeek/10000Elements-8                      2.72MB ± 0%    2.56MB ± 0%   -5.88%  (p=0.008 n=5+5)

name                                          old allocs/op  new allocs/op  delta
CachedSeek/MemPS_10TSItems_100PSItems-8            948 ± 0%       626 ± 0%  -33.97%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_100PSItems-8         1.13k ± 0%     0.63k ± 0%  -44.39%  (p=0.008 n=5+5)
CachedSeek/MemPS_10TSItems_1000PSItems-8         9.05k ± 0%     6.03k ± 0%  -33.38%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_1000PSItems-8        9.24k ± 0%     6.04k ± 0%  -34.66%  (p=0.008 n=5+5)
CachedSeek/MemPS_1000TSItems_1000PSItems-8       11.0k ± 0%      6.0k ± 0%  -45.30%  (p=0.008 n=5+5)
CachedSeek/MemPS_10TSItems_10000PSItems-8        90.1k ± 0%     60.0k ± 0%  -33.33%  (p=0.008 n=5+5)
CachedSeek/MemPS_100TSItems_10000PSItems-8       90.2k ± 0%     60.0k ± 0%  -33.47%  (p=0.008 n=5+5)
CachedSeek/MemPS_1000TSItems_10000PSItems-8      92.1k ± 0%     60.0k ± 0%  -34.77%  (p=0.008 n=5+5)
CachedSeek/MemPS_10000TSItems_10000PSItems-8      110k ± 0%       60k ± 0%  -45.43%  (p=0.008 n=5+5)
MemorySeek/10Elements-8                           18.0 ± 0%       7.0 ± 0%  -61.11%  (p=0.008 n=5+5)
MemorySeek/100Elements-8                           111 ± 0%        10 ± 0%  -90.99%  (p=0.008 n=5+5)
MemorySeek/1000Elements-8                        1.01k ± 0%     0.01k ± 0%  -98.72%  (p=0.008 n=5+5)
MemorySeek/10000Elements-8                       10.0k ± 0%      0.0k ± 0%  -99.77%  (p=0.008 n=5+5)
This commit is contained in:
Roman Khimov 2022-02-02 14:30:46 +03:00
parent 4058cb30ba
commit b94da149bb
9 changed files with 152 additions and 146 deletions

1
go.mod
View file

@ -5,6 +5,7 @@ require (
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db
github.com/btcsuite/btcd v0.22.0-beta github.com/btcsuite/btcd v0.22.0-beta
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/google/btree v1.0.1
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/holiman/uint256 v1.2.0 github.com/holiman/uint256 v1.2.0

2
go.sum
View file

@ -111,6 +111,8 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=

View file

@ -1373,22 +1373,14 @@ func TestGetClaimable(t *testing.T) {
} }
func TestClose(t *testing.T) { func TestClose(t *testing.T) {
defer func() {
r := recover()
assert.NotNil(t, r)
}()
bc := initTestChain(t, nil, nil) bc := initTestChain(t, nil, nil)
go bc.Run() go bc.Run()
hash0 := bc.GetHeaderHash(0)
_, err := bc.genBlocks(10) _, err := bc.genBlocks(10)
require.NoError(t, err) require.NoError(t, err)
bc.Close() bc.Close()
// It's a hack, but we use internal knowledge of MemoryStore _, err = bc.GetBlock(hash0) // DB is closed, so this will fail.
// implementation which makes it completely unusable (up to panicing) require.Error(t, err)
// after Close().
_ = bc.dao.Store.Put([]byte{0}, []byte{1})
// This should never be executed.
assert.Nil(t, t)
} }
func TestSubscriptions(t *testing.T) { func TestSubscriptions(t *testing.T) {

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/google/btree"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
@ -85,26 +86,25 @@ func (s *BoltDBStore) Delete(key []byte) error {
// PutBatch implements the Store interface. // PutBatch implements the Store interface.
func (s *BoltDBStore) PutBatch(batch Batch) error { func (s *BoltDBStore) PutBatch(batch Batch) error {
memBatch := batch.(*MemoryBatch) memBatch := batch.(*MemoryBatch)
return s.PutChangeSet(memBatch.mem) return s.PutChangeSet(&memBatch.mem)
} }
// PutChangeSet implements the Store interface. // PutChangeSet implements the Store interface.
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { func (s *BoltDBStore) PutChangeSet(puts *btree.BTree) error {
var err error var err error
return s.db.Batch(func(tx *bbolt.Tx) error { return s.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(Bucket) b := tx.Bucket(Bucket)
for k, v := range puts { puts.Ascend(func(i btree.Item) bool {
if v != nil { kv := i.(KeyValue)
err = b.Put([]byte(k), v) if kv.Value != nil {
err = b.Put(kv.Key, kv.Value)
} else { } else {
err = b.Delete([]byte(k)) err = b.Delete(kv.Key)
} }
if err != nil { return err == nil
})
return err return err
}
}
return nil
}) })
} }

View file

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"github.com/google/btree"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
@ -62,22 +63,24 @@ func (s *LevelDBStore) PutBatch(batch Batch) error {
} }
// PutChangeSet implements the Store interface. // PutChangeSet implements the Store interface.
func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error { func (s *LevelDBStore) PutChangeSet(puts *btree.BTree) error {
tx, err := s.db.OpenTransaction() tx, err := s.db.OpenTransaction()
if err != nil { if err != nil {
return err return err
} }
for k := range puts { puts.Ascend(func(i btree.Item) bool {
if puts[k] != nil { kv := i.(KeyValue)
err = tx.Put([]byte(k), puts[k], nil) if kv.Value != nil {
err = tx.Put(kv.Key, kv.Value, nil)
} else { } else {
err = tx.Delete([]byte(k), nil) err = tx.Delete(kv.Key, nil)
} }
return err == nil
})
if err != nil { if err != nil {
tx.Discard() tx.Discard()
return err return err
} }
}
return tx.Commit() return tx.Commit()
} }

View file

@ -3,10 +3,9 @@ package storage
import ( import (
"bytes" "bytes"
"context" "context"
"sort"
"strings"
"sync" "sync"
"github.com/google/btree"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
) )
@ -43,6 +42,11 @@ type (
} }
) )
// Less implements btree.Item interface.
func (kv KeyValue) Less(other btree.Item) bool {
return bytes.Compare(kv.Key, other.(KeyValue).Key) < 0
}
// NewMemCachedStore creates a new MemCachedStore object. // NewMemCachedStore creates a new MemCachedStore object.
func NewMemCachedStore(lower Store) *MemCachedStore { func NewMemCachedStore(lower Store) *MemCachedStore {
return &MemCachedStore{ return &MemCachedStore{
@ -55,12 +59,13 @@ func NewMemCachedStore(lower Store) *MemCachedStore {
func (s *MemCachedStore) Get(key []byte) ([]byte, error) { func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
s.mut.RLock() s.mut.RLock()
defer s.mut.RUnlock() defer s.mut.RUnlock()
k := string(key) itm := s.mem.Get(KeyValue{Key: key})
if val, ok := s.mem[k]; ok { if itm != nil {
if val == nil { kv := itm.(KeyValue)
if kv.Value == nil {
return nil, ErrKeyNotFound return nil, ErrKeyNotFound
} }
return val, nil return kv.Value, nil
} }
return s.ps.Get(key) return s.ps.Get(key)
} }
@ -72,17 +77,18 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
var b MemBatch var b MemBatch
b.Put = make([]KeyValueExists, 0, len(s.mem)) b.Put = make([]KeyValueExists, 0, s.mem.Len())
b.Deleted = make([]KeyValueExists, 0) b.Deleted = make([]KeyValueExists, 0)
for k, v := range s.mem { s.mem.Ascend(func(i btree.Item) bool {
key := []byte(k) kv := i.(KeyValue)
_, err := s.ps.Get(key) _, err := s.ps.Get(kv.Key)
if v == nil { if kv.Value == nil {
b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: kv, Exists: err == nil})
} else { } else {
b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) b.Put = append(b.Put, KeyValueExists{KeyValue: kv, Exists: err == nil})
}
} }
return true
})
return &b return &b
} }
@ -116,32 +122,10 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
// and point to start seeking from. Backwards seeking from some point is supported // and point to start seeking from. Backwards seeking from some point is supported
// with corresponding `rng` field set. // with corresponding `rng` field set.
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) { func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
// Create memory store `mem` and `del` snapshot not to hold the lock. lPrefix := len(rng.Prefix)
var memRes []KeyValueExists
sPrefix := string(rng.Prefix)
lPrefix := len(sPrefix)
sStart := string(rng.Start)
lStart := len(sStart)
isKeyOK := func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
}
if rng.Backwards {
isKeyOK = func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
}
}
s.mut.RLock() s.mut.RLock()
for k, v := range s.MemoryStore.mem { var memRes = s.getSeekPairs(rng)
if isKeyOK(k) {
memRes = append(memRes, KeyValueExists{
KeyValue: KeyValue{
Key: []byte(k),
Value: v,
},
Exists: v != nil,
})
}
}
ps := s.ps ps := s.ps
s.mut.RUnlock() s.mut.RUnlock()
@ -149,15 +133,11 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
res := bytes.Compare(k1, k2) res := bytes.Compare(k1, k2)
return res != 0 && rng.Backwards == (res > 0) return res != 0 && rng.Backwards == (res > 0)
} }
// Sort memRes items for further comparison with ps items.
sort.Slice(memRes, func(i, j int) bool {
return less(memRes[i].Key, memRes[j].Key)
})
var ( var (
done bool done bool
iMem int iMem int
kvMem KeyValueExists kvMem KeyValue
haveMem bool haveMem bool
) )
if iMem < len(memRes) { if iMem < len(memRes) {
@ -183,7 +163,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
default: default:
var isMem = haveMem && less(kvMem.Key, kvPs.Key) var isMem = haveMem && less(kvMem.Key, kvPs.Key)
if isMem { if isMem {
if kvMem.Exists { if kvMem.Value != nil {
if cutPrefix { if cutPrefix {
kvMem.Key = kvMem.Key[lPrefix:] kvMem.Key = kvMem.Key[lPrefix:]
} }
@ -224,7 +204,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
break loop break loop
default: default:
kvMem = memRes[i] kvMem = memRes[i]
if kvMem.Exists { if kvMem.Value != nil {
if cutPrefix { if cutPrefix {
kvMem.Key = kvMem.Key[lPrefix:] kvMem.Key = kvMem.Key[lPrefix:]
} }
@ -259,7 +239,7 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
defer s.plock.Unlock() defer s.plock.Unlock()
s.mut.Lock() s.mut.Lock()
keys = len(s.mem) keys = s.mem.Len()
if keys == 0 { if keys == 0 {
s.mut.Unlock() s.mut.Unlock()
return 0, nil return 0, nil
@ -271,12 +251,12 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
// unprotected while writes are handled by s proper. // unprotected while writes are handled by s proper.
var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps} var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps}
s.ps = tempstore s.ps = tempstore
s.mem = make(map[string][]byte, len(s.mem)) s.mem = *btree.New(8)
if !isSync { if !isSync {
s.mut.Unlock() s.mut.Unlock()
} }
err = tempstore.ps.PutChangeSet(tempstore.mem) err = tempstore.ps.PutChangeSet(&tempstore.mem)
if !isSync { if !isSync {
s.mut.Lock() s.mut.Lock()
@ -289,11 +269,10 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
} else { } else {
// We're toast. We'll try to still keep proper state, but OOM // We're toast. We'll try to still keep proper state, but OOM
// killer will get to us eventually. // killer will get to us eventually.
for k := range s.mem { s.mem.Ascend(func(i btree.Item) bool {
tempstore.put(k, s.mem[k]) tempstore.put(i.(KeyValue))
} return true
s.ps = tempstore.ps })
s.mem = tempstore.mem
} }
s.mut.Unlock() s.mut.Unlock()
return keys, err return keys, err

View file

@ -6,6 +6,7 @@ import (
"sort" "sort"
"testing" "testing"
"github.com/google/btree"
"github.com/nspcc-dev/neo-go/internal/random" "github.com/nspcc-dev/neo-go/internal/random"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -243,8 +244,8 @@ func BenchmarkCachedSeek(t *testing.B) {
"MemPS": func(t testing.TB) Store { "MemPS": func(t testing.TB) Store {
return NewMemoryStore() return NewMemoryStore()
}, },
"BoltPS": newBoltStoreForTesting, // "BoltPS": newBoltStoreForTesting,
"LevelPS": newLevelDBForTesting, // "LevelPS": newLevelDBForTesting,
} }
for psName, newPS := range stores { for psName, newPS := range stores {
for psCount := 100; psCount <= 10000; psCount *= 10 { for psCount := 100; psCount <= 10000; psCount *= 10 {
@ -287,7 +288,7 @@ func (b *BadStore) Put(k, v []byte) error {
func (b *BadStore) PutBatch(Batch) error { func (b *BadStore) PutBatch(Batch) error {
return nil return nil
} }
func (b *BadStore) PutChangeSet(_ map[string][]byte) error { func (b *BadStore) PutChangeSet(_ *btree.BTree) error {
b.onPutBatch() b.onPutBatch()
return ErrKeyNotFound return ErrKeyNotFound
} }

View file

@ -2,10 +2,9 @@ package storage
import ( import (
"bytes" "bytes"
"sort"
"strings"
"sync" "sync"
"github.com/google/btree"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
) )
@ -13,7 +12,7 @@ import (
// used for testing. Do not use MemoryStore in production. // used for testing. Do not use MemoryStore in production.
type MemoryStore struct { type MemoryStore struct {
mut sync.RWMutex mut sync.RWMutex
mem map[string][]byte mem btree.BTree
} }
// MemoryBatch is an in-memory batch compatible with MemoryStore. // MemoryBatch is an in-memory batch compatible with MemoryStore.
@ -23,18 +22,18 @@ type MemoryBatch struct {
// Put implements the Batch interface. // Put implements the Batch interface.
func (b *MemoryBatch) Put(k, v []byte) { func (b *MemoryBatch) Put(k, v []byte) {
b.MemoryStore.put(string(k), slice.Copy(v)) b.MemoryStore.put(dupKV(k, v))
} }
// Delete implements Batch interface. // Delete implements Batch interface.
func (b *MemoryBatch) Delete(k []byte) { func (b *MemoryBatch) Delete(k []byte) {
b.MemoryStore.drop(string(k)) b.MemoryStore.drop(slice.Copy(k))
} }
// NewMemoryStore creates a new MemoryStore object. // NewMemoryStore creates a new MemoryStore object.
func NewMemoryStore() *MemoryStore { func NewMemoryStore() *MemoryStore {
return &MemoryStore{ return &MemoryStore{
mem: make(map[string][]byte), mem: *btree.New(8),
} }
} }
@ -42,37 +41,40 @@ func NewMemoryStore() *MemoryStore {
func (s *MemoryStore) Get(key []byte) ([]byte, error) { func (s *MemoryStore) Get(key []byte) ([]byte, error) {
s.mut.RLock() s.mut.RLock()
defer s.mut.RUnlock() defer s.mut.RUnlock()
if val, ok := s.mem[string(key)]; ok && val != nil { itm := s.mem.Get(KeyValue{Key: key})
return val, nil if itm != nil {
kv := itm.(KeyValue)
if kv.Value != nil {
return kv.Value, nil
}
} }
return nil, ErrKeyNotFound return nil, ErrKeyNotFound
} }
// put puts a key-value pair into the store, it's supposed to be called // put puts a key-value pair into the store, it's supposed to be called
// with mutex locked. // with mutex locked.
func (s *MemoryStore) put(key string, value []byte) { func (s *MemoryStore) put(kv KeyValue) {
s.mem[key] = value _ = s.mem.ReplaceOrInsert(kv)
} }
// Put implements the Store interface. Never returns an error. // Put implements the Store interface. Never returns an error.
func (s *MemoryStore) Put(key, value []byte) error { func (s *MemoryStore) Put(key, value []byte) error {
newKey := string(key) kv := dupKV(key, value)
vcopy := slice.Copy(value)
s.mut.Lock() s.mut.Lock()
s.put(newKey, vcopy) s.put(kv)
s.mut.Unlock() s.mut.Unlock()
return nil return nil
} }
// drop deletes a key-value pair from the store, it's supposed to be called // drop deletes a key-value pair from the store, it's supposed to be called
// with mutex locked. // with mutex locked.
func (s *MemoryStore) drop(key string) { func (s *MemoryStore) drop(key []byte) {
s.mem[key] = nil s.put(KeyValue{Key: key})
} }
// Delete implements Store interface. Never returns an error. // Delete implements Store interface. Never returns an error.
func (s *MemoryStore) Delete(key []byte) error { func (s *MemoryStore) Delete(key []byte) error {
newKey := string(key) newKey := slice.Copy(key)
s.mut.Lock() s.mut.Lock()
s.drop(newKey) s.drop(newKey)
s.mut.Unlock() s.mut.Unlock()
@ -82,15 +84,16 @@ func (s *MemoryStore) Delete(key []byte) error {
// PutBatch implements the Store interface. Never returns an error. // PutBatch implements the Store interface. Never returns an error.
func (s *MemoryStore) PutBatch(batch Batch) error { func (s *MemoryStore) PutBatch(batch Batch) error {
b := batch.(*MemoryBatch) b := batch.(*MemoryBatch)
return s.PutChangeSet(b.mem) return s.PutChangeSet(&b.mem)
} }
// PutChangeSet implements the Store interface. Never returns an error. // PutChangeSet implements the Store interface. Never returns an error.
func (s *MemoryStore) PutChangeSet(puts map[string][]byte) error { func (s *MemoryStore) PutChangeSet(puts *btree.BTree) error {
s.mut.Lock() s.mut.Lock()
for k := range puts { puts.Ascend(func(i btree.Item) bool {
s.put(k, puts[k]) s.put(i.(KeyValue))
} return true
})
s.mut.Unlock() s.mut.Unlock()
return nil return nil
} }
@ -106,49 +109,59 @@ func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
s.mut.RLock() s.mut.RLock()
defer s.mut.RUnlock() defer s.mut.RUnlock()
sk := string(key) s.mem.AscendGreaterOrEqual(KeyValue{Key: key}, func(i btree.Item) bool {
for k, v := range s.mem { kv := i.(KeyValue)
if strings.HasPrefix(k, sk) { if !bytes.HasPrefix(kv.Key, key) {
f([]byte(k), v) return false
}
f(kv.Key, kv.Value)
return true
})
}
// getSeekPairs returns KV pairs for current Seek.
func (s *MemoryStore) getSeekPairs(rng SeekRange) []KeyValue {
lPrefix := len(rng.Prefix)
lStart := len(rng.Start)
var pivot KeyValue
pivot.Key = make([]byte, lPrefix+lStart, lPrefix+lStart+1)
copy(pivot.Key, rng.Prefix)
if lStart != 0 {
copy(pivot.Key[lPrefix:], rng.Start)
}
var memList []KeyValue
var appender = func(i btree.Item) bool {
kv := i.(KeyValue)
if !bytes.HasPrefix(kv.Key, rng.Prefix) {
return false
}
memList = append(memList, kv)
return true
}
if !rng.Backwards {
s.mem.AscendGreaterOrEqual(pivot, appender)
} else {
if lStart != 0 {
pivot.Key = append(pivot.Key, 0) // Right after the start key.
s.mem.AscendRange(KeyValue{Key: rng.Prefix}, pivot, appender)
} else {
s.mem.AscendGreaterOrEqual(KeyValue{Key: rng.Prefix}, appender)
}
for i, j := 0, len(memList)-1; i <= j; i, j = i+1, j-1 {
memList[i], memList[j] = memList[j], memList[i]
} }
} }
return memList
} }
// seek is an internal unlocked implementation of Seek. `start` denotes whether // seek is an internal unlocked implementation of Seek. `start` denotes whether
// seeking starting from the provided prefix should be performed. Backwards // seeking starting from the provided prefix should be performed. Backwards
// seeking from some point is supported with corresponding SeekRange field set. // seeking from some point is supported with corresponding SeekRange field set.
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) { func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) {
sPrefix := string(rng.Prefix) for _, kv := range s.getSeekPairs(rng) {
lPrefix := len(sPrefix)
sStart := string(rng.Start)
lStart := len(sStart)
var memList []KeyValue
isKeyOK := func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
}
if rng.Backwards {
isKeyOK = func(key string) bool {
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
}
}
less := func(k1, k2 []byte) bool {
res := bytes.Compare(k1, k2)
return res != 0 && rng.Backwards == (res > 0)
}
for k, v := range s.mem {
if v != nil && isKeyOK(k) {
memList = append(memList, KeyValue{
Key: []byte(k),
Value: v,
})
}
}
sort.Slice(memList, func(i, j int) bool {
return less(memList[i].Key, memList[j].Key)
})
for _, kv := range memList {
if !f(kv.Key, kv.Value) { if !f(kv.Key, kv.Value) {
break break
} }
@ -169,7 +182,20 @@ func newMemoryBatch() *MemoryBatch {
// error. // error.
func (s *MemoryStore) Close() error { func (s *MemoryStore) Close() error {
s.mut.Lock() s.mut.Lock()
s.mem = nil s.mem.Clear(false)
s.mut.Unlock() s.mut.Unlock()
return nil return nil
} }
func dupKV(key []byte, value []byte) KeyValue {
var res KeyValue
s := make([]byte, len(key)+len(value))
copy(s, key)
res.Key = s[:len(key)]
if value != nil {
copy(s[len(key):], value)
res.Value = s[len(key):]
}
return res
}

View file

@ -4,6 +4,8 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"github.com/google/btree"
) )
// KeyPrefix constants. // KeyPrefix constants.
@ -88,7 +90,7 @@ type (
Put(k, v []byte) error Put(k, v []byte) error
PutBatch(Batch) error PutBatch(Batch) error
// PutChangeSet allows to push prepared changeset to the Store. // PutChangeSet allows to push prepared changeset to the Store.
PutChangeSet(puts map[string][]byte) error PutChangeSet(puts *btree.BTree) error
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
// Seek continues iteration until false is returned from f. // Seek continues iteration until false is returned from f.
// Key and value slices should not be modified. // Key and value slices should not be modified.