diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index b01828ed8..8c73d7e28 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -197,23 +197,20 @@ func (s *Module) GC(index uint32, store storage.Store) time.Duration { var stored int64 s.log.Info("starting MPT garbage collection", zap.Uint32("index", index)) start := time.Now() - b := store.Batch() - store.Seek(storage.SeekRange{ + err := store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(storage.DataMPT)}, }, func(k, v []byte) bool { stored++ if !mpt.IsActiveValue(v) { h := binary.LittleEndian.Uint32(v[len(v)-4:]) - if h > index { - return true + if h <= index { + removed++ + stored-- + return false } - b.Delete(k) - removed++ - stored-- } return true }) - err := store.PutBatch(b) dur := time.Since(start) if err != nil { s.log.Error("failed to flush MPT GC changeset", zap.Duration("time", dur), zap.Error(err)) diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 8157e84eb..ac6cd3a16 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -107,10 +107,31 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { }) } +// SeekGC implements the Store interface. +func (s *BoltDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + return boltSeek(s.db.Update, rng, func(c *bbolt.Cursor, k, v []byte) (bool, error) { + if !keep(k, v) { + if err := c.Delete(); err != nil { + return false, err + } + } + return true, nil + }) +} + // Seek implements the Store interface. func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) { + err := boltSeek(s.db.View, rng, func(_ *bbolt.Cursor, k, v []byte) (bool, error) { + return f(k, v), nil + }) + if err != nil { + panic(err) + } +} + +func boltSeek(txopener func(func(*bbolt.Tx) error) error, rng SeekRange, f func(c *bbolt.Cursor, k, v []byte) (bool, error)) error { rang := seekRangeToPrefixes(rng) - err := s.db.View(func(tx *bbolt.Tx) error { + return txopener(func(tx *bbolt.Tx) error { var ( k, v []byte next func() ([]byte, []byte) @@ -133,15 +154,16 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) { } for ; k != nil && bytes.HasPrefix(k, rng.Prefix) && (len(rang.Limit) == 0 || bytes.Compare(k, rang.Limit) <= 0); k, v = next() { - if !f(k, v) { + cont, err := f(c, k, v) + if err != nil { + return err + } + if !cont { break } } return nil }) - if err != nil { - panic(err) - } } // Batch implements the Batch interface and returns a boltdb diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index f38109931..0c843aba3 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -3,6 +3,7 @@ package storage import ( "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" ) @@ -82,13 +83,39 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error { // Seek implements the Store interface. func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) { + iter := s.db.NewIterator(seekRangeToPrefixes(rng), nil) + s.seek(iter, rng.Backwards, f) +} + +// SeekGC implements the Store interface. +func (s *LevelDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + tx, err := s.db.OpenTransaction() + if err != nil { + return err + } + iter := tx.NewIterator(seekRangeToPrefixes(rng), nil) + s.seek(iter, rng.Backwards, func(k, v []byte) bool { + if !keep(k, v) { + err = tx.Delete(k, nil) + if err != nil { + return false + } + } + return true + }) + if err != nil { + return err + } + return tx.Commit() +} + +func (s *LevelDBStore) seek(iter iterator.Iterator, backwards bool, f func(k, v []byte) bool) { var ( next func() bool ok bool - iter = s.db.NewIterator(seekRangeToPrefixes(rng), nil) ) - if !rng.Backwards { + if !backwards { ok = iter.Next() next = iter.Next } else { diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index 100942d7e..a284a7ad5 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -293,6 +293,9 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte) error { } func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte) bool) { } +func (b *BadStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + return nil +} func (b *BadStore) Close() error { return nil } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 1a5589aa6..371e4bf65 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -102,6 +102,21 @@ func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) { s.mut.RUnlock() } +// SeekGC implements the Store interface. +func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + s.mut.Lock() + // We still need to perform normal seek, some GC operations can be + // sensitive to the order of KV pairs. + s.seek(rng, func(k, v []byte) bool { + if !keep(k, v) { + s.drop(string(k)) + } + return true + }) + s.mut.Unlock() + return nil +} + // SeekAll is like seek but also iterates over deleted items. func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { s.mut.RLock() diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 5759f9b17..029d21fbf 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -96,6 +96,11 @@ type ( // Key and value slices should not be modified. // Seek can guarantee that key-value items are sorted by key in ascending way. Seek(rng SeekRange, f func(k, v []byte) bool) + // SeekGC is similar to Seek, but the function should return true if current + // KV pair should be kept and false if it's to be deleted; there is no way to + // do an early exit here. SeekGC only works with the current Store, it won't + // go down to layers below and it takes a full write lock, so use it carefully. + SeekGC(rng SeekRange, keep func(k, v []byte) bool) error Close() error } diff --git a/pkg/core/storage/storeandbatch_test.go b/pkg/core/storage/storeandbatch_test.go index 355bcafdf..38bf5f282 100644 --- a/pkg/core/storage/storeandbatch_test.go +++ b/pkg/core/storage/storeandbatch_test.go @@ -438,6 +438,41 @@ func testStorePutBatchWithDelete(t *testing.T, s Store) { require.NoError(t, s.Close()) } +func testStoreSeekGC(t *testing.T, s Store) { + kvs := []KeyValue{ + {[]byte("10"), []byte("bar")}, + {[]byte("11"), []byte("bara")}, + {[]byte("20"), []byte("barb")}, + {[]byte("21"), []byte("barc")}, + {[]byte("22"), []byte("bard")}, + {[]byte("30"), []byte("bare")}, + {[]byte("31"), []byte("barf")}, + } + for _, v := range kvs { + require.NoError(t, s.Put(v.Key, v.Value)) + } + err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) bool { + return true + }) + require.NoError(t, err) + for i := range kvs { + _, err = s.Get(kvs[i].Key) + require.NoError(t, err) + } + err = s.SeekGC(SeekRange{Prefix: []byte("3")}, func(k, v []byte) bool { + return false + }) + require.NoError(t, err) + for i := range kvs[:5] { + _, err = s.Get(kvs[i].Key) + require.NoError(t, err) + } + for _, kv := range kvs[5:] { + _, err = s.Get(kv.Key) + require.Error(t, err) + } +} + func TestAllDBs(t *testing.T) { var DBs = []dbSetup{ {"BoltDB", newBoltStoreForTesting}, @@ -448,7 +483,7 @@ func TestAllDBs(t *testing.T) { var tests = []dbTestFunction{testStoreClose, testStorePutAndGet, testStoreGetNonExistent, testStorePutBatch, testStoreSeek, testStoreDeleteNonExistent, testStorePutAndDelete, - testStorePutBatchWithDelete} + testStorePutBatchWithDelete, testStoreSeekGC} for _, db := range DBs { for _, test := range tests { s := db.create(t)