neoneo-go/pkg/core/storage/badgerdb_store.go
Roman Khimov ae071d4542 storage: introduce PutChangeSet and use it for Persist
We're using batches the wrong way during persist: we already have all changes
accumulated in two maps, then we move them to a batch, and then the batch is
applied. For some DBs, like BoltDB, this batch is just another MemoryStore, so
we essentially just shuffle the changeset from one map to another. For others,
like LevelDB, the batch is just a serialized set of KV pairs; it doesn't help
much on the subsequent PutBatch, we just duplicate the changeset again.

So introduce PutChangeSet, which takes the two maps with sets and deletes
directly. It also allows simplifying the MemCachedStore logic.

neo-bench for single node with 10 workers, LevelDB:

  Reference:

  RPS    30189.132 30556.448 30390.482 ≈ 30379    ±  0.61%
  TPS    29427.344 29418.687 29434.273 ≈ 29427    ±  0.03%
  CPU %     33.304    27.179    33.860 ≈    31.45 ± 11.79%
  Mem MB   800.677   798.389   715.042 ≈   771    ±  6.33%

  Patched:

  RPS    30264.326 30386.364 30166.231 ≈ 30272    ± 0.36% ⇅
  TPS    29444.673 29407.440 29452.478 ≈ 29435    ± 0.08% ⇅
  CPU %     34.012    32.597    33.467 ≈   33.36  ± 2.14% ⇅
  Mem MB   549.126   523.656   517.684 ≈  530     ± 3.15% ↓ 31.26%

BoltDB:

  Reference:

  RPS    31937.647 31551.684 31850.408 ≈ 31780    ±  0.64%
  TPS    31292.049 30368.368 31307.724 ≈ 30989    ±  1.74%
  CPU %     33.792    22.339    35.887 ≈    30.67 ± 23.78%
  Mem MB  1271.687  1254.472  1215.639 ≈  1247    ±  2.30%

  Patched:

  RPS    31746.818 30859.485 31689.761 ≈ 31432    ± 1.58% ⇅
  TPS    31271.499 30340.726 30342.568 ≈ 30652    ± 1.75% ⇅
  CPU %     34.611    34.414    31.553 ≈    33.53 ± 5.11% ⇅
  Mem MB  1262.960  1231.389  1335.569 ≈  1277    ± 4.18% ⇅
2021-08-12 17:42:16 +03:00

155 lines
3.5 KiB
Go

package storage
import (
"os"
"github.com/dgraph-io/badger/v2"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
)
// BadgerDBOptions holds the settings used to open a BadgerDB-backed store.
type BadgerDBOptions struct {
	// Dir is the directory that holds the database files; it is read from
	// the "BadgerDir" YAML key.
	Dir string `yaml:"BadgerDir"`
}
// BadgerDBStore is a storage backend that keeps and retrieves blockchain
// data in a BadgerDB database.
type BadgerDBStore struct {
	// db is the underlying BadgerDB handle all operations go through.
	db *badger.DB
}
// BadgerDBBatch adapts badger.WriteBatch to the Batch interface.
type BadgerDBBatch struct {
	// batch is the wrapped BadgerDB write batch that collects the changes.
	batch *badger.WriteBatch
}
// Delete implements the Batch interface. It schedules removal of key in the
// underlying write batch.
//
// The key is copied before being handed over, mirroring Put: the batch keeps
// a reference to the slice until it is flushed, so the caller stays free to
// reuse its buffer. The Batch interface has no error return, so a failure to
// queue the deletion results in a panic.
func (b *BadgerDBBatch) Delete(key []byte) {
	if err := b.batch.Delete(slice.Copy(key)); err != nil {
		panic(err)
	}
}
// Put implements the Batch interface. It queues a key/value pair in the
// underlying write batch, passing copies of both slices rather than the
// caller's originals. A failure to queue the pair results in a panic.
func (b *BadgerDBBatch) Put(key, value []byte) {
	if err := b.batch.Set(slice.Copy(key), slice.Copy(value)); err != nil {
		panic(err)
	}
}
// NewBadgerDBStore returns a new BadgerDBStore object that will
// initialize the database found at the path given in cfg.Dir.
//
// The directory (including any missing parents) is created first. Any
// failure, either creating the directory or opening the database, is
// returned to the caller as an error.
func NewBadgerDBStore(cfg BadgerDBOptions) (*BadgerDBStore, error) {
	// BadgerDB isn't able to make nested directories.
	if err := os.MkdirAll(cfg.Dir, os.ModePerm); err != nil {
		// Report the failure instead of panicking: the signature already
		// carries an error, and os errors include the operation and path.
		return nil, err
	}
	opts := badger.DefaultOptions(cfg.Dir) // should be exposed via BadgerDBOptions if anything needed
	db, err := badger.Open(opts)
	if err != nil {
		return nil, err
	}
	return &BadgerDBStore{
		db: db,
	}, nil
}
// Batch implements the Batch interface and returns a badgerdb
// compatible Batch backed by a freshly created write batch.
func (b *BadgerDBStore) Batch() Batch {
	wb := b.db.NewWriteBatch()
	return &BadgerDBBatch{batch: wb}
}
// Delete implements the Store interface. It removes key inside its own
// db.Update call and returns whatever error that call reports.
func (b *BadgerDBStore) Delete(key []byte) error {
	remove := func(txn *badger.Txn) error {
		return txn.Delete(key)
	}
	return b.db.Update(remove)
}
// Get implements the Store interface. It looks key up inside a db.View call
// and returns a copy of the stored value.
//
// If the key is absent, the package-level ErrKeyNotFound is returned. Any
// other lookup or copy error is returned as is. The value is nil whenever
// the error is non-nil.
func (b *BadgerDBStore) Get(key []byte) ([]byte, error) {
	var val []byte
	err := b.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get(key)
		if err != nil {
			// item is nil on any error, so it must not be touched below.
			if err == badger.ErrKeyNotFound {
				return ErrKeyNotFound
			}
			return err
		}
		val, err = item.ValueCopy(nil)
		return err
	})
	if err != nil {
		return nil, err
	}
	return val, nil
}
// Put implements the Store interface. It stores value under key inside its
// own db.Update call and returns whatever error that call reports.
func (b *BadgerDBStore) Put(key, value []byte) error {
	store := func(txn *badger.Txn) error {
		return txn.Set(key, value)
	}
	return b.db.Update(store)
}
// PutBatch implements the Store interface. The batch must be a
// *BadgerDBBatch (the type assertion panics otherwise). Its contents are
// flushed and the flush error is returned; Cancel is deferred so it runs on
// the wrapped write batch whether or not the flush succeeds.
func (b *BadgerDBStore) PutBatch(batch Batch) error {
	wb := batch.(*BadgerDBBatch).batch
	defer wb.Cancel()
	return wb.Flush()
}
// PutChangeSet implements the Store interface. It applies a whole changeset
// inside a single db.Update call: every entry of puts is set first, then
// every key of dels is deleted. The first error encountered aborts the
// function passed to Update and is returned.
//
// NOTE(review): everything goes through one transaction, so a very large
// changeset may run into Badger's transaction size limit; confirm expected
// changeset sizes.
func (b *BadgerDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
	apply := func(txn *badger.Txn) error {
		for key, value := range puts {
			if err := txn.Set([]byte(key), value); err != nil {
				return err
			}
		}
		for key := range dels {
			if err := txn.Delete([]byte(key)); err != nil {
				return err
			}
		}
		return nil
	}
	return b.db.Update(apply)
}
// Seek implements the Store interface. It iterates, inside a db.View call,
// over all entries whose key starts with the given prefix and invokes f for
// each of them.
//
// Both the key and the value passed to f are copies, so f may retain them
// after it returns. The method has no error return, so any error while
// reading a value results in a panic.
func (b *BadgerDBStore) Seek(key []byte, f func(k, v []byte)) {
	err := b.db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.IteratorOptions{
			PrefetchValues: true,
			PrefetchSize:   100,
			Reverse:        false,
			AllVersions:    false,
			Prefix:         key,
			InternalAccess: false,
		})
		defer it.Close()
		for it.Seek(key); it.ValidForPrefix(key); it.Next() {
			item := it.Item()
			v, err := item.ValueCopy(nil)
			if err != nil {
				return err
			}
			// item.Key() is only valid until the iterator advances, so hand
			// the callback its own copy, just like the value.
			f(item.KeyCopy(nil), v)
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
}
// Close releases all db resources by closing the underlying BadgerDB handle
// and returns the error reported by that close, if any.
func (b *BadgerDBStore) Close() error {
	err := b.db.Close()
	return err
}