From fb9af981792891b949a8fa8e6e1719b374e0eb5a Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 6 Feb 2020 16:11:32 +0300 Subject: [PATCH] storage: implement GetBatch() to view storage changes GetBatch returns changes to be persisted. --- pkg/core/storage/memcached_store.go | 34 ++++++++++++++++++++++++ pkg/core/storage/memcached_store_test.go | 24 +++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index de13fcada..bc99393eb 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -9,6 +9,20 @@ type MemCachedStore struct { ps Store } +type ( + // KeyValue represents key-value pair. + KeyValue struct { + Key []byte + Value []byte + } + + // MemBatch represents a changeset to be persisted. + MemBatch struct { + Put []KeyValue + Deleted []KeyValue + } +) + // NewMemCachedStore creates a new MemCachedStore object. func NewMemCachedStore(lower Store) *MemCachedStore { return &MemCachedStore{ @@ -31,6 +45,26 @@ func (s *MemCachedStore) Get(key []byte) ([]byte, error) { return s.ps.Get(key) } +// GetBatch returns currently accumulated changeset. +func (s *MemCachedStore) GetBatch() *MemBatch { + s.mut.RLock() + defer s.mut.RUnlock() + + var b MemBatch + + b.Put = make([]KeyValue, 0, len(s.mem)) + for k, v := range s.mem { + b.Put = append(b.Put, KeyValue{Key: []byte(k), Value: v}) + } + + b.Deleted = make([]KeyValue, 0, len(s.del)) + for k := range s.del { + b.Deleted = append(b.Deleted, KeyValue{Key: []byte(k)}) + } + + return &b +} + // Seek implements the Store interface. func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { s.mut.RLock() diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index b99fc6330..b3ac62cb3 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -18,7 +18,9 @@ func TestMemCachedStorePersist(t *testing.T) { assert.Equal(t, 0, c) // persisting one key should result in one key in ps and nothing in ts assert.NoError(t, ts.Put([]byte("key"), []byte("value"))) + checkBatch(t, ts, []KeyValue{{[]byte("key"), []byte("value")}}, nil) c, err = ts.Persist() + checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) assert.Equal(t, 1, c) v, err := ps.Get([]byte("key")) @@ -35,9 +37,14 @@ func TestMemCachedStorePersist(t *testing.T) { v, err = ps.Get([]byte("key2")) assert.Equal(t, ErrKeyNotFound, err) assert.Equal(t, []byte(nil), v) + checkBatch(t, ts, []KeyValue{ + {[]byte("key"), []byte("newvalue")}, + {[]byte("key2"), []byte("value2")}, + }, nil) // two keys should be persisted (one overwritten and one new) and // available in the ps c, err = ts.Persist() + checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) assert.Equal(t, 2, c) v, err = ts.MemoryStore.Get([]byte("key")) @@ -52,6 +59,7 @@ func TestMemCachedStorePersist(t *testing.T) { v, err = ps.Get([]byte("key2")) assert.Equal(t, nil, err) assert.Equal(t, []byte("value2"), v) + checkBatch(t, ts, nil, nil) // we've persisted some values, make sure successive persist is a no-op c, err = ts.Persist() assert.Equal(t, nil, err) @@ -59,7 +67,9 @@ func TestMemCachedStorePersist(t *testing.T) { // test persisting deletions err = ts.Delete([]byte("key")) assert.Equal(t, nil, err) + checkBatch(t, ts, nil, []KeyValue{{Key: []byte("key")}}) c, err = ts.Persist() + checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) assert.Equal(t, 0, c) v, err = ps.Get([]byte("key")) @@ -70,6 +80,20 @@ func TestMemCachedStorePersist(t *testing.T) { assert.Equal(t, []byte("value2"), v) } +func checkBatch(t *testing.T, ts *MemCachedStore, put []KeyValue, del []KeyValue) { + b := ts.GetBatch() + assert.Equal(t, len(put), len(b.Put), "wrong number of put elements in a batch") + assert.Equal(t, len(del), len(b.Deleted), "wrong number of deleted elements in a batch") + + for i := range put { + assert.Contains(t, b.Put, put[i]) + } + + for i := range del { + assert.Contains(t, b.Deleted, del[i]) + } +} + func TestCachedGetFromPersistent(t *testing.T) { key := []byte("key") value := []byte("value")