storage: drop Put and Delete from Store interface

It's only changed with PutChangeSet, single KV operations are handled by
MemCachedStore.
This commit is contained in:
Roman Khimov 2022-02-16 16:48:47 +03:00
parent 017795c9c1
commit be24bf6412
8 changed files with 82 additions and 126 deletions

View file

@ -47,15 +47,6 @@ func NewBoltDBStore(cfg BoltDBOptions) (*BoltDBStore, error) {
return &BoltDBStore{db: db}, nil return &BoltDBStore{db: db}, nil
} }
// Put implements the Store interface.
func (s *BoltDBStore) Put(key, value []byte) error {
return s.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(Bucket)
err := b.Put(key, value)
return err
})
}
// Get implements the Store interface. // Get implements the Store interface.
func (s *BoltDBStore) Get(key []byte) (val []byte, err error) { func (s *BoltDBStore) Get(key []byte) (val []byte, err error) {
err = s.db.View(func(tx *bbolt.Tx) error { err = s.db.View(func(tx *bbolt.Tx) error {
@ -73,14 +64,6 @@ func (s *BoltDBStore) Get(key []byte) (val []byte, err error) {
return return
} }
// Delete implements the Store interface.
func (s *BoltDBStore) Delete(key []byte) error {
return s.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(Bucket)
return b.Delete(key)
})
}
// PutChangeSet implements the Store interface. // PutChangeSet implements the Store interface.
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error {
var err error var err error

View file

@ -36,11 +36,6 @@ func NewLevelDBStore(cfg LevelDBOptions) (*LevelDBStore, error) {
}, nil }, nil
} }
// Put implements the Store interface.
func (s *LevelDBStore) Put(key, value []byte) error {
return s.db.Put(key, value, nil)
}
// Get implements the Store interface. // Get implements the Store interface.
func (s *LevelDBStore) Get(key []byte) ([]byte, error) { func (s *LevelDBStore) Get(key []byte) ([]byte, error) {
value, err := s.db.Get(key, nil) value, err := s.db.Get(key, nil)
@ -50,11 +45,6 @@ func (s *LevelDBStore) Get(key []byte) ([]byte, error) {
return value, err return value, err
} }
// Delete implements the Store interface.
func (s *LevelDBStore) Delete(key []byte) error {
return s.db.Delete(key, nil)
}
// PutChangeSet implements the Store interface. // PutChangeSet implements the Store interface.
func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error {
tx, err := s.db.OpenTransaction() tx, err := s.db.OpenTransaction()

View file

@ -65,6 +65,25 @@ func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
return s.ps.Get(key) return s.ps.Get(key)
} }
// Put puts new KV pair into the store. Never returns an error.
func (s *MemCachedStore) Put(key, value []byte) error {
newKey := string(key)
vcopy := slice.Copy(value)
s.mut.Lock()
put(s.chooseMap(key), newKey, vcopy)
s.mut.Unlock()
return nil
}
// Delete drops KV pair from the store. Never returns an error.
func (s *MemCachedStore) Delete(key []byte) error {
newKey := string(key)
s.mut.Lock()
put(s.chooseMap(key), newKey, nil)
s.mut.Unlock()
return nil
}
// GetBatch returns currently accumulated changeset. // GetBatch returns currently accumulated changeset.
func (s *MemCachedStore) GetBatch() *MemBatch { func (s *MemCachedStore) GetBatch() *MemBatch {
s.mut.RLock() s.mut.RLock()

View file

@ -12,6 +12,34 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestMemCachedPutGetDelete(t *testing.T) {
ps := NewMemoryStore()
s := NewMemCachedStore(ps)
key := []byte("foo")
value := []byte("bar")
require.NoError(t, s.Put(key, value))
result, err := s.Get(key)
assert.Nil(t, err)
require.Equal(t, value, result)
err = s.Delete(key)
assert.Nil(t, err)
_, err = s.Get(key)
assert.NotNil(t, err)
assert.Equal(t, err, ErrKeyNotFound)
// Double delete.
err = s.Delete(key)
assert.Nil(t, err)
// Nonexistent.
key = []byte("sparse")
assert.NoError(t, s.Delete(key))
}
func testMemCachedStorePersist(t *testing.T, ps Store) { func testMemCachedStorePersist(t *testing.T, ps Store) {
// cached Store // cached Store
ts := NewMemCachedStore(ps) ts := NewMemCachedStore(ps)
@ -123,7 +151,7 @@ func TestCachedGetFromPersistent(t *testing.T) {
ps := NewMemoryStore() ps := NewMemoryStore()
ts := NewMemCachedStore(ps) ts := NewMemCachedStore(ps)
assert.NoError(t, ps.Put(key, value)) assert.NoError(t, ps.PutChangeSet(map[string][]byte{string(key): value}, nil))
val, err := ts.Get(key) val, err := ts.Get(key)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, value, val) assert.Equal(t, value, val)
@ -156,14 +184,14 @@ func TestCachedSeek(t *testing.T) {
ts = NewMemCachedStore(ps) ts = NewMemCachedStore(ps)
) )
for _, v := range lowerKVs { for _, v := range lowerKVs {
require.NoError(t, ps.Put(v.Key, v.Value)) require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil))
} }
for _, v := range deletedKVs { for _, v := range deletedKVs {
require.NoError(t, ps.Put(v.Key, v.Value)) require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil))
require.NoError(t, ts.Delete(v.Key)) require.NoError(t, ts.Delete(v.Key))
} }
for _, v := range updatedKVs { for _, v := range updatedKVs {
require.NoError(t, ps.Put(v.Key, []byte("stub"))) require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil))
require.NoError(t, ts.Put(v.Key, v.Value)) require.NoError(t, ts.Put(v.Key, v.Value))
} }
foundKVs := make(map[string][]byte) foundKVs := make(map[string][]byte)
@ -199,36 +227,38 @@ func benchmarkCachedSeek(t *testing.B, ps Store, psElementsCount, tsElementsCoun
) )
for i := 0; i < psElementsCount; i++ { for i := 0; i < psElementsCount; i++ {
// lower KVs with matching prefix that should be found // lower KVs with matching prefix that should be found
require.NoError(t, ps.Put(append(lowerPrefixGood, random.Bytes(10)...), []byte("value"))) require.NoError(t, ts.Put(append(lowerPrefixGood, random.Bytes(10)...), []byte("value")))
// lower KVs with non-matching prefix that shouldn't be found // lower KVs with non-matching prefix that shouldn't be found
require.NoError(t, ps.Put(append(lowerPrefixBad, random.Bytes(10)...), []byte("value"))) require.NoError(t, ts.Put(append(lowerPrefixBad, random.Bytes(10)...), []byte("value")))
// deleted KVs with matching prefix that shouldn't be found // deleted KVs with matching prefix that shouldn't be found
key := append(deletedPrefixGood, random.Bytes(10)...) key := append(deletedPrefixGood, random.Bytes(10)...)
require.NoError(t, ps.Put(key, []byte("deleted"))) require.NoError(t, ts.Put(key, []byte("deleted")))
if i < tsElementsCount { if i < tsElementsCount {
require.NoError(t, ts.Delete(key)) require.NoError(t, ts.Delete(key))
} }
// deleted KVs with non-matching prefix that shouldn't be found // deleted KVs with non-matching prefix that shouldn't be found
key = append(deletedPrefixBad, random.Bytes(10)...) key = append(deletedPrefixBad, random.Bytes(10)...)
require.NoError(t, ps.Put(key, []byte("deleted"))) require.NoError(t, ts.Put(key, []byte("deleted")))
if i < tsElementsCount { if i < tsElementsCount {
require.NoError(t, ts.Delete(key)) require.NoError(t, ts.Delete(key))
} }
// updated KVs with matching prefix that should be found // updated KVs with matching prefix that should be found
key = append(updatedPrefixGood, random.Bytes(10)...) key = append(updatedPrefixGood, random.Bytes(10)...)
require.NoError(t, ps.Put(key, []byte("stub"))) require.NoError(t, ts.Put(key, []byte("stub")))
if i < tsElementsCount { if i < tsElementsCount {
require.NoError(t, ts.Put(key, []byte("updated"))) require.NoError(t, ts.Put(key, []byte("updated")))
} }
// updated KVs with non-matching prefix that shouldn't be found // updated KVs with non-matching prefix that shouldn't be found
key = append(updatedPrefixBad, random.Bytes(10)...) key = append(updatedPrefixBad, random.Bytes(10)...)
require.NoError(t, ps.Put(key, []byte("stub"))) require.NoError(t, ts.Put(key, []byte("stub")))
if i < tsElementsCount { if i < tsElementsCount {
require.NoError(t, ts.Put(key, []byte("updated"))) require.NoError(t, ts.Put(key, []byte("updated")))
} }
} }
_, err := ts.PersistSync()
require.NoError(t, err)
t.ReportAllocs() t.ReportAllocs()
t.ResetTimer() t.ResetTimer()
@ -347,14 +377,14 @@ func TestCachedSeekSorting(t *testing.T) {
ts = NewMemCachedStore(ps) ts = NewMemCachedStore(ps)
) )
for _, v := range lowerKVs { for _, v := range lowerKVs {
require.NoError(t, ps.Put(v.Key, v.Value)) require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil))
} }
for _, v := range deletedKVs { for _, v := range deletedKVs {
require.NoError(t, ps.Put(v.Key, v.Value)) require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil))
require.NoError(t, ts.Delete(v.Key)) require.NoError(t, ts.Delete(v.Key))
} }
for _, v := range updatedKVs { for _, v := range updatedKVs {
require.NoError(t, ps.Put(v.Key, []byte("stub"))) require.NoError(t, ps.PutChangeSet(map[string][]byte{string(v.Key): v.Value}, nil))
require.NoError(t, ts.Put(v.Key, v.Value)) require.NoError(t, ts.Put(v.Key, v.Value))
} }
var foundKVs []KeyValue var foundKVs []KeyValue

View file

@ -5,8 +5,6 @@ import (
"sort" "sort"
"strings" "strings"
"sync" "sync"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
) )
// MemoryStore is an in-memory implementation of a Store, mainly // MemoryStore is an in-memory implementation of a Store, mainly
@ -51,31 +49,6 @@ func put(m map[string][]byte, key string, value []byte) {
m[key] = value m[key] = value
} }
// Put implements the Store interface. Never returns an error.
func (s *MemoryStore) Put(key, value []byte) error {
newKey := string(key)
vcopy := slice.Copy(value)
s.mut.Lock()
put(s.chooseMap(key), newKey, vcopy)
s.mut.Unlock()
return nil
}
// drop deletes a key-value pair from the store, it's supposed to be called
// with mutex locked.
func drop(m map[string][]byte, key string) {
m[key] = nil
}
// Delete implements Store interface. Never returns an error.
func (s *MemoryStore) Delete(key []byte) error {
newKey := string(key)
s.mut.Lock()
drop(s.chooseMap(key), newKey)
s.mut.Unlock()
return nil
}
// PutChangeSet implements the Store interface. Never returns an error. // PutChangeSet implements the Store interface. Never returns an error.
func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error {
s.mut.Lock() s.mut.Lock()
@ -103,7 +76,7 @@ func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
// sensitive to the order of KV pairs. // sensitive to the order of KV pairs.
s.seek(rng, func(k, v []byte) bool { s.seek(rng, func(k, v []byte) bool {
if !keep(k, v) { if !keep(k, v) {
drop(s.chooseMap(k), string(k)) delete(s.chooseMap(k), string(k))
} }
return true return true
}) })

View file

@ -20,10 +20,13 @@ func BenchmarkMemorySeek(t *testing.B) {
searchPrefix = []byte{1} searchPrefix = []byte{1}
badPrefix = []byte{2} badPrefix = []byte{2}
) )
ts := NewMemCachedStore(ms)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
require.NoError(t, ms.Put(append(searchPrefix, random.Bytes(10)...), random.Bytes(10))) require.NoError(t, ts.Put(append(searchPrefix, random.Bytes(10)...), random.Bytes(10)))
require.NoError(t, ms.Put(append(badPrefix, random.Bytes(10)...), random.Bytes(10))) require.NoError(t, ts.Put(append(badPrefix, random.Bytes(10)...), random.Bytes(10)))
} }
_, err := ts.PersistSync()
require.NoError(t, err)
t.ReportAllocs() t.ReportAllocs()
t.ResetTimer() t.ResetTimer()

View file

@ -82,12 +82,11 @@ type SeekRange struct {
var ErrKeyNotFound = errors.New("key not found") var ErrKeyNotFound = errors.New("key not found")
type ( type (
// Store is anything that can persist and retrieve the blockchain. // Store is the underlying KV backend for the blockchain data, it's
// information. // not intended to be used directly, you wrap it with some memory cache
// layer most of the time.
Store interface { Store interface {
Delete(k []byte) error
Get([]byte) ([]byte, error) Get([]byte) ([]byte, error)
Put(k, v []byte) error
// PutChangeSet allows to push prepared changeset to the Store. // PutChangeSet allows to push prepared changeset to the Store.
PutChangeSet(puts map[string][]byte, stor map[string][]byte) error PutChangeSet(puts map[string][]byte, stor map[string][]byte) error
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.

View file

@ -19,17 +19,6 @@ type dbSetup struct {
type dbTestFunction func(*testing.T, Store) type dbTestFunction func(*testing.T, Store)
func testStorePutAndGet(t *testing.T, s Store) {
key := []byte("foo")
value := []byte("bar")
require.NoError(t, s.Put(key, value))
result, err := s.Get(key)
assert.Nil(t, err)
require.Equal(t, value, result)
}
func testStoreGetNonExistent(t *testing.T, s Store) { func testStoreGetNonExistent(t *testing.T, s Store) {
key := []byte("sparse") key := []byte("sparse")
@ -37,7 +26,7 @@ func testStoreGetNonExistent(t *testing.T, s Store) {
assert.Equal(t, err, ErrKeyNotFound) assert.Equal(t, err, ErrKeyNotFound)
} }
func testStoreSeek(t *testing.T, s Store) { func pushSeekDataSet(t *testing.T, s Store) []KeyValue {
// Use the same set of kvs to test Seek with different prefix/start values. // Use the same set of kvs to test Seek with different prefix/start values.
kvs := []KeyValue{ kvs := []KeyValue{
{[]byte("10"), []byte("bar")}, {[]byte("10"), []byte("bar")},
@ -48,10 +37,17 @@ func testStoreSeek(t *testing.T, s Store) {
{[]byte("30"), []byte("bare")}, {[]byte("30"), []byte("bare")},
{[]byte("31"), []byte("barf")}, {[]byte("31"), []byte("barf")},
} }
up := NewMemCachedStore(s)
for _, v := range kvs { for _, v := range kvs {
require.NoError(t, s.Put(v.Key, v.Value)) require.NoError(t, up.Put(v.Key, v.Value))
}
_, err := up.PersistSync()
require.NoError(t, err)
return kvs
} }
func testStoreSeek(t *testing.T, s Store) {
kvs := pushSeekDataSet(t, s)
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool, cont func(k, v []byte) bool) { check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool, cont func(k, v []byte) bool) {
// Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way. // Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way.
cmpFunc := func(i, j int) bool { cmpFunc := func(i, j int) bool {
@ -209,43 +205,8 @@ func testStoreSeek(t *testing.T, s Store) {
}) })
} }
func testStoreDeleteNonExistent(t *testing.T, s Store) {
key := []byte("sparse")
assert.NoError(t, s.Delete(key))
}
func testStorePutAndDelete(t *testing.T, s Store) {
key := []byte("foo")
value := []byte("bar")
require.NoError(t, s.Put(key, value))
err := s.Delete(key)
assert.Nil(t, err)
_, err = s.Get(key)
assert.NotNil(t, err)
assert.Equal(t, err, ErrKeyNotFound)
// Double delete.
err = s.Delete(key)
assert.Nil(t, err)
}
func testStoreSeekGC(t *testing.T, s Store) { func testStoreSeekGC(t *testing.T, s Store) {
kvs := []KeyValue{ kvs := pushSeekDataSet(t, s)
{[]byte("10"), []byte("bar")},
{[]byte("11"), []byte("bara")},
{[]byte("20"), []byte("barb")},
{[]byte("21"), []byte("barc")},
{[]byte("22"), []byte("bard")},
{[]byte("30"), []byte("bare")},
{[]byte("31"), []byte("barf")},
}
for _, v := range kvs {
require.NoError(t, s.Put(v.Key, v.Value))
}
err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) bool { err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) bool {
return true return true
}) })
@ -275,9 +236,7 @@ func TestAllDBs(t *testing.T) {
{"MemCached", newMemCachedStoreForTesting}, {"MemCached", newMemCachedStoreForTesting},
{"Memory", newMemoryStoreForTesting}, {"Memory", newMemoryStoreForTesting},
} }
var tests = []dbTestFunction{testStorePutAndGet, var tests = []dbTestFunction{testStoreGetNonExistent, testStoreSeek,
testStoreGetNonExistent, testStoreSeek,
testStoreDeleteNonExistent, testStorePutAndDelete,
testStoreSeekGC} testStoreSeekGC}
for _, db := range DBs { for _, db := range DBs {
for _, test := range tests { for _, test := range tests {