forked from TrueCloudLab/neoneo-go
storage: rework MemoryStore with a single map
Doesn't affect any benchmarks or tests, but makes things a bit simpler.
This commit is contained in:
parent
2bc493a839
commit
3307292597
6 changed files with 36 additions and 72 deletions
|
@ -85,21 +85,21 @@ func (s *BoltDBStore) Delete(key []byte) error {
|
||||||
// PutBatch implements the Store interface.
|
// PutBatch implements the Store interface.
|
||||||
func (s *BoltDBStore) PutBatch(batch Batch) error {
|
func (s *BoltDBStore) PutBatch(batch Batch) error {
|
||||||
memBatch := batch.(*MemoryBatch)
|
memBatch := batch.(*MemoryBatch)
|
||||||
return s.PutChangeSet(memBatch.mem, memBatch.del)
|
return s.PutChangeSet(memBatch.mem)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutChangeSet implements the Store interface.
|
// PutChangeSet implements the Store interface.
|
||||||
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
|
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
return s.db.Batch(func(tx *bbolt.Tx) error {
|
return s.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(Bucket)
|
b := tx.Bucket(Bucket)
|
||||||
for k, v := range puts {
|
for k, v := range puts {
|
||||||
err := b.Put([]byte(k), v)
|
if v != nil {
|
||||||
if err != nil {
|
err = b.Put([]byte(k), v)
|
||||||
return err
|
} else {
|
||||||
|
err = b.Delete([]byte(k))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
for k := range dels {
|
|
||||||
err := b.Delete([]byte(k))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,20 +62,17 @@ func (s *LevelDBStore) PutBatch(batch Batch) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutChangeSet implements the Store interface.
|
// PutChangeSet implements the Store interface.
|
||||||
func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
|
func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error {
|
||||||
tx, err := s.db.OpenTransaction()
|
tx, err := s.db.OpenTransaction()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for k := range puts {
|
for k := range puts {
|
||||||
err = tx.Put([]byte(k), puts[k], nil)
|
if puts[k] != nil {
|
||||||
if err != nil {
|
err = tx.Put([]byte(k), puts[k], nil)
|
||||||
tx.Discard()
|
} else {
|
||||||
return err
|
err = tx.Delete([]byte(k), nil)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
for k := range dels {
|
|
||||||
err = tx.Delete([]byte(k), nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Discard()
|
tx.Discard()
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -55,13 +55,12 @@ func NewMemCachedStore(lower Store) *MemCachedStore {
|
||||||
func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
|
func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
defer s.mut.RUnlock()
|
defer s.mut.RUnlock()
|
||||||
k := string(key)
|
if val, ok := s.mem[string(key)]; ok {
|
||||||
if val, ok := s.mem[k]; ok {
|
if val == nil {
|
||||||
|
return nil, ErrKeyNotFound
|
||||||
|
}
|
||||||
return val, nil
|
return val, nil
|
||||||
}
|
}
|
||||||
if _, ok := s.del[k]; ok {
|
|
||||||
return nil, ErrKeyNotFound
|
|
||||||
}
|
|
||||||
return s.ps.Get(key)
|
return s.ps.Get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,19 +72,16 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
|
||||||
var b MemBatch
|
var b MemBatch
|
||||||
|
|
||||||
b.Put = make([]KeyValueExists, 0, len(s.mem))
|
b.Put = make([]KeyValueExists, 0, len(s.mem))
|
||||||
|
b.Deleted = make([]KeyValueExists, 0)
|
||||||
for k, v := range s.mem {
|
for k, v := range s.mem {
|
||||||
key := []byte(k)
|
key := []byte(k)
|
||||||
_, err := s.ps.Get(key)
|
_, err := s.ps.Get(key)
|
||||||
b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil})
|
if v == nil {
|
||||||
|
b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil})
|
||||||
|
} else {
|
||||||
|
b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
b.Deleted = make([]KeyValueExists, 0, len(s.del))
|
|
||||||
for k := range s.del {
|
|
||||||
key := []byte(k)
|
|
||||||
_, err := s.ps.Get(key)
|
|
||||||
b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil})
|
|
||||||
}
|
|
||||||
|
|
||||||
return &b
|
return &b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,16 +137,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
Key: []byte(k),
|
Key: []byte(k),
|
||||||
Value: v,
|
Value: v,
|
||||||
},
|
},
|
||||||
Exists: true,
|
Exists: v != nil,
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for k := range s.MemoryStore.del {
|
|
||||||
if isKeyOK(k) {
|
|
||||||
memRes = append(memRes, KeyValueExists{
|
|
||||||
KeyValue: KeyValue{
|
|
||||||
Key: []byte(k),
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -265,15 +252,14 @@ func (s *MemCachedStore) PersistSync() (int, error) {
|
||||||
|
|
||||||
func (s *MemCachedStore) persist(isSync bool) (int, error) {
|
func (s *MemCachedStore) persist(isSync bool) (int, error) {
|
||||||
var err error
|
var err error
|
||||||
var keys, dkeys int
|
var keys int
|
||||||
|
|
||||||
s.plock.Lock()
|
s.plock.Lock()
|
||||||
defer s.plock.Unlock()
|
defer s.plock.Unlock()
|
||||||
s.mut.Lock()
|
s.mut.Lock()
|
||||||
|
|
||||||
keys = len(s.mem)
|
keys = len(s.mem)
|
||||||
dkeys = len(s.del)
|
if keys == 0 {
|
||||||
if keys == 0 && dkeys == 0 {
|
|
||||||
s.mut.Unlock()
|
s.mut.Unlock()
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
@ -282,15 +268,14 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
|
||||||
// starts using fresh new maps. This tempstore is only known here and
|
// starts using fresh new maps. This tempstore is only known here and
|
||||||
// nothing ever changes it, therefore accesses to it (reads) can go
|
// nothing ever changes it, therefore accesses to it (reads) can go
|
||||||
// unprotected while writes are handled by s proper.
|
// unprotected while writes are handled by s proper.
|
||||||
var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps}
|
var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps}
|
||||||
s.ps = tempstore
|
s.ps = tempstore
|
||||||
s.mem = make(map[string][]byte, len(s.mem))
|
s.mem = make(map[string][]byte, len(s.mem))
|
||||||
s.del = make(map[string]bool, len(s.del))
|
|
||||||
if !isSync {
|
if !isSync {
|
||||||
s.mut.Unlock()
|
s.mut.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del)
|
err = tempstore.ps.PutChangeSet(tempstore.mem)
|
||||||
|
|
||||||
if !isSync {
|
if !isSync {
|
||||||
s.mut.Lock()
|
s.mut.Lock()
|
||||||
|
@ -306,12 +291,8 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) {
|
||||||
for k := range s.mem {
|
for k := range s.mem {
|
||||||
tempstore.put(k, s.mem[k])
|
tempstore.put(k, s.mem[k])
|
||||||
}
|
}
|
||||||
for k := range s.del {
|
|
||||||
tempstore.drop(k)
|
|
||||||
}
|
|
||||||
s.ps = tempstore.ps
|
s.ps = tempstore.ps
|
||||||
s.mem = tempstore.mem
|
s.mem = tempstore.mem
|
||||||
s.del = tempstore.del
|
|
||||||
}
|
}
|
||||||
s.mut.Unlock()
|
s.mut.Unlock()
|
||||||
return keys, err
|
return keys, err
|
||||||
|
|
|
@ -74,7 +74,7 @@ func testMemCachedStorePersist(t *testing.T, ps Store) {
|
||||||
c, err = ts.Persist()
|
c, err = ts.Persist()
|
||||||
checkBatch(t, ts, nil, nil)
|
checkBatch(t, ts, nil, nil)
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
assert.Equal(t, 0, c)
|
assert.Equal(t, 1, c)
|
||||||
v, err = ps.Get([]byte("key"))
|
v, err = ps.Get([]byte("key"))
|
||||||
assert.Equal(t, ErrKeyNotFound, err)
|
assert.Equal(t, ErrKeyNotFound, err)
|
||||||
assert.Equal(t, []byte(nil), v)
|
assert.Equal(t, []byte(nil), v)
|
||||||
|
@ -287,7 +287,7 @@ func (b *BadStore) Put(k, v []byte) error {
|
||||||
func (b *BadStore) PutBatch(Batch) error {
|
func (b *BadStore) PutBatch(Batch) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
|
func (b *BadStore) PutChangeSet(_ map[string][]byte) error {
|
||||||
b.onPutBatch()
|
b.onPutBatch()
|
||||||
return ErrKeyNotFound
|
return ErrKeyNotFound
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,8 +14,6 @@ import (
|
||||||
type MemoryStore struct {
|
type MemoryStore struct {
|
||||||
mut sync.RWMutex
|
mut sync.RWMutex
|
||||||
mem map[string][]byte
|
mem map[string][]byte
|
||||||
// A map, not a slice, to avoid duplicates.
|
|
||||||
del map[string]bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MemoryBatch is an in-memory batch compatible with MemoryStore.
|
// MemoryBatch is an in-memory batch compatible with MemoryStore.
|
||||||
|
@ -37,7 +35,6 @@ func (b *MemoryBatch) Delete(k []byte) {
|
||||||
func NewMemoryStore() *MemoryStore {
|
func NewMemoryStore() *MemoryStore {
|
||||||
return &MemoryStore{
|
return &MemoryStore{
|
||||||
mem: make(map[string][]byte),
|
mem: make(map[string][]byte),
|
||||||
del: make(map[string]bool),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +42,7 @@ func NewMemoryStore() *MemoryStore {
|
||||||
func (s *MemoryStore) Get(key []byte) ([]byte, error) {
|
func (s *MemoryStore) Get(key []byte) ([]byte, error) {
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
defer s.mut.RUnlock()
|
defer s.mut.RUnlock()
|
||||||
if val, ok := s.mem[string(key)]; ok {
|
if val, ok := s.mem[string(key)]; ok && val != nil {
|
||||||
return val, nil
|
return val, nil
|
||||||
}
|
}
|
||||||
return nil, ErrKeyNotFound
|
return nil, ErrKeyNotFound
|
||||||
|
@ -55,7 +52,6 @@ func (s *MemoryStore) Get(key []byte) ([]byte, error) {
|
||||||
// with mutex locked.
|
// with mutex locked.
|
||||||
func (s *MemoryStore) put(key string, value []byte) {
|
func (s *MemoryStore) put(key string, value []byte) {
|
||||||
s.mem[key] = value
|
s.mem[key] = value
|
||||||
delete(s.del, key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put implements the Store interface. Never returns an error.
|
// Put implements the Store interface. Never returns an error.
|
||||||
|
@ -71,8 +67,7 @@ func (s *MemoryStore) Put(key, value []byte) error {
|
||||||
// drop deletes a key-value pair from the store, it's supposed to be called
|
// drop deletes a key-value pair from the store, it's supposed to be called
|
||||||
// with mutex locked.
|
// with mutex locked.
|
||||||
func (s *MemoryStore) drop(key string) {
|
func (s *MemoryStore) drop(key string) {
|
||||||
s.del[key] = true
|
s.mem[key] = nil
|
||||||
delete(s.mem, key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete implements Store interface. Never returns an error.
|
// Delete implements Store interface. Never returns an error.
|
||||||
|
@ -87,18 +82,15 @@ func (s *MemoryStore) Delete(key []byte) error {
|
||||||
// PutBatch implements the Store interface. Never returns an error.
|
// PutBatch implements the Store interface. Never returns an error.
|
||||||
func (s *MemoryStore) PutBatch(batch Batch) error {
|
func (s *MemoryStore) PutBatch(batch Batch) error {
|
||||||
b := batch.(*MemoryBatch)
|
b := batch.(*MemoryBatch)
|
||||||
return s.PutChangeSet(b.mem, b.del)
|
return s.PutChangeSet(b.mem)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutChangeSet implements the Store interface. Never returns an error.
|
// PutChangeSet implements the Store interface. Never returns an error.
|
||||||
func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
|
func (s *MemoryStore) PutChangeSet(puts map[string][]byte) error {
|
||||||
s.mut.Lock()
|
s.mut.Lock()
|
||||||
for k := range puts {
|
for k := range puts {
|
||||||
s.put(k, puts[k])
|
s.put(k, puts[k])
|
||||||
}
|
}
|
||||||
for k := range dels {
|
|
||||||
s.drop(k)
|
|
||||||
}
|
|
||||||
s.mut.Unlock()
|
s.mut.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -120,11 +112,6 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
|
||||||
f([]byte(k), v)
|
f([]byte(k), v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for k := range s.del {
|
|
||||||
if strings.HasPrefix(k, sk) {
|
|
||||||
f([]byte(k), nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
||||||
|
@ -151,7 +138,7 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range s.mem {
|
for k, v := range s.mem {
|
||||||
if isKeyOK(k) {
|
if v != nil && isKeyOK(k) {
|
||||||
memList = append(memList, KeyValue{
|
memList = append(memList, KeyValue{
|
||||||
Key: []byte(k),
|
Key: []byte(k),
|
||||||
Value: v,
|
Value: v,
|
||||||
|
@ -182,7 +169,6 @@ func newMemoryBatch() *MemoryBatch {
|
||||||
// error.
|
// error.
|
||||||
func (s *MemoryStore) Close() error {
|
func (s *MemoryStore) Close() error {
|
||||||
s.mut.Lock()
|
s.mut.Lock()
|
||||||
s.del = nil
|
|
||||||
s.mem = nil
|
s.mem = nil
|
||||||
s.mut.Unlock()
|
s.mut.Unlock()
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -88,7 +88,7 @@ type (
|
||||||
Put(k, v []byte) error
|
Put(k, v []byte) error
|
||||||
PutBatch(Batch) error
|
PutBatch(Batch) error
|
||||||
// PutChangeSet allows to push prepared changeset to the Store.
|
// PutChangeSet allows to push prepared changeset to the Store.
|
||||||
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
|
PutChangeSet(puts map[string][]byte) error
|
||||||
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
||||||
// Seek continues iteration until false is returned from f.
|
// Seek continues iteration until false is returned from f.
|
||||||
// Key and value slices should not be modified.
|
// Key and value slices should not be modified.
|
||||||
|
|
Loading…
Reference in a new issue