Merge pull request #2360 from nspcc-dev/gc-optimizations

GC optimizations
Roman Khimov 2022-02-14 17:47:51 +03:00 committed by GitHub
commit 8216500fd7
9 changed files with 179 additions and 103 deletions


@@ -44,7 +44,7 @@ import (
// Tuning parameters.
const (
headerBatchCount = 2000
version = "0.2.2"
version = "0.2.3"
defaultInitialGAS = 52000000_00000000
defaultGCPeriod = 10000
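
defaultGCPeriod is a new tuning constant: the number of blocks between garbage-collection passes over old MPT data. How it gets wired into block processing is outside this diff; the sketch below only illustrates height-based gating with hypothetical names.

    // Illustrative helper (not part of this commit): run a collection pass
    // once every gcPeriod blocks.
    func maybeRunGC(height, gcPeriod uint32, collect func(index uint32)) {
        if gcPeriod != 0 && height%gcPeriod == 0 {
            collect(height)
        }
    }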


@@ -118,7 +118,7 @@ func (s *Module) CurrentValidatedHeight() uint32 {
// Init initializes state root module at the given height.
func (s *Module) Init(height uint32) error {
data, err := s.Store.Get([]byte{byte(storage.DataMPT), prefixValidated})
data, err := s.Store.Get([]byte{byte(storage.DataMPTAux), prefixValidated})
if err == nil {
s.validatedHeight.Store(binary.LittleEndian.Uint32(data))
}
@@ -156,16 +156,6 @@ func (s *Module) CleanStorage() error {
if err != nil {
return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err)
}
currentLocal := s.currentLocal.Load().(util.Uint256)
if !currentLocal.Equals(util.Uint256{}) {
err := s.addLocalStateRoot(s.Store, &state.MPTRoot{
Index: s.localHeight.Load(),
Root: currentLocal,
})
if err != nil {
return fmt.Errorf("failed to store current local stateroot: %w", err)
}
}
return nil
}
@@ -177,7 +167,7 @@ func (s *Module) JumpToState(sr *state.MPTRoot) error {
data := make([]byte, 4)
binary.LittleEndian.PutUint32(data, sr.Index)
if err := s.Store.Put([]byte{byte(storage.DataMPT), prefixValidated}, data); err != nil {
if err := s.Store.Put([]byte{byte(storage.DataMPTAux), prefixValidated}, data); err != nil {
return fmt.Errorf("failed to store validated height: %w", err)
}
s.validatedHeight.Store(sr.Index)
@@ -197,30 +187,27 @@ func (s *Module) GC(index uint32, store storage.Store) time.Duration {
var stored int64
s.log.Info("starting MPT garbage collection", zap.Uint32("index", index))
start := time.Now()
b := store.Batch()
store.Seek(storage.SeekRange{
err := store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(storage.DataMPT)},
}, func(k, v []byte) bool {
stored++
if !mpt.IsActiveValue(v) {
h := binary.LittleEndian.Uint32(v[len(v)-4:])
if h > index {
return true
}
b.Delete(k)
if h <= index {
removed++
stored--
return false
}
}
return true
})
err := store.PutBatch(b)
dur := time.Since(start)
if err != nil {
s.log.Error("failed to flush MPT GC changeset", zap.Duration("time", dur), zap.Error(err))
} else {
s.log.Info("finished MPT garbage collection",
zap.Int("removed", removed),
zap.Int64("stored", stored),
zap.Int64("kept", stored),
zap.Duration("time", dur))
}
return dur
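
The GC pass above now streams over the DataMPT prefix with SeekGC and decides per key whether to keep it, instead of collecting deletions into a store.Batch and flushing it with PutBatch. The keep decision, restated compactly for readability (same logic as the callback above; assumes the module's existing mpt and encoding/binary imports, and the value layout where inactive nodes carry their last-active height in the trailing 4 bytes):

    // keepMPTNode mirrors the SeekGC callback: active nodes always survive,
    // inactive ones survive only if they fell out of use after the GC horizon.
    func keepMPTNode(v []byte, index uint32) bool {
        if mpt.IsActiveValue(v) {
            return true
        }
        h := binary.LittleEndian.Uint32(v[len(v)-4:]) // last-active height
        return h > index
    }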


@@ -29,7 +29,7 @@ func (s *Module) addLocalStateRoot(store *storage.MemCachedStore, sr *state.MPTR
data := make([]byte, 4)
binary.LittleEndian.PutUint32(data, sr.Index)
return store.Put([]byte{byte(storage.DataMPT), prefixLocal}, data)
return store.Put([]byte{byte(storage.DataMPTAux), prefixLocal}, data)
}
func putStateRoot(store *storage.MemCachedStore, key []byte, sr *state.MPTRoot) error {
@@ -52,7 +52,7 @@ func (s *Module) getStateRoot(key []byte) (*state.MPTRoot, error) {
func makeStateRootKey(index uint32) []byte {
key := make([]byte, 5)
key[0] = byte(storage.DataMPT)
key[0] = byte(storage.DataMPTAux)
binary.BigEndian.PutUint32(key, index)
return key
}
@@ -79,7 +79,7 @@ func (s *Module) AddStateRoot(sr *state.MPTRoot) error {
data := make([]byte, 4)
binary.LittleEndian.PutUint32(data, sr.Index)
if err := s.Store.Put([]byte{byte(storage.DataMPT), prefixValidated}, data); err != nil {
if err := s.Store.Put([]byte{byte(storage.DataMPTAux), prefixValidated}, data); err != nil {
return err
}
s.validatedHeight.Store(sr.Index)
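
The reason for splitting DataMPTAux (0x04) out of DataMPT (0x03) is that these auxiliary height and root records no longer sit inside the key range that MPT garbage collection sweeps, which is presumably also why CleanStorage above no longer needs to re-save the current local state root after wiping DataMPT. The resulting key layout, as far as this diff shows it:

    // DataMPT    (0x03) + MPT node hash   -> node data, subject to GC
    // DataMPTAux (0x04) + prefixLocal     -> local stateroot height (uint32, little-endian)
    // DataMPTAux (0x04) + prefixValidated -> validated stateroot height (uint32, little-endian)
    // makeStateRootKey(index)             -> stored stateroot for that index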


@@ -7,7 +7,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"github.com/syndtr/goleveldb/leveldb/util"
"go.etcd.io/bbolt"
)
@@ -92,7 +91,7 @@ func (s *BoltDBStore) PutBatch(batch Batch) error {
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error {
var err error
return s.db.Batch(func(tx *bbolt.Tx) error {
return s.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(Bucket)
for k, v := range puts {
if v != nil {
@@ -108,55 +107,63 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error {
})
}
// SeekGC implements the Store interface.
func (s *BoltDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
return boltSeek(s.db.Update, rng, func(c *bbolt.Cursor, k, v []byte) (bool, error) {
if !keep(k, v) {
if err := c.Delete(); err != nil {
return false, err
}
}
return true, nil
})
}
// Seek implements the Store interface.
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
start := make([]byte, len(rng.Prefix)+len(rng.Start))
copy(start, rng.Prefix)
copy(start[len(rng.Prefix):], rng.Start)
if rng.Backwards {
s.seekBackwards(rng.Prefix, start, f)
err := boltSeek(s.db.View, rng, func(_ *bbolt.Cursor, k, v []byte) (bool, error) {
return f(k, v), nil
})
if err != nil {
panic(err)
}
}
func boltSeek(txopener func(func(*bbolt.Tx) error) error, rng SeekRange, f func(c *bbolt.Cursor, k, v []byte) (bool, error)) error {
rang := seekRangeToPrefixes(rng)
return txopener(func(tx *bbolt.Tx) error {
var (
k, v []byte
next func() ([]byte, []byte)
)
c := tx.Bucket(Bucket).Cursor()
if !rng.Backwards {
k, v = c.Seek(rang.Start)
next = c.Next
} else {
s.seek(rng.Prefix, start, f)
}
}
func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) {
prefix := util.BytesPrefix(key)
prefix.Start = start
err := s.db.View(func(tx *bbolt.Tx) error {
c := tx.Bucket(Bucket).Cursor()
for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() {
if !f(k, v) {
break
}
}
return nil
})
if err != nil {
panic(err)
}
}
func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) {
err := s.db.View(func(tx *bbolt.Tx) error {
c := tx.Bucket(Bucket).Cursor()
// Move cursor to the first kv pair which is followed by the pair matching the specified prefix.
if len(start) == 0 {
if len(rang.Limit) == 0 {
lastKey, _ := c.Last()
start = lastKey
k, v = c.Seek(lastKey)
} else {
c.Seek(rang.Limit)
k, v = c.Prev()
}
rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit
c.Seek(rng.Limit)
for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() {
if !f(k, v) {
next = c.Prev
}
for ; k != nil && bytes.HasPrefix(k, rng.Prefix) && (len(rang.Limit) == 0 || bytes.Compare(k, rang.Limit) <= 0); k, v = next() {
cont, err := f(c, k, v)
if err != nil {
return err
}
if !cont {
break
}
}
return nil
})
if err != nil {
panic(err)
}
}
// Batch implements the Batch interface and returns a boltdb
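
BoltDBStore.SeekGC deletes through the cursor inside a single read-write transaction (and PutChangeSet moves from db.Batch to db.Update, trading cross-goroutine batching for a plain self-contained transaction). A standalone sketch of the same cursor-delete pattern against bbolt, with an arbitrary bucket name (illustrative, not this package's code):

    package gcsketch

    import "go.etcd.io/bbolt"

    // gcBucket walks every key in the bucket and deletes the ones keep rejects,
    // all inside one read-write transaction, mirroring BoltDBStore.SeekGC.
    func gcBucket(db *bbolt.DB, bucket []byte, keep func(k, v []byte) bool) error {
        return db.Update(func(tx *bbolt.Tx) error {
            c := tx.Bucket(bucket).Cursor()
            for k, v := c.First(); k != nil; k, v = c.Next() {
                if !keep(k, v) {
                    if err := c.Delete(); err != nil {
                        return err
                    }
                }
            }
            return nil
        })
    }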


@@ -3,8 +3,8 @@ package storage
import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
// LevelDBOptions configuration for LevelDB.
@@ -83,34 +83,47 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error {
// Seek implements the Store interface.
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
start := make([]byte, len(rng.Prefix)+len(rng.Start))
copy(start, rng.Prefix)
copy(start[len(rng.Prefix):], rng.Start)
if rng.Backwards {
s.seekBackwards(rng.Prefix, start, f)
iter := s.db.NewIterator(seekRangeToPrefixes(rng), nil)
s.seek(iter, rng.Backwards, f)
}
// SeekGC implements the Store interface.
func (s *LevelDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
tx, err := s.db.OpenTransaction()
if err != nil {
return err
}
iter := tx.NewIterator(seekRangeToPrefixes(rng), nil)
s.seek(iter, rng.Backwards, func(k, v []byte) bool {
if !keep(k, v) {
err = tx.Delete(k, nil)
if err != nil {
return false
}
}
return true
})
if err != nil {
return err
}
return tx.Commit()
}
func (s *LevelDBStore) seek(iter iterator.Iterator, backwards bool, f func(k, v []byte) bool) {
var (
next func() bool
ok bool
)
if !backwards {
ok = iter.Next()
next = iter.Next
} else {
s.seek(rng.Prefix, start, f)
}
ok = iter.Last()
next = iter.Prev
}
func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) {
prefix := util.BytesPrefix(key)
prefix.Start = start
iter := s.db.NewIterator(prefix, nil)
for iter.Next() {
if !f(iter.Key(), iter.Value()) {
break
}
}
iter.Release()
}
func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) {
iRange := util.BytesPrefix(start)
iRange.Start = key
iter := s.db.NewIterator(iRange, nil)
for ok := iter.Last(); ok; ok = iter.Prev() {
for ; ok; ok = next() {
if !f(iter.Key(), iter.Value()) {
break
}
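
LevelDBStore.SeekGC runs the keep callback over a goleveldb transaction and commits it at the end, so the whole sweep lands (or fails) atomically. A standalone sketch of that transaction pattern for a single prefix (illustrative, not this package's code):

    package gcsketch

    import (
        "github.com/syndtr/goleveldb/leveldb"
        "github.com/syndtr/goleveldb/leveldb/util"
    )

    // gcPrefix deletes every key under prefix that keep rejects, inside one
    // leveldb transaction, mirroring LevelDBStore.SeekGC.
    func gcPrefix(db *leveldb.DB, prefix []byte, keep func(k, v []byte) bool) error {
        tx, err := db.OpenTransaction()
        if err != nil {
            return err
        }
        iter := tx.NewIterator(util.BytesPrefix(prefix), nil)
        for iter.Next() {
            if !keep(iter.Key(), iter.Value()) {
                if err := tx.Delete(iter.Key(), nil); err != nil {
                    iter.Release()
                    tx.Discard()
                    return err
                }
            }
        }
        iter.Release()
        return tx.Commit()
    }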


@@ -293,6 +293,9 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte) error {
}
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
}
func (b *BadStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
return nil
}
func (b *BadStore) Close() error {
return nil
}


@@ -102,6 +102,21 @@ func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.mut.RUnlock()
}
// SeekGC implements the Store interface.
func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
s.mut.Lock()
// We still need to perform normal seek, some GC operations can be
// sensitive to the order of KV pairs.
s.seek(rng, func(k, v []byte) bool {
if !keep(k, v) {
s.drop(string(k))
}
return true
})
s.mut.Unlock()
return nil
}
// SeekAll is like seek but also iterates over deleted items.
func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
s.mut.RLock()


@@ -4,12 +4,18 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/syndtr/goleveldb/leveldb/util"
)
// KeyPrefix constants.
const (
DataExecutable KeyPrefix = 0x01
// DataMPT is used for MPT node entries identified by Uint256.
DataMPT KeyPrefix = 0x03
// DataMPTAux is used to store additional MPT data like height-root
// mappings and local/validated heights.
DataMPTAux KeyPrefix = 0x04
STAccount KeyPrefix = 0x40
STContractID KeyPrefix = 0x51
STStorage KeyPrefix = 0x70
@@ -94,6 +100,11 @@ type (
// Key and value slices should not be modified.
// Seek can guarantee that key-value items are sorted by key in ascending way.
Seek(rng SeekRange, f func(k, v []byte) bool)
// SeekGC is similar to Seek, but the function should return true if current
// KV pair should be kept and false if it's to be deleted; there is no way to
// do an early exit here. SeekGC only works with the current Store, it won't
// go down to layers below and it takes a full write lock, so use it carefully.
SeekGC(rng SeekRange, keep func(k, v []byte) bool) error
Close() error
}
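
From a caller's perspective the SeekGC contract is: return true from keep to retain the pair, false to delete it, and expect every pair in the range to be visited. A minimal usage sketch, assuming it sits in this storage package next to the interface (the prefix and values are arbitrary; compare testStoreSeekGC below):

    // seekGCExample drops everything under prefix 0x03 that has an empty value
    // and keeps the rest. Illustrative only.
    func seekGCExample(st Store) error {
        if err := st.Put([]byte{0x03, 0x01}, []byte{}); err != nil {
            return err
        }
        return st.SeekGC(SeekRange{Prefix: []byte{0x03}}, func(k, v []byte) bool {
            return len(v) != 0 // false means "delete this pair"
        })
    }
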
@@ -133,6 +144,24 @@ func AppendPrefixInt(k KeyPrefix, n int) []byte {
return AppendPrefix(k, b)
}
func seekRangeToPrefixes(sr SeekRange) *util.Range {
var (
rang *util.Range
start = make([]byte, len(sr.Prefix)+len(sr.Start))
)
copy(start, sr.Prefix)
copy(start[len(sr.Prefix):], sr.Start)
if !sr.Backwards {
rang = util.BytesPrefix(sr.Prefix)
rang.Start = start
} else {
rang = util.BytesPrefix(start)
rang.Start = sr.Prefix
}
return rang
}
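
seekRangeToPrefixes converts a SeekRange into the goleveldb util.Range shared by the BoltDB and LevelDB backends: forward seeks start at Prefix+Start and are limited by the prefix, backward seeks start at the prefix and are limited just past Prefix+Start. A worked example (the limits follow from util.BytesPrefix, which bumps the last byte of its argument):

    func seekRangePrefixExample() {
        fwd := seekRangeToPrefixes(SeekRange{Prefix: []byte{0xAA}, Start: []byte{0x05}})
        // fwd: Start = {0xAA, 0x05}, Limit = {0xAB} -> rest of the prefix from Start on
        bwd := seekRangeToPrefixes(SeekRange{Prefix: []byte{0xAA}, Start: []byte{0x05}, Backwards: true})
        // bwd: Start = {0xAA}, Limit = {0xAA, 0x06} -> prefix up to and including Start
        _, _ = fwd, bwd
    }
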
// NewStore creates storage with preselected in configuration database type.
func NewStore(cfg DBConfiguration) (Store, error) {
var store Store


@@ -19,10 +19,6 @@ type dbSetup struct {
type dbTestFunction func(*testing.T, Store)
func testStoreClose(t *testing.T, s Store) {
require.NoError(t, s.Close())
}
func testStorePutAndGet(t *testing.T, s Store) {
key := []byte("foo")
value := []byte("bar")
@@ -32,8 +28,6 @@ func testStorePutAndGet(t *testing.T, s Store) {
result, err := s.Get(key)
assert.Nil(t, err)
require.Equal(t, value, result)
require.NoError(t, s.Close())
}
func testStoreGetNonExistent(t *testing.T, s Store) {
@@ -41,7 +35,6 @@ func testStoreGetNonExistent(t *testing.T, s Store) {
_, err := s.Get(key)
assert.Equal(t, err, ErrKeyNotFound)
require.NoError(t, s.Close())
}
func testStorePutBatch(t *testing.T, s Store) {
@@ -63,7 +56,6 @@ func testStorePutBatch(t *testing.T, s Store) {
assert.Nil(t, err)
require.Equal(t, value, newVal)
assert.Equal(t, value, newVal)
require.NoError(t, s.Close())
}
func testStoreSeek(t *testing.T, s Store) {
@@ -338,15 +330,12 @@ func testStoreSeek(t *testing.T, s Store) {
})
})
})
require.NoError(t, s.Close())
}
func testStoreDeleteNonExistent(t *testing.T, s Store) {
key := []byte("sparse")
assert.NoError(t, s.Delete(key))
require.NoError(t, s.Close())
}
func testStorePutAndDelete(t *testing.T, s Store) {
@@ -365,8 +354,6 @@ func testStorePutAndDelete(t *testing.T, s Store) {
// Double delete.
err = s.Delete(key)
assert.Nil(t, err)
require.NoError(t, s.Close())
}
func testStorePutBatchWithDelete(t *testing.T, s Store) {
@@ -435,7 +422,41 @@ func testStorePutBatchWithDelete(t *testing.T, s Store) {
assert.Equal(t, ErrKeyNotFound, err, "%s:%s", k, v)
}
}
require.NoError(t, s.Close())
}
func testStoreSeekGC(t *testing.T, s Store) {
kvs := []KeyValue{
{[]byte("10"), []byte("bar")},
{[]byte("11"), []byte("bara")},
{[]byte("20"), []byte("barb")},
{[]byte("21"), []byte("barc")},
{[]byte("22"), []byte("bard")},
{[]byte("30"), []byte("bare")},
{[]byte("31"), []byte("barf")},
}
for _, v := range kvs {
require.NoError(t, s.Put(v.Key, v.Value))
}
err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) bool {
return true
})
require.NoError(t, err)
for i := range kvs {
_, err = s.Get(kvs[i].Key)
require.NoError(t, err)
}
err = s.SeekGC(SeekRange{Prefix: []byte("3")}, func(k, v []byte) bool {
return false
})
require.NoError(t, err)
for i := range kvs[:5] {
_, err = s.Get(kvs[i].Key)
require.NoError(t, err)
}
for _, kv := range kvs[5:] {
_, err = s.Get(kv.Key)
require.Error(t, err)
}
}
func TestAllDBs(t *testing.T) {
@@ -445,10 +466,10 @@ func TestAllDBs(t *testing.T) {
{"MemCached", newMemCachedStoreForTesting},
{"Memory", newMemoryStoreForTesting},
}
var tests = []dbTestFunction{testStoreClose, testStorePutAndGet,
var tests = []dbTestFunction{testStorePutAndGet,
testStoreGetNonExistent, testStorePutBatch, testStoreSeek,
testStoreDeleteNonExistent, testStorePutAndDelete,
testStorePutBatchWithDelete}
testStorePutBatchWithDelete, testStoreSeekGC}
for _, db := range DBs {
for _, test := range tests {
s := db.create(t)
@@ -457,6 +478,7 @@ }
}
fname := runtime.FuncForPC(reflect.ValueOf(test).Pointer()).Name()
t.Run(db.name+"/"+fname, twrapper)
require.NoError(t, s.Close())
}
}
}