From 33072925972cd8ed6edabc55d21b77c78d3af758 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 29 Jan 2022 11:54:25 +0300 Subject: [PATCH] storage: rework MemoryStore with a single map Doesn't affect any benchmarks or tests, but makes things a bit simpler. --- pkg/core/storage/boltdb_store.go | 16 ++++---- pkg/core/storage/leveldb_store.go | 13 +++---- pkg/core/storage/memcached_store.go | 49 ++++++++---------------- pkg/core/storage/memcached_store_test.go | 4 +- pkg/core/storage/memory_store.go | 24 +++--------- pkg/core/storage/store.go | 2 +- 6 files changed, 36 insertions(+), 72 deletions(-) diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 83f2f3144..8f60aa5ee 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -85,21 +85,21 @@ func (s *BoltDBStore) Delete(key []byte) error { // PutBatch implements the Store interface. func (s *BoltDBStore) PutBatch(batch Batch) error { memBatch := batch.(*MemoryBatch) - return s.PutChangeSet(memBatch.mem, memBatch.del) + return s.PutChangeSet(memBatch.mem) } // PutChangeSet implements the Store interface. -func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { +func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { + var err error + return s.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(Bucket) for k, v := range puts { - err := b.Put([]byte(k), v) - if err != nil { - return err + if v != nil { + err = b.Put([]byte(k), v) + } else { + err = b.Delete([]byte(k)) } - } - for k := range dels { - err := b.Delete([]byte(k)) if err != nil { return err } diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index 410b93e82..6d5439711 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -62,20 +62,17 @@ func (s *LevelDBStore) PutBatch(batch Batch) error { } // PutChangeSet implements the Store interface. -func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { +func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error { tx, err := s.db.OpenTransaction() if err != nil { return err } for k := range puts { - err = tx.Put([]byte(k), puts[k], nil) - if err != nil { - tx.Discard() - return err + if puts[k] != nil { + err = tx.Put([]byte(k), puts[k], nil) + } else { + err = tx.Delete([]byte(k), nil) } - } - for k := range dels { - err = tx.Delete([]byte(k), nil) if err != nil { tx.Discard() return err diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index a6e5278a1..aed7b9c83 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -55,13 +55,12 @@ func NewMemCachedStore(lower Store) *MemCachedStore { func (s *MemCachedStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - k := string(key) - if val, ok := s.mem[k]; ok { + if val, ok := s.mem[string(key)]; ok { + if val == nil { + return nil, ErrKeyNotFound + } return val, nil } - if _, ok := s.del[k]; ok { - return nil, ErrKeyNotFound - } return s.ps.Get(key) } @@ -73,19 +72,16 @@ func (s *MemCachedStore) GetBatch() *MemBatch { var b MemBatch b.Put = make([]KeyValueExists, 0, len(s.mem)) + b.Deleted = make([]KeyValueExists, 0) for k, v := range s.mem { key := []byte(k) _, err := s.ps.Get(key) - b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) + if v == nil { + b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) + } else { + b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) + } } - - b.Deleted = make([]KeyValueExists, 0, len(s.del)) - for k := range s.del { - key := []byte(k) - _, err := s.ps.Get(key) - b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) - } - return &b } @@ -141,16 +137,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool Key: []byte(k), Value: v, }, - Exists: true, - }) - } - } - for k := range s.MemoryStore.del { - if isKeyOK(k) { - memRes = append(memRes, KeyValueExists{ - KeyValue: KeyValue{ - Key: []byte(k), - }, + Exists: v != nil, }) } } @@ -265,15 +252,14 @@ func (s *MemCachedStore) PersistSync() (int, error) { func (s *MemCachedStore) persist(isSync bool) (int, error) { var err error - var keys, dkeys int + var keys int s.plock.Lock() defer s.plock.Unlock() s.mut.Lock() keys = len(s.mem) - dkeys = len(s.del) - if keys == 0 && dkeys == 0 { + if keys == 0 { s.mut.Unlock() return 0, nil } @@ -282,15 +268,14 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { // starts using fresh new maps. This tempstore is only known here and // nothing ever changes it, therefore accesses to it (reads) can go // unprotected while writes are handled by s proper. - var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps} + var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps} s.ps = tempstore s.mem = make(map[string][]byte, len(s.mem)) - s.del = make(map[string]bool, len(s.del)) if !isSync { s.mut.Unlock() } - err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del) + err = tempstore.ps.PutChangeSet(tempstore.mem) if !isSync { s.mut.Lock() @@ -306,12 +291,8 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { for k := range s.mem { tempstore.put(k, s.mem[k]) } - for k := range s.del { - tempstore.drop(k) - } s.ps = tempstore.ps s.mem = tempstore.mem - s.del = tempstore.del } s.mut.Unlock() return keys, err diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index d46af8370..100942d7e 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -74,7 +74,7 @@ func testMemCachedStorePersist(t *testing.T, ps Store) { c, err = ts.Persist() checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) - assert.Equal(t, 0, c) + assert.Equal(t, 1, c) v, err = ps.Get([]byte("key")) assert.Equal(t, ErrKeyNotFound, err) assert.Equal(t, []byte(nil), v) @@ -287,7 +287,7 @@ func (b *BadStore) Put(k, v []byte) error { func (b *BadStore) PutBatch(Batch) error { return nil } -func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error { +func (b *BadStore) PutChangeSet(_ map[string][]byte) error { b.onPutBatch() return ErrKeyNotFound } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 506f76224..1a5589aa6 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -14,8 +14,6 @@ import ( type MemoryStore struct { mut sync.RWMutex mem map[string][]byte - // A map, not a slice, to avoid duplicates. - del map[string]bool } // MemoryBatch is an in-memory batch compatible with MemoryStore. @@ -37,7 +35,6 @@ func (b *MemoryBatch) Delete(k []byte) { func NewMemoryStore() *MemoryStore { return &MemoryStore{ mem: make(map[string][]byte), - del: make(map[string]bool), } } @@ -45,7 +42,7 @@ func NewMemoryStore() *MemoryStore { func (s *MemoryStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - if val, ok := s.mem[string(key)]; ok { + if val, ok := s.mem[string(key)]; ok && val != nil { return val, nil } return nil, ErrKeyNotFound @@ -55,7 +52,6 @@ func (s *MemoryStore) Get(key []byte) ([]byte, error) { // with mutex locked. func (s *MemoryStore) put(key string, value []byte) { s.mem[key] = value - delete(s.del, key) } // Put implements the Store interface. Never returns an error. @@ -71,8 +67,7 @@ func (s *MemoryStore) Put(key, value []byte) error { // drop deletes a key-value pair from the store, it's supposed to be called // with mutex locked. func (s *MemoryStore) drop(key string) { - s.del[key] = true - delete(s.mem, key) + s.mem[key] = nil } // Delete implements Store interface. Never returns an error. @@ -87,18 +82,15 @@ func (s *MemoryStore) Delete(key []byte) error { // PutBatch implements the Store interface. Never returns an error. func (s *MemoryStore) PutBatch(batch Batch) error { b := batch.(*MemoryBatch) - return s.PutChangeSet(b.mem, b.del) + return s.PutChangeSet(b.mem) } // PutChangeSet implements the Store interface. Never returns an error. -func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { +func (s *MemoryStore) PutChangeSet(puts map[string][]byte) error { s.mut.Lock() for k := range puts { s.put(k, puts[k]) } - for k := range dels { - s.drop(k) - } s.mut.Unlock() return nil } @@ -120,11 +112,6 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { f([]byte(k), v) } } - for k := range s.del { - if strings.HasPrefix(k, sk) { - f([]byte(k), nil) - } - } } // seek is an internal unlocked implementation of Seek. `start` denotes whether @@ -151,7 +138,7 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) { } for k, v := range s.mem { - if isKeyOK(k) { + if v != nil && isKeyOK(k) { memList = append(memList, KeyValue{ Key: []byte(k), Value: v, @@ -182,7 +169,6 @@ func newMemoryBatch() *MemoryBatch { // error. func (s *MemoryStore) Close() error { s.mut.Lock() - s.del = nil s.mem = nil s.mut.Unlock() return nil diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 8409e7a1f..ff1997234 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -88,7 +88,7 @@ type ( Put(k, v []byte) error PutBatch(Batch) error // PutChangeSet allows to push prepared changeset to the Store. - PutChangeSet(puts map[string][]byte, dels map[string]bool) error + PutChangeSet(puts map[string][]byte) error // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Seek continues iteration until false is returned from f. // Key and value slices should not be modified.