diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 3aba953e..687dcaa7 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -17,6 +17,8 @@ type boltForest struct { db *bbolt.DB } +const defaultMaxBatchSize = 10 + var ( dataBucket = []byte{0} logBucket = []byte{1} @@ -54,6 +56,7 @@ func (t *boltForest) Open() error { return err } + db.MaxBatchSize = defaultMaxBatchSize t.db = db return db.Update(func(tx *bbolt.Tx) error { @@ -73,7 +76,7 @@ func (t *boltForest) Close() error { return t.db.Close() } // TreeMove implements the Forest interface. func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) { var lm *LogMove - return lm, t.db.Update(func(tx *bbolt.Tx) error { + return lm, t.db.Batch(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) if err != nil { return err @@ -97,7 +100,7 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa var lm []LogMove var key [17]byte - err := t.db.Update(func(tx *bbolt.Tx) error { + err := t.db.Batch(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) if err != nil { return err @@ -171,7 +174,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { // TreeApply implements the Forest interface. func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error { - return t.db.Update(func(tx *bbolt.Tx) error { + return t.db.Batch(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) if err != nil { return err diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 322d5aab..fe1ce52d 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -474,6 +474,9 @@ const benchNodeCount = 1000 func BenchmarkApplySequential(b *testing.B) { for i := range providers { + if providers[i].name == "inmemory" { // memory backend is not thread-safe + continue + } b.Run(providers[i].name, func(b *testing.B) { benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { ops := make([]Move, opCount) @@ -499,6 +502,9 @@ func BenchmarkApplyReorderLast(b *testing.B) { const blockSize = 10 for i := range providers { + if providers[i].name == "inmemory" { // memory backend is not thread-safe + continue + } b.Run(providers[i].name, func(b *testing.B) { benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { ops := make([]Move, opCount) @@ -529,14 +535,22 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { ops := genFunc(b.N) cid := cidtest.ID() treeID := "version" + ch := make(chan *Move, b.N) + for i := range ops { + ch <- &ops[i] + } b.ResetTimer() b.ReportAllocs() - for i := range ops { - if err := s.TreeApply(cid, treeID, &ops[i]); err != nil { - b.Fatalf("error in `Apply`: %v", err) + b.SetParallelism(50) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + op := <-ch + if err := s.TreeApply(cid, treeID, op); err != nil { + b.Fatalf("error in `Apply`: %v", err) + } } - } + }) } func TestTreeGetByPath(t *testing.T) {