diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 69863d652..2d7d4345f 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -44,7 +44,7 @@ import ( // Tuning parameters. const ( headerBatchCount = 2000 - version = "0.2.2" + version = "0.2.3" defaultInitialGAS = 52000000_00000000 defaultGCPeriod = 10000 diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index b01828ed8..d729e0b80 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -118,7 +118,7 @@ func (s *Module) CurrentValidatedHeight() uint32 { // Init initializes state root module at the given height. func (s *Module) Init(height uint32) error { - data, err := s.Store.Get([]byte{byte(storage.DataMPT), prefixValidated}) + data, err := s.Store.Get([]byte{byte(storage.DataMPTAux), prefixValidated}) if err == nil { s.validatedHeight.Store(binary.LittleEndian.Uint32(data)) } @@ -156,16 +156,6 @@ func (s *Module) CleanStorage() error { if err != nil { return fmt.Errorf("failed to remove outdated MPT-reated items: %w", err) } - currentLocal := s.currentLocal.Load().(util.Uint256) - if !currentLocal.Equals(util.Uint256{}) { - err := s.addLocalStateRoot(s.Store, &state.MPTRoot{ - Index: s.localHeight.Load(), - Root: currentLocal, - }) - if err != nil { - return fmt.Errorf("failed to store current local stateroot: %w", err) - } - } return nil } @@ -177,7 +167,7 @@ func (s *Module) JumpToState(sr *state.MPTRoot) error { data := make([]byte, 4) binary.LittleEndian.PutUint32(data, sr.Index) - if err := s.Store.Put([]byte{byte(storage.DataMPT), prefixValidated}, data); err != nil { + if err := s.Store.Put([]byte{byte(storage.DataMPTAux), prefixValidated}, data); err != nil { return fmt.Errorf("failed to store validated height: %w", err) } s.validatedHeight.Store(sr.Index) @@ -197,30 +187,27 @@ func (s *Module) GC(index uint32, store storage.Store) time.Duration { var stored int64 s.log.Info("starting MPT garbage collection", zap.Uint32("index", index)) start := time.Now() - b := store.Batch() - store.Seek(storage.SeekRange{ + err := store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(storage.DataMPT)}, }, func(k, v []byte) bool { stored++ if !mpt.IsActiveValue(v) { h := binary.LittleEndian.Uint32(v[len(v)-4:]) - if h > index { - return true + if h <= index { + removed++ + stored-- + return false } - b.Delete(k) - removed++ - stored-- } return true }) - err := store.PutBatch(b) dur := time.Since(start) if err != nil { s.log.Error("failed to flush MPT GC changeset", zap.Duration("time", dur), zap.Error(err)) } else { s.log.Info("finished MPT garbage collection", zap.Int("removed", removed), - zap.Int64("stored", stored), + zap.Int64("kept", stored), zap.Duration("time", dur)) } return dur diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go index f059d1c31..6c1494cc2 100644 --- a/pkg/core/stateroot/store.go +++ b/pkg/core/stateroot/store.go @@ -29,7 +29,7 @@ func (s *Module) addLocalStateRoot(store *storage.MemCachedStore, sr *state.MPTR data := make([]byte, 4) binary.LittleEndian.PutUint32(data, sr.Index) - return store.Put([]byte{byte(storage.DataMPT), prefixLocal}, data) + return store.Put([]byte{byte(storage.DataMPTAux), prefixLocal}, data) } func putStateRoot(store *storage.MemCachedStore, key []byte, sr *state.MPTRoot) error { @@ -52,7 +52,7 @@ func (s *Module) getStateRoot(key []byte) (*state.MPTRoot, error) { func makeStateRootKey(index uint32) []byte { key := make([]byte, 5) - key[0] = byte(storage.DataMPT) + key[0] = byte(storage.DataMPTAux) binary.BigEndian.PutUint32(key, index) return key } @@ -79,7 +79,7 @@ func (s *Module) AddStateRoot(sr *state.MPTRoot) error { data := make([]byte, 4) binary.LittleEndian.PutUint32(data, sr.Index) - if err := s.Store.Put([]byte{byte(storage.DataMPT), prefixValidated}, data); err != nil { + if err := s.Store.Put([]byte{byte(storage.DataMPTAux), prefixValidated}, data); err != nil { return err } s.validatedHeight.Store(sr.Index) diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 8f60aa5ee..ac6cd3a16 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util/slice" - "github.com/syndtr/goleveldb/leveldb/util" "go.etcd.io/bbolt" ) @@ -92,7 +91,7 @@ func (s *BoltDBStore) PutBatch(batch Batch) error { func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { var err error - return s.db.Batch(func(tx *bbolt.Tx) error { + return s.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(Bucket) for k, v := range puts { if v != nil { @@ -108,55 +107,63 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { }) } +// SeekGC implements the Store interface. +func (s *BoltDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + return boltSeek(s.db.Update, rng, func(c *bbolt.Cursor, k, v []byte) (bool, error) { + if !keep(k, v) { + if err := c.Delete(); err != nil { + return false, err + } + } + return true, nil + }) +} + // Seek implements the Store interface. func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) { - start := make([]byte, len(rng.Prefix)+len(rng.Start)) - copy(start, rng.Prefix) - copy(start[len(rng.Prefix):], rng.Start) - if rng.Backwards { - s.seekBackwards(rng.Prefix, start, f) - } else { - s.seek(rng.Prefix, start, f) - } -} - -func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) { - prefix := util.BytesPrefix(key) - prefix.Start = start - err := s.db.View(func(tx *bbolt.Tx) error { - c := tx.Bucket(Bucket).Cursor() - for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() { - if !f(k, v) { - break - } - } - return nil + err := boltSeek(s.db.View, rng, func(_ *bbolt.Cursor, k, v []byte) (bool, error) { + return f(k, v), nil }) if err != nil { panic(err) } } -func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) { - err := s.db.View(func(tx *bbolt.Tx) error { +func boltSeek(txopener func(func(*bbolt.Tx) error) error, rng SeekRange, f func(c *bbolt.Cursor, k, v []byte) (bool, error)) error { + rang := seekRangeToPrefixes(rng) + return txopener(func(tx *bbolt.Tx) error { + var ( + k, v []byte + next func() ([]byte, []byte) + ) + c := tx.Bucket(Bucket).Cursor() - // Move cursor to the first kv pair which is followed by the pair matching the specified prefix. - if len(start) == 0 { - lastKey, _ := c.Last() - start = lastKey + + if !rng.Backwards { + k, v = c.Seek(rang.Start) + next = c.Next + } else { + if len(rang.Limit) == 0 { + lastKey, _ := c.Last() + k, v = c.Seek(lastKey) + } else { + c.Seek(rang.Limit) + k, v = c.Prev() + } + next = c.Prev } - rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit - c.Seek(rng.Limit) - for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() { - if !f(k, v) { + + for ; k != nil && bytes.HasPrefix(k, rng.Prefix) && (len(rang.Limit) == 0 || bytes.Compare(k, rang.Limit) <= 0); k, v = next() { + cont, err := f(c, k, v) + if err != nil { + return err + } + if !cont { break } } return nil }) - if err != nil { - panic(err) - } } // Batch implements the Batch interface and returns a boltdb diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index 6d5439711..0c843aba3 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -3,8 +3,8 @@ package storage import ( "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" ) // LevelDBOptions configuration for LevelDB. @@ -83,34 +83,47 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error { // Seek implements the Store interface. func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) { - start := make([]byte, len(rng.Prefix)+len(rng.Start)) - copy(start, rng.Prefix) - copy(start[len(rng.Prefix):], rng.Start) - if rng.Backwards { - s.seekBackwards(rng.Prefix, start, f) - } else { - s.seek(rng.Prefix, start, f) - } + iter := s.db.NewIterator(seekRangeToPrefixes(rng), nil) + s.seek(iter, rng.Backwards, f) } -func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) { - prefix := util.BytesPrefix(key) - prefix.Start = start - iter := s.db.NewIterator(prefix, nil) - for iter.Next() { - if !f(iter.Key(), iter.Value()) { - break +// SeekGC implements the Store interface. +func (s *LevelDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + tx, err := s.db.OpenTransaction() + if err != nil { + return err + } + iter := tx.NewIterator(seekRangeToPrefixes(rng), nil) + s.seek(iter, rng.Backwards, func(k, v []byte) bool { + if !keep(k, v) { + err = tx.Delete(k, nil) + if err != nil { + return false + } } + return true + }) + if err != nil { + return err } - iter.Release() + return tx.Commit() } -func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) { - iRange := util.BytesPrefix(start) - iRange.Start = key +func (s *LevelDBStore) seek(iter iterator.Iterator, backwards bool, f func(k, v []byte) bool) { + var ( + next func() bool + ok bool + ) - iter := s.db.NewIterator(iRange, nil) - for ok := iter.Last(); ok; ok = iter.Prev() { + if !backwards { + ok = iter.Next() + next = iter.Next + } else { + ok = iter.Last() + next = iter.Prev + } + + for ; ok; ok = next() { if !f(iter.Key(), iter.Value()) { break } diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index 100942d7e..a284a7ad5 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -293,6 +293,9 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte) error { } func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte) bool) { } +func (b *BadStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + return nil +} func (b *BadStore) Close() error { return nil } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 1a5589aa6..371e4bf65 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -102,6 +102,21 @@ func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) { s.mut.RUnlock() } +// SeekGC implements the Store interface. +func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { + s.mut.Lock() + // We still need to perform normal seek, some GC operations can be + // sensitive to the order of KV pairs. + s.seek(rng, func(k, v []byte) bool { + if !keep(k, v) { + s.drop(string(k)) + } + return true + }) + s.mut.Unlock() + return nil +} + // SeekAll is like seek but also iterates over deleted items. func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { s.mut.RLock() diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index ff1997234..de5e14ec4 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -4,15 +4,21 @@ import ( "encoding/binary" "errors" "fmt" + + "github.com/syndtr/goleveldb/leveldb/util" ) // KeyPrefix constants. const ( DataExecutable KeyPrefix = 0x01 - DataMPT KeyPrefix = 0x03 - STAccount KeyPrefix = 0x40 - STContractID KeyPrefix = 0x51 - STStorage KeyPrefix = 0x70 + // DataMPT is used for MPT node entries identified by Uint256. + DataMPT KeyPrefix = 0x03 + // DataMPTAux is used to store additional MPT data like height-root + // mappings and local/validated heights. + DataMPTAux KeyPrefix = 0x04 + STAccount KeyPrefix = 0x40 + STContractID KeyPrefix = 0x51 + STStorage KeyPrefix = 0x70 // STTempStorage is used to store contract storage items during state sync process // in order not to mess up the previous state which has its own items stored by // STStorage prefix. Once state exchange process is completed, all items with @@ -94,6 +100,11 @@ type ( // Key and value slices should not be modified. // Seek can guarantee that key-value items are sorted by key in ascending way. Seek(rng SeekRange, f func(k, v []byte) bool) + // SeekGC is similar to Seek, but the function should return true if current + // KV pair should be kept and false if it's to be deleted; there is no way to + // do an early exit here. SeekGC only works with the current Store, it won't + // go down to layers below and it takes a full write lock, so use it carefully. + SeekGC(rng SeekRange, keep func(k, v []byte) bool) error Close() error } @@ -133,6 +144,24 @@ func AppendPrefixInt(k KeyPrefix, n int) []byte { return AppendPrefix(k, b) } +func seekRangeToPrefixes(sr SeekRange) *util.Range { + var ( + rang *util.Range + start = make([]byte, len(sr.Prefix)+len(sr.Start)) + ) + copy(start, sr.Prefix) + copy(start[len(sr.Prefix):], sr.Start) + + if !sr.Backwards { + rang = util.BytesPrefix(sr.Prefix) + rang.Start = start + } else { + rang = util.BytesPrefix(start) + rang.Start = sr.Prefix + } + return rang +} + // NewStore creates storage with preselected in configuration database type. func NewStore(cfg DBConfiguration) (Store, error) { var store Store diff --git a/pkg/core/storage/storeandbatch_test.go b/pkg/core/storage/storeandbatch_test.go index 355bcafdf..6b8d7c102 100644 --- a/pkg/core/storage/storeandbatch_test.go +++ b/pkg/core/storage/storeandbatch_test.go @@ -19,10 +19,6 @@ type dbSetup struct { type dbTestFunction func(*testing.T, Store) -func testStoreClose(t *testing.T, s Store) { - require.NoError(t, s.Close()) -} - func testStorePutAndGet(t *testing.T, s Store) { key := []byte("foo") value := []byte("bar") @@ -32,8 +28,6 @@ func testStorePutAndGet(t *testing.T, s Store) { result, err := s.Get(key) assert.Nil(t, err) require.Equal(t, value, result) - - require.NoError(t, s.Close()) } func testStoreGetNonExistent(t *testing.T, s Store) { @@ -41,7 +35,6 @@ func testStoreGetNonExistent(t *testing.T, s Store) { _, err := s.Get(key) assert.Equal(t, err, ErrKeyNotFound) - require.NoError(t, s.Close()) } func testStorePutBatch(t *testing.T, s Store) { @@ -63,7 +56,6 @@ func testStorePutBatch(t *testing.T, s Store) { assert.Nil(t, err) require.Equal(t, value, newVal) assert.Equal(t, value, newVal) - require.NoError(t, s.Close()) } func testStoreSeek(t *testing.T, s Store) { @@ -338,15 +330,12 @@ func testStoreSeek(t *testing.T, s Store) { }) }) }) - - require.NoError(t, s.Close()) } func testStoreDeleteNonExistent(t *testing.T, s Store) { key := []byte("sparse") assert.NoError(t, s.Delete(key)) - require.NoError(t, s.Close()) } func testStorePutAndDelete(t *testing.T, s Store) { @@ -365,8 +354,6 @@ func testStorePutAndDelete(t *testing.T, s Store) { // Double delete. err = s.Delete(key) assert.Nil(t, err) - - require.NoError(t, s.Close()) } func testStorePutBatchWithDelete(t *testing.T, s Store) { @@ -435,7 +422,41 @@ func testStorePutBatchWithDelete(t *testing.T, s Store) { assert.Equal(t, ErrKeyNotFound, err, "%s:%s", k, v) } } - require.NoError(t, s.Close()) +} + +func testStoreSeekGC(t *testing.T, s Store) { + kvs := []KeyValue{ + {[]byte("10"), []byte("bar")}, + {[]byte("11"), []byte("bara")}, + {[]byte("20"), []byte("barb")}, + {[]byte("21"), []byte("barc")}, + {[]byte("22"), []byte("bard")}, + {[]byte("30"), []byte("bare")}, + {[]byte("31"), []byte("barf")}, + } + for _, v := range kvs { + require.NoError(t, s.Put(v.Key, v.Value)) + } + err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) bool { + return true + }) + require.NoError(t, err) + for i := range kvs { + _, err = s.Get(kvs[i].Key) + require.NoError(t, err) + } + err = s.SeekGC(SeekRange{Prefix: []byte("3")}, func(k, v []byte) bool { + return false + }) + require.NoError(t, err) + for i := range kvs[:5] { + _, err = s.Get(kvs[i].Key) + require.NoError(t, err) + } + for _, kv := range kvs[5:] { + _, err = s.Get(kv.Key) + require.Error(t, err) + } } func TestAllDBs(t *testing.T) { @@ -445,10 +466,10 @@ func TestAllDBs(t *testing.T) { {"MemCached", newMemCachedStoreForTesting}, {"Memory", newMemoryStoreForTesting}, } - var tests = []dbTestFunction{testStoreClose, testStorePutAndGet, + var tests = []dbTestFunction{testStorePutAndGet, testStoreGetNonExistent, testStorePutBatch, testStoreSeek, testStoreDeleteNonExistent, testStorePutAndDelete, - testStorePutBatchWithDelete} + testStorePutBatchWithDelete, testStoreSeekGC} for _, db := range DBs { for _, test := range tests { s := db.create(t) @@ -457,6 +478,7 @@ func TestAllDBs(t *testing.T) { } fname := runtime.FuncForPC(reflect.ValueOf(test).Pointer()).Name() t.Run(db.name+"/"+fname, twrapper) + require.NoError(t, s.Close()) } } }