diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 638a00404..69b06611b 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -22,6 +22,8 @@ func (db *DB) Open() error { if err != nil { return fmt.Errorf("can't open boltDB database: %w", err) } + db.boltDB.MaxBatchDelay = db.boltBatchDelay + db.boltDB.MaxBatchSize = db.boltBatchSize db.log.Debug("opened boltDB instance for Metabase") diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 8fbecfad6..70469a476 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "time" v2object "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -30,6 +31,9 @@ type Option func(*cfg) type cfg struct { boltOptions *bbolt.Options // optional + boltBatchSize int + boltBatchDelay time.Duration + info Info log *logger.Logger @@ -40,8 +44,9 @@ func defaultCfg() *cfg { info: Info{ Permission: os.ModePerm, // 0777 }, - - log: zap.L(), + boltBatchDelay: bbolt.DefaultMaxBatchDelay, + boltBatchSize: bbolt.DefaultMaxBatchSize, + log: zap.L(), } } @@ -144,3 +149,21 @@ func WithPermissions(perm fs.FileMode) Option { c.info.Permission = perm } } + +// WithBatchSize returns option to specify maximum concurrent operations +// to be processed in a single transactions. +// This option is missing from `bbolt.Options` but is set right after DB is open. +func WithBatchSize(s int) Option { + return func(c *cfg) { + c.boltBatchSize = s + } +} + +// WithBatchDelay returns option to specify maximum time to wait before +// the batch of concurrent transactions is processed. +// This option is missing from `bbolt.Options` but is set right after DB is open. +func WithBatchDelay(d time.Duration) Option { + return func(c *cfg) { + c.boltBatchDelay = d + } +} diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index 0712267b7..70fb28936 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -46,10 +46,11 @@ func testOID() *oidSDK.ID { return id } -func newDB(t testing.TB) *meta.DB { +func newDB(t testing.TB, opts ...meta.Option) *meta.DB { path := t.Name() - bdb := meta.New(meta.WithPath(path), meta.WithPermissions(0600)) + bdb := meta.New(append([]meta.Option{meta.WithPath(path), meta.WithPermissions(0600)}, + opts...)...) require.NoError(t, bdb.Open()) @@ -61,11 +62,11 @@ func newDB(t testing.TB) *meta.DB { return bdb } -func generateObject(t *testing.T) *object.Object { +func generateObject(t testing.TB) *object.Object { return generateObjectWithCID(t, cidtest.ID()) } -func generateObjectWithCID(t *testing.T, cid *cid.ID) *object.Object { +func generateObjectWithCID(t testing.TB, cid *cid.ID) *object.Object { version := version.New() version.SetMajor(2) version.SetMinor(1) diff --git a/pkg/local_object_storage/metabase/put_test.go b/pkg/local_object_storage/metabase/put_test.go index 47da660cb..24c0b93c3 100644 --- a/pkg/local_object_storage/metabase/put_test.go +++ b/pkg/local_object_storage/metabase/put_test.go @@ -1,14 +1,81 @@ package meta_test import ( + "runtime" + "strconv" "testing" + "time" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/util/rand" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) +func prepareObjects(t testing.TB, n int) []*objectSDK.Object { + cid := cidtest.ID() + parentID := objecttest.ID() + objs := make([]*objectSDK.Object, n) + for i := range objs { + objs[i] = generateObjectWithCID(t, cid) + + // FKBT indices. + attrs := make([]objectSDK.Attribute, 20) + for j := range attrs { + attrs[j].SetKey("abc" + strconv.FormatUint(rand.Uint64()%4, 16)) + attrs[j].SetValue("xyz" + strconv.FormatUint(rand.Uint64()%4, 16)) + } + objs[i].SetAttributes(attrs...) + + // List indices. + if i%2 == 0 { + objs[i].SetParentID(parentID) + } + } + return objs +} + +func BenchmarkPut(b *testing.B) { + b.Run("parallel", func(b *testing.B) { + db := newDB(b, + meta.WithBatchDelay(time.Millisecond*10), + meta.WithBatchSize(runtime.NumCPU())) + // Ensure the benchmark is bound by CPU and not waiting batch-delay time. + b.SetParallelism(1) + + index := atomic.NewInt64(-1) + objs := prepareObjects(b, b.N) + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := meta.Put(db, objs[index.Inc()], nil); err != nil { + b.Fatal(err) + } + } + }) + }) + b.Run("sequential", func(b *testing.B) { + db := newDB(b, + meta.WithBatchDelay(time.Millisecond*10), + meta.WithBatchSize(1)) + index := atomic.NewInt64(-1) + objs := prepareObjects(b, b.N) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if err := meta.Put(db, objs[index.Inc()], nil); err != nil { + b.Fatal(err) + } + } + }) +} + func TestDB_PutBlobovnicaUpdate(t *testing.T) { db := newDB(t)