[#1431] pilorama: Use Batch for write transactions

Helps a lot in case of concurrent request flow.

```
name                      old time/op    new time/op    delta
ApplySequential/bbolt-8     78.0µs ± 9%    59.8µs ± 4%  -23.39%  (p=0.000 n=10+9)
ApplyReorderLast/bbolt-8     143µs ± 5%     113µs ±15%  -21.06%  (p=0.000 n=10+10)

name                      old alloc/op   new alloc/op   delta
ApplySequential/bbolt-8     56.9kB ± 8%    28.9kB ± 3%  -49.22%  (p=0.000 n=10+10)
ApplyReorderLast/bbolt-8    87.3kB ± 3%    40.9kB ±10%  -53.16%  (p=0.000 n=10+10)

name                      old allocs/op  new allocs/op  delta
ApplySequential/bbolt-8        224 ±11%       262 ± 5%  +16.93%  (p=0.000 n=9+10)
ApplyReorderLast/bbolt-8       518 ± 4%       674 ±11%  +30.09%  (p=0.000 n=10+10)
```

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-05-25 11:33:23 +03:00 committed by fyrchik
parent f0a67f948d
commit 3312924b82
2 changed files with 24 additions and 7 deletions

View file

@ -17,6 +17,8 @@ type boltForest struct {
db *bbolt.DB db *bbolt.DB
} }
const defaultMaxBatchSize = 10
var ( var (
dataBucket = []byte{0} dataBucket = []byte{0}
logBucket = []byte{1} logBucket = []byte{1}
@ -54,6 +56,7 @@ func (t *boltForest) Open() error {
return err return err
} }
db.MaxBatchSize = defaultMaxBatchSize
t.db = db t.db = db
return db.Update(func(tx *bbolt.Tx) error { return db.Update(func(tx *bbolt.Tx) error {
@ -73,7 +76,7 @@ func (t *boltForest) Close() error { return t.db.Close() }
// TreeMove implements the Forest interface. // TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) { func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) {
var lm *LogMove var lm *LogMove
return lm, t.db.Update(func(tx *bbolt.Tx) error { return lm, t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil { if err != nil {
return err return err
@ -97,7 +100,7 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa
var lm []LogMove var lm []LogMove
var key [17]byte var key [17]byte
err := t.db.Update(func(tx *bbolt.Tx) error { err := t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil { if err != nil {
return err return err
@ -171,7 +174,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
// TreeApply implements the Forest interface. // TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error { func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error {
return t.db.Update(func(tx *bbolt.Tx) error { return t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil { if err != nil {
return err return err

View file

@ -474,6 +474,9 @@ const benchNodeCount = 1000
func BenchmarkApplySequential(b *testing.B) { func BenchmarkApplySequential(b *testing.B) {
for i := range providers { for i := range providers {
if providers[i].name == "inmemory" { // memory backend is not thread-safe
continue
}
b.Run(providers[i].name, func(b *testing.B) { b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
ops := make([]Move, opCount) ops := make([]Move, opCount)
@ -499,6 +502,9 @@ func BenchmarkApplyReorderLast(b *testing.B) {
const blockSize = 10 const blockSize = 10
for i := range providers { for i := range providers {
if providers[i].name == "inmemory" { // memory backend is not thread-safe
continue
}
b.Run(providers[i].name, func(b *testing.B) { b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
ops := make([]Move, opCount) ops := make([]Move, opCount)
@ -529,14 +535,22 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
ops := genFunc(b.N) ops := genFunc(b.N)
cid := cidtest.ID() cid := cidtest.ID()
treeID := "version" treeID := "version"
ch := make(chan *Move, b.N)
for i := range ops {
ch <- &ops[i]
}
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
for i := range ops { b.SetParallelism(50)
if err := s.TreeApply(cid, treeID, &ops[i]); err != nil { b.RunParallel(func(pb *testing.PB) {
b.Fatalf("error in `Apply`: %v", err) for pb.Next() {
op := <-ch
if err := s.TreeApply(cid, treeID, op); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
} }
} })
} }
func TestTreeGetByPath(t *testing.T) { func TestTreeGetByPath(t *testing.T) {