neo-go/pkg/core/storage/memcached_store.go
Roman Khimov 5fbe838fd4 core: add and use synchronous persist to avoid OOM
b9be892bf9 made Persist asynchronous, which is very effective
in allowing the system to continue processing blocks/transactions while
flushing things to disk. At the same time it is very dangerous: if the disk is
slow and flushing a KV set takes longer than the persisting interval, there
may be an even bigger new KV set in MemCachedStore by the time the flush
finishes. Even if the system immediately starts to flush this new data set,
it (being bigger) can take more time than the previous one, and while it is
being flushed yet another data set accumulates in memory, potentially bigger
still.

So the system can easily go out of control, consuming more and more memory and
taking more and more time to persist a single set of data. To avoid this we
need to detect such a condition and simply wait for Persist to really finish
its job and release the resources.
2021-11-22 10:41:40 +03:00
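A rough caller-side sketch of that policy (hypothetical: the loop, ticker and
channel names are illustrative and not the actual blockchain code; it only
assumes the Persist/PersistSync methods defined below plus stdlib context and
time):

func persistLoop(ctx context.Context, store *storage.MemCachedStore, interval time.Duration) {
	prevDone := make(chan struct{})
	close(prevDone) // nothing is in flight initially

	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
		}
		select {
		case <-prevDone:
			// The previous flush finished in time, persist in the background.
			done := make(chan struct{})
			prevDone = done
			go func() {
				_, _ = store.Persist()
				close(done)
			}()
		default:
			// The disk can't keep up: block right here and flush synchronously,
			// so the in-memory changeset can't keep growing without bound.
			_, _ = store.PersistSync()
		}
	}
}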


package storage

import (
	"bytes"
	"context"
	"sort"
	"strings"
	"sync"

	"github.com/nspcc-dev/neo-go/pkg/util/slice"
)

// MemCachedStore is a wrapper around persistent store that caches all changes
// being made for them to be later flushed in one batch.
type MemCachedStore struct {
	MemoryStore

	// plock protects Persist from double entrance.
	plock sync.Mutex
	// Persistent Store.
	ps Store
}

type (
	// KeyValue represents key-value pair.
	KeyValue struct {
		Key   []byte
		Value []byte
	}

	// KeyValueExists represents key-value pair with indicator whether the item
	// exists in the persistent storage.
	KeyValueExists struct {
		KeyValue

		Exists bool
	}

	// MemBatch represents a changeset to be persisted.
	MemBatch struct {
		Put     []KeyValueExists
		Deleted []KeyValueExists
	}
)

// NewMemCachedStore creates a new MemCachedStore object.
func NewMemCachedStore(lower Store) *MemCachedStore {
	return &MemCachedStore{
		MemoryStore: *NewMemoryStore(),
		ps:          lower,
	}
}

// Get implements the Store interface.
func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
	s.mut.RLock()
	defer s.mut.RUnlock()
	k := string(key)
	if val, ok := s.mem[k]; ok {
		return val, nil
	}
	if _, ok := s.del[k]; ok {
		return nil, ErrKeyNotFound
	}
	return s.ps.Get(key)
}

// GetBatch returns currently accumulated changeset.
func (s *MemCachedStore) GetBatch() *MemBatch {
	s.mut.RLock()
	defer s.mut.RUnlock()

	var b MemBatch

	b.Put = make([]KeyValueExists, 0, len(s.mem))
	for k, v := range s.mem {
		key := []byte(k)
		_, err := s.ps.Get(key)
		b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil})
	}

	b.Deleted = make([]KeyValueExists, 0, len(s.del))
	for k := range s.del {
		key := []byte(k)
		_, err := s.ps.Get(key)
		b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil})
	}

	return &b
}

// Seek implements the Store interface.
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
	s.seek(context.Background(), key, false, f)
}

// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
// value slices may not be copied and may be modified. SeekAsync can guarantee
// that key-value items are sorted by key in ascending way.
func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue {
	res := make(chan KeyValue)
	go func() {
		s.seek(ctx, key, cutPrefix, func(k, v []byte) {
			res <- KeyValue{
				Key:   k,
				Value: v,
			}
		})
		close(res)
	}()

	return res
}

func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f func(k, v []byte)) {
	// Create memory store `mem` and `del` snapshot not to hold the lock.
	var memRes []KeyValueExists
	sk := string(key)
	s.mut.RLock()
	for k, v := range s.MemoryStore.mem {
		if strings.HasPrefix(k, sk) {
			memRes = append(memRes, KeyValueExists{
				KeyValue: KeyValue{
					Key:   []byte(k),
					Value: v,
				},
				Exists: true,
			})
		}
	}
	for k := range s.MemoryStore.del {
		if strings.HasPrefix(k, sk) {
			memRes = append(memRes, KeyValueExists{
				KeyValue: KeyValue{
					Key: []byte(k),
				},
			})
		}
	}
	ps := s.ps
	s.mut.RUnlock()
	// Sort memRes items for further comparison with ps items.
	sort.Slice(memRes, func(i, j int) bool {
		return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0
	})

	var (
		done    bool
		iMem    int
		kvMem   KeyValueExists
		haveMem bool
	)
	if iMem < len(memRes) {
		kvMem = memRes[iMem]
		haveMem = true
		iMem++
	}
	// Merge results of seek operations in ascending order.
	ps.Seek(key, func(k, v []byte) {
		if done {
			return
		}
		kvPs := KeyValue{
			Key:   slice.Copy(k),
			Value: slice.Copy(v),
		}
	loop:
		for {
			select {
			case <-ctx.Done():
				done = true
				break loop
			default:
				var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0)
				if isMem {
					if kvMem.Exists {
						if cutPrefix {
							kvMem.Key = kvMem.Key[len(key):]
						}
						f(kvMem.Key, kvMem.Value)
					}
					if iMem < len(memRes) {
						kvMem = memRes[iMem]
						haveMem = true
						iMem++
					} else {
						haveMem = false
					}
				} else {
					if !bytes.Equal(kvMem.Key, kvPs.Key) {
						if cutPrefix {
							kvPs.Key = kvPs.Key[len(key):]
						}
						f(kvPs.Key, kvPs.Value)
					}
					break loop
				}
			}
		}
	})
	if !done && haveMem {
	loop:
		for i := iMem - 1; i < len(memRes); i++ {
			select {
			case <-ctx.Done():
				break loop
			default:
				kvMem = memRes[i]
				if kvMem.Exists {
					if cutPrefix {
						kvMem.Key = kvMem.Key[len(key):]
					}
					f(kvMem.Key, kvMem.Value)
				}
			}
		}
	}
}

// Persist flushes all the MemoryStore contents into the (supposedly) persistent
// store ps. MemCachedStore remains accessible for the most part of this action
// (any new changes will be cached in memory).
func (s *MemCachedStore) Persist() (int, error) {
	return s.persist(false)
}

// PersistSync flushes all the MemoryStore contents into the (supposedly) persistent
// store ps. It's different from Persist in that it blocks MemCachedStore completely
// while flushing things from memory to persistent store.
func (s *MemCachedStore) PersistSync() (int, error) {
	return s.persist(true)
}

func (s *MemCachedStore) persist(isSync bool) (int, error) {
	var err error
	var keys, dkeys int

	s.plock.Lock()
	defer s.plock.Unlock()
	s.mut.Lock()

	keys = len(s.mem)
	dkeys = len(s.del)
	if keys == 0 && dkeys == 0 {
		s.mut.Unlock()
		return 0, nil
	}

	// tempstore technically copies current s in lower layer while real s
	// starts using fresh new maps. This tempstore is only known here and
	// nothing ever changes it, therefore accesses to it (reads) can go
	// unprotected while writes are handled by s proper.
	var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps}
	s.ps = tempstore
	s.mem = make(map[string][]byte)
	s.del = make(map[string]bool)
	if !isSync {
		s.mut.Unlock()
	}

	err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del)

	if !isSync {
		s.mut.Lock()
	}
	if err == nil {
		// tempstore.mem and tempstore.del are completely flushed now
		// to tempstore.ps, so all KV pairs are the same and this
		// substitution has no visible effects.
		s.ps = tempstore.ps
	} else {
		// We're toast. We'll try to still keep proper state, but OOM
		// killer will get to us eventually.
		for k := range s.mem {
			tempstore.put(k, s.mem[k])
		}
		for k := range s.del {
			tempstore.drop(k)
		}
		s.ps = tempstore.ps
		s.mem = tempstore.mem
		s.del = tempstore.del
	}
	s.mut.Unlock()
	return keys, err
}

// Close implements Store interface, clears up memory and closes the lower layer
// Store.
func (s *MemCachedStore) Close() error {
	// It's always successful.
	_ = s.MemoryStore.Close()
	return s.ps.Close()
}