storage: add SeekGC interface for GC

It's very special, single-purpose thing, but it improves cumulative time spent
in GC by ~10% for LevelDB and by ~36% for BoltDB during 1050K mainnet chain
processing. While the overall chain import time doesn't change in any
noticeable way (~1%), I think it's still worth it, for machines with slower
disks the difference might be more noticeable.
This commit is contained in:
Roman Khimov 2022-02-11 20:35:45 +03:00
parent 51b804ab0e
commit ad606101c7
7 changed files with 120 additions and 16 deletions

View file

@ -197,23 +197,20 @@ func (s *Module) GC(index uint32, store storage.Store) time.Duration {
var stored int64
s.log.Info("starting MPT garbage collection", zap.Uint32("index", index))
start := time.Now()
b := store.Batch()
store.Seek(storage.SeekRange{
err := store.SeekGC(storage.SeekRange{
Prefix: []byte{byte(storage.DataMPT)},
}, func(k, v []byte) bool {
stored++
if !mpt.IsActiveValue(v) {
h := binary.LittleEndian.Uint32(v[len(v)-4:])
if h > index {
return true
if h <= index {
removed++
stored--
return false
}
b.Delete(k)
removed++
stored--
}
return true
})
err := store.PutBatch(b)
dur := time.Since(start)
if err != nil {
s.log.Error("failed to flush MPT GC changeset", zap.Duration("time", dur), zap.Error(err))

View file

@ -107,10 +107,31 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error {
})
}
// SeekGC implements the Store interface.
func (s *BoltDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
return boltSeek(s.db.Update, rng, func(c *bbolt.Cursor, k, v []byte) (bool, error) {
if !keep(k, v) {
if err := c.Delete(); err != nil {
return false, err
}
}
return true, nil
})
}
// Seek implements the Store interface.
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
err := boltSeek(s.db.View, rng, func(_ *bbolt.Cursor, k, v []byte) (bool, error) {
return f(k, v), nil
})
if err != nil {
panic(err)
}
}
func boltSeek(txopener func(func(*bbolt.Tx) error) error, rng SeekRange, f func(c *bbolt.Cursor, k, v []byte) (bool, error)) error {
rang := seekRangeToPrefixes(rng)
err := s.db.View(func(tx *bbolt.Tx) error {
return txopener(func(tx *bbolt.Tx) error {
var (
k, v []byte
next func() ([]byte, []byte)
@ -133,15 +154,16 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
}
for ; k != nil && bytes.HasPrefix(k, rng.Prefix) && (len(rang.Limit) == 0 || bytes.Compare(k, rang.Limit) <= 0); k, v = next() {
if !f(k, v) {
cont, err := f(c, k, v)
if err != nil {
return err
}
if !cont {
break
}
}
return nil
})
if err != nil {
panic(err)
}
}
// Batch implements the Batch interface and returns a boltdb

View file

@ -3,6 +3,7 @@ package storage
import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
)
@ -82,13 +83,39 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error {
// Seek implements the Store interface.
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
iter := s.db.NewIterator(seekRangeToPrefixes(rng), nil)
s.seek(iter, rng.Backwards, f)
}
// SeekGC implements the Store interface.
func (s *LevelDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
tx, err := s.db.OpenTransaction()
if err != nil {
return err
}
iter := tx.NewIterator(seekRangeToPrefixes(rng), nil)
s.seek(iter, rng.Backwards, func(k, v []byte) bool {
if !keep(k, v) {
err = tx.Delete(k, nil)
if err != nil {
return false
}
}
return true
})
if err != nil {
return err
}
return tx.Commit()
}
func (s *LevelDBStore) seek(iter iterator.Iterator, backwards bool, f func(k, v []byte) bool) {
var (
next func() bool
ok bool
iter = s.db.NewIterator(seekRangeToPrefixes(rng), nil)
)
if !rng.Backwards {
if !backwards {
ok = iter.Next()
next = iter.Next
} else {

View file

@ -293,6 +293,9 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte) error {
}
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
}
func (b *BadStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
return nil
}
func (b *BadStore) Close() error {
return nil
}

View file

@ -102,6 +102,21 @@ func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.mut.RUnlock()
}
// SeekGC implements the Store interface.
func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error {
s.mut.Lock()
// We still need to perform normal seek, some GC operations can be
// sensitive to the order of KV pairs.
s.seek(rng, func(k, v []byte) bool {
if !keep(k, v) {
s.drop(string(k))
}
return true
})
s.mut.Unlock()
return nil
}
// SeekAll is like seek but also iterates over deleted items.
func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
s.mut.RLock()

View file

@ -96,6 +96,11 @@ type (
// Key and value slices should not be modified.
// Seek can guarantee that key-value items are sorted by key in ascending way.
Seek(rng SeekRange, f func(k, v []byte) bool)
// SeekGC is similar to Seek, but the function should return true if current
// KV pair should be kept and false if it's to be deleted; there is no way to
// do an early exit here. SeekGC only works with the current Store, it won't
// go down to layers below and it takes a full write lock, so use it carefully.
SeekGC(rng SeekRange, keep func(k, v []byte) bool) error
Close() error
}

View file

@ -438,6 +438,41 @@ func testStorePutBatchWithDelete(t *testing.T, s Store) {
require.NoError(t, s.Close())
}
func testStoreSeekGC(t *testing.T, s Store) {
kvs := []KeyValue{
{[]byte("10"), []byte("bar")},
{[]byte("11"), []byte("bara")},
{[]byte("20"), []byte("barb")},
{[]byte("21"), []byte("barc")},
{[]byte("22"), []byte("bard")},
{[]byte("30"), []byte("bare")},
{[]byte("31"), []byte("barf")},
}
for _, v := range kvs {
require.NoError(t, s.Put(v.Key, v.Value))
}
err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) bool {
return true
})
require.NoError(t, err)
for i := range kvs {
_, err = s.Get(kvs[i].Key)
require.NoError(t, err)
}
err = s.SeekGC(SeekRange{Prefix: []byte("3")}, func(k, v []byte) bool {
return false
})
require.NoError(t, err)
for i := range kvs[:5] {
_, err = s.Get(kvs[i].Key)
require.NoError(t, err)
}
for _, kv := range kvs[5:] {
_, err = s.Get(kv.Key)
require.Error(t, err)
}
}
func TestAllDBs(t *testing.T) {
var DBs = []dbSetup{
{"BoltDB", newBoltStoreForTesting},
@ -448,7 +483,7 @@ func TestAllDBs(t *testing.T) {
var tests = []dbTestFunction{testStoreClose, testStorePutAndGet,
testStoreGetNonExistent, testStorePutBatch, testStoreSeek,
testStoreDeleteNonExistent, testStorePutAndDelete,
testStorePutBatchWithDelete}
testStorePutBatchWithDelete, testStoreSeekGC}
for _, db := range DBs {
for _, test := range tests {
s := db.create(t)