diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index e2c6c52a0..dc435a72c 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -71,6 +71,14 @@ func (s *BoltDBStore) Get(key []byte) (val []byte, err error) { return } +// Delete implements the Store interface. +func (s *BoltDBStore) Delete(key []byte) error { + return s.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(Bucket) + return b.Delete(key) + }) +} + // PutBatch implements the Store interface. func (s *BoltDBStore) PutBatch(batch Batch) error { return s.db.Batch(func(tx *bbolt.Tx) error { @@ -81,6 +89,12 @@ func (s *BoltDBStore) PutBatch(batch Batch) error { return err } } + for k := range batch.(*MemoryBatch).del { + err := b.Delete([]byte(k)) + if err != nil { + return err + } + } return nil }) } diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index f024c41d3..b2637744e 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -48,6 +48,11 @@ func (s *LevelDBStore) Get(key []byte) ([]byte, error) { return value, err } +// Delete implements the Store interface. +func (s *LevelDBStore) Delete(key []byte) error { + return s.db.Delete(key, nil) +} + // PutBatch implements the Store interface. func (s *LevelDBStore) PutBatch(batch Batch) error { lvldbBatch := batch.(*leveldb.Batch) diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 8ce29c7db..571ab64e5 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -10,11 +10,15 @@ import ( type MemoryStore struct { mut sync.RWMutex mem map[string][]byte + // A map, not a slice, to avoid duplicates. + del map[string]bool } // MemoryBatch a in-memory batch compatible with MemoryStore. type MemoryBatch struct { m map[string][]byte + // A map, not a slice, to avoid duplicates. + del map[string]bool } // Put implements the Batch interface. @@ -23,6 +27,14 @@ func (b *MemoryBatch) Put(k, v []byte) { copy(vcopy, v) kcopy := string(k) b.m[kcopy] = vcopy + delete(b.del, kcopy) +} + +// Delete implements Batch interface. +func (b *MemoryBatch) Delete(k []byte) { + kcopy := string(k) + delete(b.m, kcopy) + b.del[kcopy] = true } // Len implements the Batch interface. @@ -34,6 +46,7 @@ func (b *MemoryBatch) Len() int { func NewMemoryStore() *MemoryStore { return &MemoryStore{ mem: make(map[string][]byte), + del: make(map[string]bool), } } @@ -50,7 +63,19 @@ func (s *MemoryStore) Get(key []byte) ([]byte, error) { // Put implements the Store interface. Never returns an error. func (s *MemoryStore) Put(key, value []byte) error { s.mut.Lock() - s.mem[string(key)] = value + newKey := string(key) + s.mem[newKey] = value + delete(s.del, newKey) + s.mut.Unlock() + return nil +} + +// Delete implements Store interface. Never returns an error. +func (s *MemoryStore) Delete(key []byte) error { + s.mut.Lock() + newKey := string(key) + s.del[newKey] = true + delete(s.mem, newKey) s.mut.Unlock() return nil } @@ -58,6 +83,9 @@ func (s *MemoryStore) Put(key, value []byte) error { // PutBatch implements the Store interface. Never returns an error. func (s *MemoryStore) PutBatch(batch Batch) error { b := batch.(*MemoryBatch) + for k := range b.del { + _ = s.Delete([]byte(k)) + } for k, v := range b.m { _ = s.Put([]byte(k), v) } @@ -81,7 +109,8 @@ func (s *MemoryStore) Batch() Batch { // newMemoryBatch returns new memory batch. func newMemoryBatch() *MemoryBatch { return &MemoryBatch{ - m: make(map[string][]byte), + m: make(map[string][]byte), + del: make(map[string]bool), } } @@ -91,17 +120,22 @@ func (s *MemoryStore) Persist(ps Store) (int, error) { s.mut.Lock() defer s.mut.Unlock() batch := ps.Batch() - keys := 0 + keys, dkeys := 0, 0 for k, v := range s.mem { batch.Put([]byte(k), v) keys++ } + for k := range s.del { + batch.Delete([]byte(k)) + dkeys++ + } var err error - if keys != 0 { + if keys != 0 || dkeys != 0 { err = ps.PutBatch(batch) } if err == nil { s.mem = make(map[string][]byte) + s.del = make(map[string]bool) } return keys, err } @@ -110,6 +144,7 @@ func (s *MemoryStore) Persist(ps Store) (int, error) { // error. func (s *MemoryStore) Close() error { s.mut.Lock() + s.del = nil s.mem = nil s.mut.Unlock() return nil diff --git a/pkg/core/storage/memory_store_test.go b/pkg/core/storage/memory_store_test.go index 30aa526fd..96a7f5da5 100644 --- a/pkg/core/storage/memory_store_test.go +++ b/pkg/core/storage/memory_store_test.go @@ -125,4 +125,16 @@ func TestMemoryStorePersist(t *testing.T) { c, err = ts.Persist(ps) assert.Equal(t, nil, err) assert.Equal(t, 0, c) + // test persisting deletions + err = ts.Delete([]byte("key")) + assert.Equal(t, nil, err) + c, err = ts.Persist(ps) + assert.Equal(t, nil, err) + assert.Equal(t, 0, c) + v, err = ps.Get([]byte("key")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + v, err = ps.Get([]byte("key2")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("value2"), v) } diff --git a/pkg/core/storage/redis_store.go b/pkg/core/storage/redis_store.go index 30021c9fd..0d8ef8e74 100644 --- a/pkg/core/storage/redis_store.go +++ b/pkg/core/storage/redis_store.go @@ -48,6 +48,12 @@ func (s *RedisStore) Get(k []byte) ([]byte, error) { return []byte(val), nil } +// Delete implements the Store interface. +func (s *RedisStore) Delete(k []byte) error { + s.client.Del(string(k)) + return nil +} + // Put implements the Store interface. func (s *RedisStore) Put(k, v []byte) error { s.client.Set(string(k), string(v), 0) @@ -60,6 +66,9 @@ func (s *RedisStore) PutBatch(b Batch) error { for k, v := range b.(*MemoryBatch).m { pipe.Set(k, v, 0) } + for k := range b.(*MemoryBatch).del { + pipe.Del(k) + } _, err := pipe.Exec() return err } diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 92fdeab07..a9bbacd5b 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -32,6 +32,7 @@ type ( // information. Store interface { Batch() Batch + Delete(k []byte) error Get([]byte) ([]byte, error) Put(k, v []byte) error PutBatch(Batch) error @@ -43,6 +44,7 @@ type ( // Each Store implementation is responsible of casting a Batch // to its appropriate type. Batch interface { + Delete(k []byte) Put(k, v []byte) Len() int }