forked from TrueCloudLab/neoneo-go
35bdfc5eca
Simple and dumb as it is, this allows separating contract storage from other things and dramatically improves Seek() time over storage (even though it's still unordered!), which in turn improves block processing speed.

| Version | Metric | LevelDB    | LevelDB (KeepOnlyLatest) | BoltDB     | BoltDB (KeepOnlyLatest) |
|---------|--------|------------|--------------------------|------------|-------------------------|
| Master  | real   | 16m27,936s | 10m9,440s                | 16m39,369s | 8m1,227s                |
| Master  | user   | 20m12,619s | 26m13,925s               | 18m9,162s  | 18m5,846s               |
| Master  | sys    | 2m56,377s  | 1m32,051s                | 9m52,576s  | 2m9,455s                |
| 2 maps  | real   | 10m49,495s | 8m53,342s                | 11m46,204s | 5m56,043s               |
| 2 maps  | user   | 14m19,922s | 24m6,225s                | 13m25,691s | 15m4,694s               |
| 2 maps  | sys    | 1m53,021s  | 1m23,006s                | 4m31,735s  | 2m8,714s                |

neo-bench performance is mostly unaffected: ~0.5% for the 1-1 test and 4% for the 10K-10K test, both of which fall within the regular test error range.
317 lines
7.7 KiB
Go
317 lines
7.7 KiB
Go
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
|
)
|
|
|
|
// MemCachedStore is a wrapper around persistent store that caches all changes
|
|
// being made for them to be later flushed in one batch.
|
|
type MemCachedStore struct {
|
|
MemoryStore
|
|
|
|
// plock protects Persist from double entrance.
|
|
plock sync.Mutex
|
|
// Persistent Store.
|
|
ps Store
|
|
}
|
|
|
|
// KeyValue is a single key-value pair.
type KeyValue struct {
	Key   []byte
	Value []byte
}

// KeyValueExists is a key-value pair together with a flag telling whether
// the item is present in the persistent storage.
type KeyValueExists struct {
	KeyValue

	Exists bool
}

// MemBatch is a changeset to be persisted: the items that were put and the
// items that were deleted.
type MemBatch struct {
	Put     []KeyValueExists
	Deleted []KeyValueExists
}
|
|
|
|
// NewMemCachedStore creates a new MemCachedStore object.
|
|
func NewMemCachedStore(lower Store) *MemCachedStore {
|
|
return &MemCachedStore{
|
|
MemoryStore: *NewMemoryStore(),
|
|
ps: lower,
|
|
}
|
|
}
|
|
|
|
// Get implements the Store interface.
|
|
func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
|
|
s.mut.RLock()
|
|
defer s.mut.RUnlock()
|
|
m := s.chooseMap(key)
|
|
if val, ok := m[string(key)]; ok {
|
|
if val == nil {
|
|
return nil, ErrKeyNotFound
|
|
}
|
|
return val, nil
|
|
}
|
|
return s.ps.Get(key)
|
|
}
|
|
|
|
// GetBatch returns currently accumulated changeset.
|
|
func (s *MemCachedStore) GetBatch() *MemBatch {
|
|
s.mut.RLock()
|
|
defer s.mut.RUnlock()
|
|
|
|
var b MemBatch
|
|
|
|
b.Put = make([]KeyValueExists, 0, len(s.mem)+len(s.stor))
|
|
b.Deleted = make([]KeyValueExists, 0)
|
|
for _, m := range []map[string][]byte{s.mem, s.stor} {
|
|
for k, v := range m {
|
|
key := []byte(k)
|
|
_, err := s.ps.Get(key)
|
|
if v == nil {
|
|
b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil})
|
|
} else {
|
|
b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil})
|
|
}
|
|
}
|
|
}
|
|
return &b
|
|
}
|
|
|
|
// Seek implements the Store interface.
|
|
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
|
s.seek(context.Background(), rng, false, f)
|
|
}
|
|
|
|
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
|
|
// value slices may not be copied and may be modified. SeekAsync can guarantee
|
|
// that key-value items are sorted by key in ascending way.
|
|
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
|
|
res := make(chan KeyValue)
|
|
go func() {
|
|
s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case res <- KeyValue{Key: k, Value: v}:
|
|
return true
|
|
}
|
|
})
|
|
close(res)
|
|
}()
|
|
|
|
return res
|
|
}
|
|
|
|
// seek is internal representations of Seek* capable of seeking for the given key
|
|
// and supporting early stop using provided context. `cutPrefix` denotes whether provided
|
|
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
|
|
// and point to start seeking from. Backwards seeking from some point is supported
|
|
// with corresponding `rng` field set.
|
|
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
|
|
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
|
var memRes []KeyValueExists
|
|
sPrefix := string(rng.Prefix)
|
|
lPrefix := len(sPrefix)
|
|
sStart := string(rng.Start)
|
|
lStart := len(sStart)
|
|
isKeyOK := func(key string) bool {
|
|
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
|
|
}
|
|
if rng.Backwards {
|
|
isKeyOK = func(key string) bool {
|
|
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
|
|
}
|
|
}
|
|
s.mut.RLock()
|
|
m := s.MemoryStore.chooseMap(rng.Prefix)
|
|
for k, v := range m {
|
|
if isKeyOK(k) {
|
|
memRes = append(memRes, KeyValueExists{
|
|
KeyValue: KeyValue{
|
|
Key: []byte(k),
|
|
Value: v,
|
|
},
|
|
Exists: v != nil,
|
|
})
|
|
}
|
|
}
|
|
ps := s.ps
|
|
s.mut.RUnlock()
|
|
|
|
less := func(k1, k2 []byte) bool {
|
|
res := bytes.Compare(k1, k2)
|
|
return res != 0 && rng.Backwards == (res > 0)
|
|
}
|
|
// Sort memRes items for further comparison with ps items.
|
|
sort.Slice(memRes, func(i, j int) bool {
|
|
return less(memRes[i].Key, memRes[j].Key)
|
|
})
|
|
|
|
var (
|
|
done bool
|
|
iMem int
|
|
kvMem KeyValueExists
|
|
haveMem bool
|
|
)
|
|
if iMem < len(memRes) {
|
|
kvMem = memRes[iMem]
|
|
haveMem = true
|
|
iMem++
|
|
}
|
|
// Merge results of seek operations in ascending order. It returns whether iterating
|
|
// should be continued.
|
|
mergeFunc := func(k, v []byte) bool {
|
|
if done {
|
|
return false
|
|
}
|
|
kvPs := KeyValue{
|
|
Key: slice.Copy(k),
|
|
Value: slice.Copy(v),
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
done = true
|
|
return false
|
|
default:
|
|
var isMem = haveMem && less(kvMem.Key, kvPs.Key)
|
|
if isMem {
|
|
if kvMem.Exists {
|
|
if cutPrefix {
|
|
kvMem.Key = kvMem.Key[lPrefix:]
|
|
}
|
|
if !f(kvMem.Key, kvMem.Value) {
|
|
done = true
|
|
return false
|
|
}
|
|
}
|
|
if iMem < len(memRes) {
|
|
kvMem = memRes[iMem]
|
|
haveMem = true
|
|
iMem++
|
|
} else {
|
|
haveMem = false
|
|
}
|
|
} else {
|
|
if !bytes.Equal(kvMem.Key, kvPs.Key) {
|
|
if cutPrefix {
|
|
kvPs.Key = kvPs.Key[lPrefix:]
|
|
}
|
|
if !f(kvPs.Key, kvPs.Value) {
|
|
done = true
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
ps.Seek(rng, mergeFunc)
|
|
|
|
if !done && haveMem {
|
|
loop:
|
|
for i := iMem - 1; i < len(memRes); i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
break loop
|
|
default:
|
|
kvMem = memRes[i]
|
|
if kvMem.Exists {
|
|
if cutPrefix {
|
|
kvMem.Key = kvMem.Key[lPrefix:]
|
|
}
|
|
if !f(kvMem.Key, kvMem.Value) {
|
|
break loop
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Persist flushes all the MemoryStore contents into the (supposedly) persistent
|
|
// store ps. MemCachedStore remains accessible for the most part of this action
|
|
// (any new changes will be cached in memory).
|
|
func (s *MemCachedStore) Persist() (int, error) {
|
|
return s.persist(false)
|
|
}
|
|
|
|
// PersistSync flushes all the MemoryStore contents into the (supposedly) persistent
|
|
// store ps. It's different from Persist in that it blocks MemCachedStore completely
|
|
// while flushing things from memory to persistent store.
|
|
func (s *MemCachedStore) PersistSync() (int, error) {
|
|
return s.persist(true)
|
|
}
|
|
|
|
func (s *MemCachedStore) persist(isSync bool) (int, error) {
|
|
var err error
|
|
var keys int
|
|
|
|
s.plock.Lock()
|
|
defer s.plock.Unlock()
|
|
s.mut.Lock()
|
|
|
|
keys = len(s.mem) + len(s.stor)
|
|
if keys == 0 {
|
|
s.mut.Unlock()
|
|
return 0, nil
|
|
}
|
|
|
|
// tempstore technically copies current s in lower layer while real s
|
|
// starts using fresh new maps. This tempstore is only known here and
|
|
// nothing ever changes it, therefore accesses to it (reads) can go
|
|
// unprotected while writes are handled by s proper.
|
|
var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, stor: s.stor}, ps: s.ps}
|
|
s.ps = tempstore
|
|
s.mem = make(map[string][]byte, len(s.mem))
|
|
s.stor = make(map[string][]byte, len(s.stor))
|
|
if !isSync {
|
|
s.mut.Unlock()
|
|
}
|
|
|
|
err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.stor)
|
|
|
|
if !isSync {
|
|
s.mut.Lock()
|
|
}
|
|
if err == nil {
|
|
// tempstore.mem and tempstore.del are completely flushed now
|
|
// to tempstore.ps, so all KV pairs are the same and this
|
|
// substitution has no visible effects.
|
|
s.ps = tempstore.ps
|
|
} else {
|
|
// We're toast. We'll try to still keep proper state, but OOM
|
|
// killer will get to us eventually.
|
|
for k := range s.mem {
|
|
put(tempstore.mem, k, s.mem[k])
|
|
}
|
|
for k := range s.stor {
|
|
put(tempstore.stor, k, s.stor[k])
|
|
}
|
|
s.ps = tempstore.ps
|
|
s.mem = tempstore.mem
|
|
s.stor = tempstore.stor
|
|
}
|
|
s.mut.Unlock()
|
|
return keys, err
|
|
}
|
|
|
|
// Close implements Store interface, clears up memory and closes the lower layer
|
|
// Store.
|
|
func (s *MemCachedStore) Close() error {
|
|
// It's always successful.
|
|
_ = s.MemoryStore.Close()
|
|
return s.ps.Close()
|
|
}
|