storage: implement GetBatch() to view storage changes
GetBatch returns changes to be persisted.
This commit is contained in:
parent
bcff9faac4
commit
fb9af98179
2 changed files with 58 additions and 0 deletions
|
@ -9,6 +9,20 @@ type MemCachedStore struct {
|
||||||
ps Store
|
ps Store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
// KeyValue represents key-value pair.
|
||||||
|
KeyValue struct {
|
||||||
|
Key []byte
|
||||||
|
Value []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// MemBatch represents a changeset to be persisted.
|
||||||
|
MemBatch struct {
|
||||||
|
Put []KeyValue
|
||||||
|
Deleted []KeyValue
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// NewMemCachedStore creates a new MemCachedStore object.
|
// NewMemCachedStore creates a new MemCachedStore object.
|
||||||
func NewMemCachedStore(lower Store) *MemCachedStore {
|
func NewMemCachedStore(lower Store) *MemCachedStore {
|
||||||
return &MemCachedStore{
|
return &MemCachedStore{
|
||||||
|
@ -31,6 +45,26 @@ func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
|
||||||
return s.ps.Get(key)
|
return s.ps.Get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetBatch returns currently accumulated changeset.
|
||||||
|
func (s *MemCachedStore) GetBatch() *MemBatch {
|
||||||
|
s.mut.RLock()
|
||||||
|
defer s.mut.RUnlock()
|
||||||
|
|
||||||
|
var b MemBatch
|
||||||
|
|
||||||
|
b.Put = make([]KeyValue, 0, len(s.mem))
|
||||||
|
for k, v := range s.mem {
|
||||||
|
b.Put = append(b.Put, KeyValue{Key: []byte(k), Value: v})
|
||||||
|
}
|
||||||
|
|
||||||
|
b.Deleted = make([]KeyValue, 0, len(s.del))
|
||||||
|
for k := range s.del {
|
||||||
|
b.Deleted = append(b.Deleted, KeyValue{Key: []byte(k)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return &b
|
||||||
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
|
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
|
|
|
@ -18,7 +18,9 @@ func TestMemCachedStorePersist(t *testing.T) {
|
||||||
assert.Equal(t, 0, c)
|
assert.Equal(t, 0, c)
|
||||||
// persisting one key should result in one key in ps and nothing in ts
|
// persisting one key should result in one key in ps and nothing in ts
|
||||||
assert.NoError(t, ts.Put([]byte("key"), []byte("value")))
|
assert.NoError(t, ts.Put([]byte("key"), []byte("value")))
|
||||||
|
checkBatch(t, ts, []KeyValue{{[]byte("key"), []byte("value")}}, nil)
|
||||||
c, err = ts.Persist()
|
c, err = ts.Persist()
|
||||||
|
checkBatch(t, ts, nil, nil)
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
assert.Equal(t, 1, c)
|
assert.Equal(t, 1, c)
|
||||||
v, err := ps.Get([]byte("key"))
|
v, err := ps.Get([]byte("key"))
|
||||||
|
@ -35,9 +37,14 @@ func TestMemCachedStorePersist(t *testing.T) {
|
||||||
v, err = ps.Get([]byte("key2"))
|
v, err = ps.Get([]byte("key2"))
|
||||||
assert.Equal(t, ErrKeyNotFound, err)
|
assert.Equal(t, ErrKeyNotFound, err)
|
||||||
assert.Equal(t, []byte(nil), v)
|
assert.Equal(t, []byte(nil), v)
|
||||||
|
checkBatch(t, ts, []KeyValue{
|
||||||
|
{[]byte("key"), []byte("newvalue")},
|
||||||
|
{[]byte("key2"), []byte("value2")},
|
||||||
|
}, nil)
|
||||||
// two keys should be persisted (one overwritten and one new) and
|
// two keys should be persisted (one overwritten and one new) and
|
||||||
// available in the ps
|
// available in the ps
|
||||||
c, err = ts.Persist()
|
c, err = ts.Persist()
|
||||||
|
checkBatch(t, ts, nil, nil)
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
assert.Equal(t, 2, c)
|
assert.Equal(t, 2, c)
|
||||||
v, err = ts.MemoryStore.Get([]byte("key"))
|
v, err = ts.MemoryStore.Get([]byte("key"))
|
||||||
|
@ -52,6 +59,7 @@ func TestMemCachedStorePersist(t *testing.T) {
|
||||||
v, err = ps.Get([]byte("key2"))
|
v, err = ps.Get([]byte("key2"))
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
assert.Equal(t, []byte("value2"), v)
|
assert.Equal(t, []byte("value2"), v)
|
||||||
|
checkBatch(t, ts, nil, nil)
|
||||||
// we've persisted some values, make sure successive persist is a no-op
|
// we've persisted some values, make sure successive persist is a no-op
|
||||||
c, err = ts.Persist()
|
c, err = ts.Persist()
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
|
@ -59,7 +67,9 @@ func TestMemCachedStorePersist(t *testing.T) {
|
||||||
// test persisting deletions
|
// test persisting deletions
|
||||||
err = ts.Delete([]byte("key"))
|
err = ts.Delete([]byte("key"))
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
|
checkBatch(t, ts, nil, []KeyValue{{Key: []byte("key")}})
|
||||||
c, err = ts.Persist()
|
c, err = ts.Persist()
|
||||||
|
checkBatch(t, ts, nil, nil)
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
assert.Equal(t, 0, c)
|
assert.Equal(t, 0, c)
|
||||||
v, err = ps.Get([]byte("key"))
|
v, err = ps.Get([]byte("key"))
|
||||||
|
@ -70,6 +80,20 @@ func TestMemCachedStorePersist(t *testing.T) {
|
||||||
assert.Equal(t, []byte("value2"), v)
|
assert.Equal(t, []byte("value2"), v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkBatch(t *testing.T, ts *MemCachedStore, put []KeyValue, del []KeyValue) {
|
||||||
|
b := ts.GetBatch()
|
||||||
|
assert.Equal(t, len(put), len(b.Put), "wrong number of put elements in a batch")
|
||||||
|
assert.Equal(t, len(del), len(b.Deleted), "wrong number of deleted elements in a batch")
|
||||||
|
|
||||||
|
for i := range put {
|
||||||
|
assert.Contains(t, b.Put, put[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range del {
|
||||||
|
assert.Contains(t, b.Deleted, del[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCachedGetFromPersistent(t *testing.T) {
|
func TestCachedGetFromPersistent(t *testing.T) {
|
||||||
key := []byte("key")
|
key := []byte("key")
|
||||||
value := []byte("value")
|
value := []byte("value")
|
||||||
|
|
Loading…
Reference in a new issue