frostfs-node/pkg/local_object_storage/metabase/put_test.go

131 lines
3.4 KiB
Go
Raw Permalink Normal View History

package meta_test
import (
"context"
"runtime"
"strconv"
"sync/atomic"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
// prepareObjects builds n generated objects that all belong to one freshly
// generated container. Each object carries 20 attributes with keys "abc0".."abc3"
// and values "xyz0".."xyz3" picked at random, and every object at an even
// position additionally references one shared parent ID.
func prepareObjects(n int) []*objectSDK.Object {
	const attrsPerObject = 20

	containerID := cidtest.ID()
	sharedParent := objecttest.ID()

	// randomSuffix yields one of "0", "1", "2" or "3".
	randomSuffix := func() string {
		return strconv.FormatUint(rand.Uint64()%4, 16)
	}

	result := make([]*objectSDK.Object, n)
	for idx := 0; idx < n; idx++ {
		obj := testutil.GenerateObjectWithCID(containerID)
		result[idx] = obj

		// FKBT indices.
		attributes := make([]objectSDK.Attribute, attrsPerObject)
		for a := range attributes {
			attr := &attributes[a]
			attr.SetKey("abc" + randomSuffix())
			attr.SetValue("xyz" + randomSuffix())
		}
		obj.SetAttributes(attributes...)

		// List indices.
		if idx%2 != 0 {
			continue
		}
		obj.SetParentID(sharedParent)
	}
	return result
}
// BenchmarkPut measures putting pre-generated objects into a fresh metabase in
// two modes:
//   - "parallel": puts are issued from the b.RunParallel workers, with the
//     batch size set to runtime.NumCPU();
//   - "sequential": puts are issued one by one from the benchmark goroutine,
//     with the batch size set to 1.
//
// Objects are generated before the timer is reset, so object construction is
// not part of the measurement.
func BenchmarkPut(b *testing.B) {
	b.Run("parallel", func(b *testing.B) {
		db := newDB(b,
			meta.WithMaxBatchDelay(time.Millisecond*10),
			meta.WithMaxBatchSize(runtime.NumCPU()))
		defer func() { require.NoError(b, db.Close()) }()
		// Ensure the benchmark is bound by CPU and not waiting batch-delay time.
		b.SetParallelism(1)

		// Workers claim distinct objects through a shared counter. It starts
		// at -1 so that the first Add(1) yields index 0.
		var index atomic.Int64
		index.Store(-1)

		objs := prepareObjects(b.N)
		b.ResetTimer()
		b.ReportAllocs()
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				if err := metaPut(db, objs[index.Add(1)], nil); err != nil {
					// This body runs on a worker goroutine, where Fatal/FailNow
					// must not be called. Record the failure and stop this
					// worker instead.
					b.Error(err)
					return
				}
			}
		})
	})
	b.Run("sequential", func(b *testing.B) {
		db := newDB(b,
			meta.WithMaxBatchDelay(time.Millisecond*10),
			meta.WithMaxBatchSize(1))
		defer func() { require.NoError(b, db.Close()) }()

		objs := prepareObjects(b.N)
		b.ResetTimer()
		b.ReportAllocs()
		// Single goroutine: iterate over the b.N prepared objects directly,
		// no shared counter is needed.
		for _, obj := range objs {
			if err := metaPut(db, obj, nil); err != nil {
				b.Fatal(err)
			}
		}
	})
}
// TestDB_PutBlobovniczaUpdate verifies storage ID handling on Put:
// the ID passed with the first put is returned by a lookup, a repeated put of
// the same object with another ID replaces the stored one, and an object
// stored through putBig is reported with a nil storage ID.
func TestDB_PutBlobovniczaUpdate(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	obj := testutil.GenerateObject()
	initialID := []byte{1, 2, 3, 4}

	// put one object with storageID
	require.NoError(t, metaPut(db, obj, initialID))

	stored, err := metaStorageID(db, object.AddressOf(obj))
	require.NoError(t, err)
	require.Equal(t, initialID, stored)

	t.Run("update storageID", func(t *testing.T) {
		replacementID := []byte{5, 6, 7, 8}

		// Put the very same object again, this time with a different ID.
		require.NoError(t, metaPut(db, obj, replacementID))

		stored, err := metaStorageID(db, object.AddressOf(obj))
		require.NoError(t, err)
		require.Equal(t, replacementID, stored)
	})

	t.Run("update storageID on bad object", func(t *testing.T) {
		other := testutil.GenerateObject()

		// putBig is declared elsewhere in the package.
		require.NoError(t, putBig(db, other))

		stored, err := metaStorageID(db, object.AddressOf(other))
		require.NoError(t, err)
		require.Nil(t, stored)
	})
}
// metaPut stores obj in db together with the storage ID id (which may be nil)
// using a background context, and returns only the error reported by db.Put.
func metaPut(db *meta.DB, obj *objectSDK.Object, id []byte) error {
	var prm meta.PutPrm
	prm.SetObject(obj)
	prm.SetStorageID(id)

	// The first result of Put is of no interest to the callers of this helper.
	if _, err := db.Put(context.Background(), prm); err != nil {
		return err
	}
	return nil
}