diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 1958bab5e..6c2cddf67 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -93,20 +93,42 @@ func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { // Persist flushes all the MemoryStore contents into the (supposedly) persistent // store ps. func (s *MemCachedStore) Persist() (int, error) { + var err error + var keys, dkeys int + s.mut.Lock() defer s.mut.Unlock() - batch := s.ps.Batch() - keys, dkeys := 0, 0 - for k, v := range s.mem { - batch.Put([]byte(k), v) - keys++ + + keys = len(s.mem) + dkeys = len(s.del) + if keys == 0 && dkeys == 0 { + return 0, nil } - for k := range s.del { - batch.Delete([]byte(k)) - dkeys++ + + memStore, ok := s.ps.(*MemoryStore) + if !ok { + memCachedStore, ok := s.ps.(*MemCachedStore) + if ok { + memStore = &memCachedStore.MemoryStore + } } - var err error - if keys != 0 || dkeys != 0 { + if memStore != nil { + memStore.mut.Lock() + for k := range s.mem { + memStore.put(k, s.mem[k]) + } + for k := range s.del { + memStore.drop(k) + } + memStore.mut.Unlock() + } else { + batch := s.ps.Batch() + for k := range s.mem { + batch.Put([]byte(k), s.mem[k]) + } + for k := range s.del { + batch.Delete([]byte(k)) + } err = s.ps.PutBatch(batch) } if err == nil { diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index cf7bcf40f..8addd4088 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -7,9 +7,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestMemCachedStorePersist(t *testing.T) { - // persistent Store - ps := NewMemoryStore() +func testMemCachedStorePersist(t *testing.T, ps Store) { // cached Store ts := NewMemCachedStore(ps) // persisting nothing should do nothing @@ -94,6 +92,22 @@ func checkBatch(t *testing.T, ts *MemCachedStore, put []KeyValue, del []KeyValue } } +func TestMemCachedPersist(t *testing.T) { + t.Run("MemoryStore", func(t *testing.T) { + ps := NewMemoryStore() + testMemCachedStorePersist(t, ps) + }) + t.Run("MemoryCachedStore", func(t *testing.T) { + ps1 := NewMemoryStore() + ps2 := NewMemCachedStore(ps1) + testMemCachedStorePersist(t, ps2) + }) + t.Run("BoltDBStore", func(t *testing.T) { + ps := newBoltStoreForTesting(t) + testMemCachedStorePersist(t, ps) + }) +} + func TestCachedGetFromPersistent(t *testing.T) { key := []byte("key") value := []byte("value")