forked from TrueCloudLab/neoneo-go
core: optimize (*MemCachedStore).Seek operation
Real persistent storage guarantees that result of Seek is sorted by keys. The idea of optimisation is to merge two sorted seek results into one (memStore+persistentStore), so that (*MemCachedStore).Seek will return sorted list. The only thing that remains is to sort items got from (*MemoryStore).Seek.
This commit is contained in:
parent
191cc45032
commit
7ba88e98e2
3 changed files with 145 additions and 13 deletions
|
@ -1,6 +1,13 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"bytes"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
|
)
|
||||||
|
|
||||||
// MemCachedStore is a wrapper around persistent store that caches all changes
|
// MemCachedStore is a wrapper around persistent store that caches all changes
|
||||||
// being made for them to be later flushed in one batch.
|
// being made for them to be later flushed in one batch.
|
||||||
|
@ -83,21 +90,93 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
|
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
|
||||||
|
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
||||||
|
memRes := make([]KeyValueExists, 0)
|
||||||
|
sk := string(key)
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
defer s.mut.RUnlock()
|
for k, v := range s.MemoryStore.mem {
|
||||||
s.MemoryStore.seek(key, f)
|
if strings.HasPrefix(k, sk) {
|
||||||
s.ps.Seek(key, func(k, v []byte) {
|
memRes = append(memRes, KeyValueExists{
|
||||||
elem := string(k)
|
KeyValue: KeyValue{
|
||||||
// If it's in mem, we already called f() for it in MemoryStore.Seek().
|
Key: []byte(k),
|
||||||
_, present := s.mem[elem]
|
Value: slice.Copy(v),
|
||||||
if !present {
|
},
|
||||||
// If it's in del, we shouldn't be calling f() anyway.
|
Exists: true,
|
||||||
_, present = s.del[elem]
|
})
|
||||||
}
|
}
|
||||||
if !present {
|
}
|
||||||
f(k, v)
|
for k := range s.MemoryStore.del {
|
||||||
|
if strings.HasPrefix(k, sk) {
|
||||||
|
memRes = append(memRes, KeyValueExists{
|
||||||
|
KeyValue: KeyValue{
|
||||||
|
Key: []byte(k),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.mut.RUnlock()
|
||||||
|
// Sort memRes items for further comparison with ps items.
|
||||||
|
sort.Slice(memRes, func(i, j int) bool {
|
||||||
|
return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0
|
||||||
|
})
|
||||||
|
|
||||||
|
var (
|
||||||
|
data1 = make(chan KeyValueExists)
|
||||||
|
data2 = make(chan KeyValue)
|
||||||
|
seekres = make(chan KeyValue)
|
||||||
|
)
|
||||||
|
// Seek over memory store.
|
||||||
|
go func() {
|
||||||
|
for _, kv := range memRes {
|
||||||
|
data1 <- kv
|
||||||
|
}
|
||||||
|
close(data1)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Seek over persistent store.
|
||||||
|
go func() {
|
||||||
|
s.mut.RLock()
|
||||||
|
s.ps.Seek(key, func(k, v []byte) {
|
||||||
|
// Must copy here, #1468.
|
||||||
|
data2 <- KeyValue{
|
||||||
|
Key: slice.Copy(k),
|
||||||
|
Value: slice.Copy(v),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
s.mut.RUnlock()
|
||||||
|
close(data2)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Merge results of seek operations in ascending order.
|
||||||
|
go func() {
|
||||||
|
kvMem, haveMem := <-data1
|
||||||
|
kvPs, havePs := <-data2
|
||||||
|
for {
|
||||||
|
if !haveMem && !havePs {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var isMem = haveMem && (!havePs || (bytes.Compare(kvMem.Key, kvPs.Key) < 0))
|
||||||
|
if isMem {
|
||||||
|
if kvMem.Exists {
|
||||||
|
seekres <- KeyValue{
|
||||||
|
Key: kvMem.Key,
|
||||||
|
Value: kvMem.Value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
kvMem, haveMem = <-data1
|
||||||
|
} else {
|
||||||
|
if !bytes.Equal(kvMem.Key, kvPs.Key) {
|
||||||
|
seekres <- kvPs
|
||||||
|
}
|
||||||
|
kvPs, havePs = <-data2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(seekres)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for r := range seekres {
|
||||||
|
f(r.Key, r.Value)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persist flushes all the MemoryStore contents into the (supposedly) persistent
|
// Persist flushes all the MemoryStore contents into the (supposedly) persistent
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/internal/random"
|
"github.com/nspcc-dev/neo-go/internal/random"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -319,3 +322,52 @@ func TestMemCachedPersistFailing(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, b1, res)
|
require.Equal(t, b1, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCachedSeekSorting(t *testing.T) {
|
||||||
|
var (
|
||||||
|
// Given this prefix...
|
||||||
|
goodPrefix = []byte{1}
|
||||||
|
// these pairs should be found...
|
||||||
|
lowerKVs = []kvSeen{
|
||||||
|
{[]byte{1, 2, 3}, []byte("bra"), false},
|
||||||
|
{[]byte{1, 2, 5}, []byte("bar"), false},
|
||||||
|
{[]byte{1, 3, 3}, []byte("bra"), false},
|
||||||
|
{[]byte{1, 3, 5}, []byte("bra"), false},
|
||||||
|
}
|
||||||
|
// and these should be not.
|
||||||
|
deletedKVs = []kvSeen{
|
||||||
|
{[]byte{1, 7, 3}, []byte("pow"), false},
|
||||||
|
{[]byte{1, 7, 4}, []byte("qaz"), false},
|
||||||
|
}
|
||||||
|
// and these should be not.
|
||||||
|
updatedKVs = []kvSeen{
|
||||||
|
{[]byte{1, 2, 4}, []byte("zaq"), false},
|
||||||
|
{[]byte{1, 2, 6}, []byte("zaq"), false},
|
||||||
|
{[]byte{1, 3, 2}, []byte("wop"), false},
|
||||||
|
{[]byte{1, 3, 4}, []byte("zaq"), false},
|
||||||
|
}
|
||||||
|
ps = NewMemoryStore()
|
||||||
|
ts = NewMemCachedStore(ps)
|
||||||
|
)
|
||||||
|
for _, v := range lowerKVs {
|
||||||
|
require.NoError(t, ps.Put(v.key, v.val))
|
||||||
|
}
|
||||||
|
for _, v := range deletedKVs {
|
||||||
|
require.NoError(t, ps.Put(v.key, v.val))
|
||||||
|
require.NoError(t, ts.Delete(v.key))
|
||||||
|
}
|
||||||
|
for _, v := range updatedKVs {
|
||||||
|
require.NoError(t, ps.Put(v.key, []byte("stub")))
|
||||||
|
require.NoError(t, ts.Put(v.key, v.val))
|
||||||
|
}
|
||||||
|
var foundKVs []kvSeen
|
||||||
|
ts.Seek(goodPrefix, func(k, v []byte) {
|
||||||
|
foundKVs = append(foundKVs, kvSeen{key: slice.Copy(k), val: slice.Copy(v)})
|
||||||
|
})
|
||||||
|
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||||
|
expected := append(lowerKVs, updatedKVs...)
|
||||||
|
sort.Slice(expected, func(i, j int) bool {
|
||||||
|
return bytes.Compare(expected[i].key, expected[j].key) < 0
|
||||||
|
})
|
||||||
|
require.Equal(t, expected, foundKVs)
|
||||||
|
}
|
||||||
|
|
|
@ -55,7 +55,8 @@ type (
|
||||||
// PutChangeSet allows to push prepared changeset to the Store.
|
// PutChangeSet allows to push prepared changeset to the Store.
|
||||||
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
|
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
|
||||||
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
||||||
// Key and value slices should not be modified.
|
// Key and value slices should not be modified. Seek can guarantee that key-value items are sorted by
|
||||||
|
// key in ascending way.
|
||||||
Seek(k []byte, f func(k, v []byte))
|
Seek(k []byte, f func(k, v []byte))
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue