[#1431] pilorama: Use Batch for write transactions

Helps a lot in case of concurrent request flow.

```
name                      old time/op    new time/op    delta
ApplySequential/bbolt-8     78.0µs ± 9%    59.8µs ± 4%  -23.39%  (p=0.000 n=10+9)
ApplyReorderLast/bbolt-8     143µs ± 5%     113µs ±15%  -21.06%  (p=0.000 n=10+10)

name                      old alloc/op   new alloc/op   delta
ApplySequential/bbolt-8     56.9kB ± 8%    28.9kB ± 3%  -49.22%  (p=0.000 n=10+10)
ApplyReorderLast/bbolt-8    87.3kB ± 3%    40.9kB ±10%  -53.16%  (p=0.000 n=10+10)

name                      old allocs/op  new allocs/op  delta
ApplySequential/bbolt-8        224 ±11%       262 ± 5%  +16.93%  (p=0.000 n=9+10)
ApplyReorderLast/bbolt-8       518 ± 4%       674 ±11%  +30.09%  (p=0.000 n=10+10)
```

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-05-25 11:33:23 +03:00
parent d6d7e35454
commit fa57a8be44
2 changed files with 24 additions and 7 deletions

View file

@ -17,6 +17,8 @@ type boltForest struct {
db *bbolt.DB
}
const defaultMaxBatchSize = 10
var (
dataBucket = []byte{0}
logBucket = []byte{1}
@ -54,6 +56,7 @@ func (t *boltForest) Open() error {
return err
}
db.MaxBatchSize = defaultMaxBatchSize
t.db = db
return db.Update(func(tx *bbolt.Tx) error {
@ -73,7 +76,7 @@ func (t *boltForest) Close() error { return t.db.Close() }
// TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) {
var lm *LogMove
return lm, t.db.Update(func(tx *bbolt.Tx) error {
return lm, t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil {
return err
@ -97,7 +100,7 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa
var lm []LogMove
var key [17]byte
err := t.db.Update(func(tx *bbolt.Tx) error {
err := t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil {
return err
@ -171,7 +174,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
// TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error {
return t.db.Update(func(tx *bbolt.Tx) error {
return t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
if err != nil {
return err

View file

@ -474,6 +474,9 @@ const benchNodeCount = 1000
func BenchmarkApplySequential(b *testing.B) {
for i := range providers {
if providers[i].name == "inmemory" { // memory backend is not thread-safe
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
ops := make([]Move, opCount)
@ -499,6 +502,9 @@ func BenchmarkApplyReorderLast(b *testing.B) {
const blockSize = 10
for i := range providers {
if providers[i].name == "inmemory" { // memory backend is not thread-safe
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
ops := make([]Move, opCount)
@ -529,14 +535,22 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
ops := genFunc(b.N)
cid := cidtest.ID()
treeID := "version"
ch := make(chan *Move, b.N)
for i := range ops {
ch <- &ops[i]
}
b.ResetTimer()
b.ReportAllocs()
for i := range ops {
if err := s.TreeApply(cid, treeID, &ops[i]); err != nil {
b.SetParallelism(50)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
op := <-ch
if err := s.TreeApply(cid, treeID, op); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}
})
}
func TestTreeGetByPath(t *testing.T) {