frostfs-node/pkg/local_object_storage/blobstor/blobstor_test.go

332 lines
9.0 KiB
Go

package blobstor
import (
"context"
"path/filepath"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require"
)
func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) {
smallFileStorage := teststore.New(teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
))
largeFileStorage := teststore.New(teststore.WithSubstorage(fstree.New(fstree.WithPath(p))))
return []SubStorage{
{
Storage: smallFileStorage,
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) <= smallSizeLimit
},
},
{
Storage: largeFileStorage,
},
}, smallFileStorage, largeFileStorage
}
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
storages, _, _ := defaultTestStorages(p, smallSizeLimit)
return storages
}
func TestCompression(t *testing.T) {
dir := t.TempDir()
const (
smallSizeLimit = 512
objCount = 4
)
newBlobStor := func(t *testing.T, compress bool) *BlobStor {
bs := New(
WithCompressObjects(compress),
WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, bs.Open(context.Background(), mode.ReadWrite))
require.NoError(t, bs.Init())
return bs
}
bigObj := make([]*objectSDK.Object, objCount)
smallObj := make([]*objectSDK.Object, objCount)
for i := 0; i < objCount; i++ {
bigObj[i] = testObject(smallSizeLimit * 2)
smallObj[i] = testObject(smallSizeLimit / 2)
}
testGet := func(t *testing.T, b *BlobStor, i int) {
res1, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(smallObj[i])})
require.NoError(t, err)
require.Equal(t, smallObj[i], res1.Object)
res2, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(bigObj[i])})
require.NoError(t, err)
require.Equal(t, bigObj[i], res2.Object)
}
testPut := func(t *testing.T, b *BlobStor, i int) {
var prm common.PutPrm
prm.Object = smallObj[i]
_, err := b.Put(context.Background(), prm)
require.NoError(t, err)
prm = common.PutPrm{}
prm.Object = bigObj[i]
_, err = b.Put(context.Background(), prm)
require.NoError(t, err)
}
// Put and Get uncompressed object
blobStor := newBlobStor(t, false)
testPut(t, blobStor, 0)
testGet(t, blobStor, 0)
require.NoError(t, blobStor.Close())
blobStor = newBlobStor(t, true)
testGet(t, blobStor, 0) // get uncompressed object with compress enabled
testPut(t, blobStor, 1)
testGet(t, blobStor, 1)
require.NoError(t, blobStor.Close())
blobStor = newBlobStor(t, false)
testGet(t, blobStor, 0) // get old uncompressed object
testGet(t, blobStor, 1) // get compressed object with compression disabled
testPut(t, blobStor, 2)
testGet(t, blobStor, 2)
require.NoError(t, blobStor.Close())
}
func TestBlobstor_needsCompression(t *testing.T) {
const smallSizeLimit = 512
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
dir := t.TempDir()
bs := New(
WithCompressObjects(compress),
WithUncompressableContentTypes(ct),
WithStorages([]SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(dir, "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < smallSizeLimit
},
},
{
Storage: fstree.New(fstree.WithPath(dir)),
},
}))
require.NoError(t, bs.Open(context.Background(), mode.ReadWrite))
require.NoError(t, bs.Init())
return bs
}
newObjectWithCt := func(contentType string) *objectSDK.Object {
obj := testObject(smallSizeLimit + 1)
if contentType != "" {
var a objectSDK.Attribute
a.SetKey(objectSDK.AttributeContentType)
a.SetValue(contentType)
obj.SetAttributes(a)
}
return obj
}
t.Run("content-types specified", func(t *testing.T) {
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
obj := newObjectWithCt("video/mpeg")
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("audio/aiff")
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("application/x-midi")
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("text/plain")
require.True(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("")
require.True(t, b.compression.NeedsCompression(obj))
})
t.Run("content-types omitted", func(t *testing.T) {
b := newBlobStor(t, true)
obj := newObjectWithCt("video/mpeg")
require.True(t, b.compression.NeedsCompression(obj))
})
t.Run("compress disabled", func(t *testing.T) {
b := newBlobStor(t, false, "video/mpeg")
obj := newObjectWithCt("video/mpeg")
require.False(t, b.compression.NeedsCompression(obj))
obj = newObjectWithCt("text/plain")
require.False(t, b.compression.NeedsCompression(obj))
})
}
func TestConcurrentPut(t *testing.T) {
dir := t.TempDir()
const (
smallSizeLimit = 512
// concurrentPutCount is fstree implementation specific
concurrentPutCount = 5
)
blobStor := New(
WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, blobStor.Open(context.Background(), mode.ReadWrite))
require.NoError(t, blobStor.Init())
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
res, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(obj)})
require.NoError(t, err)
require.Equal(t, obj, res.Object)
}
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
var prm common.PutPrm
prm.Object = obj
_, err := b.Put(context.Background(), prm)
require.NoError(t, err)
}
testPutFileExistsError := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
var prm common.PutPrm
prm.Object = obj
if _, err := b.Put(context.Background(), prm); err != nil {
require.ErrorContains(t, err, "file exists")
}
}
t.Run("put the same big object", func(t *testing.T) {
bigObj := testObject(smallSizeLimit * 2)
var wg sync.WaitGroup
for i := 0; i < concurrentPutCount; i++ {
wg.Add(1)
go func() {
testPut(t, blobStor, bigObj)
wg.Done()
}()
}
wg.Wait()
testGet(t, blobStor, bigObj)
})
t.Run("put the same big object with error", func(t *testing.T) {
bigObj := testObject(smallSizeLimit * 2)
var wg sync.WaitGroup
for i := 0; i < concurrentPutCount+1; i++ {
wg.Add(1)
go func() {
testPutFileExistsError(t, blobStor, bigObj)
wg.Done()
}()
}
wg.Wait()
testGet(t, blobStor, bigObj)
})
t.Run("put the same small object", func(t *testing.T) {
smallObj := testObject(smallSizeLimit / 2)
var wg sync.WaitGroup
for i := 0; i < concurrentPutCount; i++ {
wg.Add(1)
go func() {
testPut(t, blobStor, smallObj)
wg.Done()
}()
}
wg.Wait()
testGet(t, blobStor, smallObj)
})
}
func TestConcurrentDelete(t *testing.T) {
dir := t.TempDir()
const smallSizeLimit = 512
blobStor := New(
WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, blobStor.Open(context.Background(), mode.ReadWrite))
require.NoError(t, blobStor.Init())
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
var prm common.PutPrm
prm.Object = obj
_, err := b.Put(context.Background(), prm)
require.NoError(t, err)
}
testDelete := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
var prm common.DeletePrm
prm.Address = object.AddressOf(obj)
if _, err := b.Delete(context.Background(), prm); err != nil {
require.ErrorContains(t, err, "object not found")
}
}
testDeletedExists := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
var prm common.ExistsPrm
prm.Address = object.AddressOf(obj)
res, err := b.Exists(context.Background(), prm)
require.NoError(t, err)
require.False(t, res.Exists)
}
t.Run("delete the same big object", func(t *testing.T) {
bigObj := testObject(smallSizeLimit * 2)
testPut(t, blobStor, bigObj)
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
testDelete(t, blobStor, bigObj)
wg.Done()
}()
}
wg.Wait()
testDeletedExists(t, blobStor, bigObj)
})
t.Run("delete the same small object", func(t *testing.T) {
smallObj := testObject(smallSizeLimit / 2)
testPut(t, blobStor, smallObj)
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
testDelete(t, blobStor, smallObj)
wg.Done()
}()
}
wg.Wait()
testDeletedExists(t, blobStor, smallObj)
})
}