forked from TrueCloudLab/frostfs-node
330 lines
8.6 KiB
Go
330 lines
8.6 KiB
Go
package blobstor
|
|
|
|
import (
|
|
"context"
|
|
"path/filepath"
|
|
"sync"
|
|
"testing"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) {
|
|
smallFileStorage := teststore.New(teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
|
|
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
|
|
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
|
))
|
|
largeFileStorage := teststore.New(teststore.WithSubstorage(fstree.New(fstree.WithPath(p))))
|
|
return []SubStorage{
|
|
{
|
|
Storage: smallFileStorage,
|
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
return uint64(len(data)) <= smallSizeLimit
|
|
},
|
|
},
|
|
{
|
|
Storage: largeFileStorage,
|
|
},
|
|
}, smallFileStorage, largeFileStorage
|
|
}
|
|
|
|
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
|
|
storages, _, _ := defaultTestStorages(p, smallSizeLimit)
|
|
return storages
|
|
}
|
|
|
|
func TestCompression(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
const (
|
|
smallSizeLimit = 512
|
|
objCount = 4
|
|
)
|
|
|
|
newBlobStor := func(t *testing.T, compress bool) *BlobStor {
|
|
bs := New(
|
|
WithCompressObjects(compress),
|
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
|
require.NoError(t, bs.Open(false))
|
|
require.NoError(t, bs.Init())
|
|
return bs
|
|
}
|
|
|
|
bigObj := make([]*objectSDK.Object, objCount)
|
|
smallObj := make([]*objectSDK.Object, objCount)
|
|
for i := 0; i < objCount; i++ {
|
|
bigObj[i] = testObject(smallSizeLimit * 2)
|
|
smallObj[i] = testObject(smallSizeLimit / 2)
|
|
}
|
|
|
|
testGet := func(t *testing.T, b *BlobStor, i int) {
|
|
res1, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(smallObj[i])})
|
|
require.NoError(t, err)
|
|
require.Equal(t, smallObj[i], res1.Object)
|
|
|
|
res2, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(bigObj[i])})
|
|
require.NoError(t, err)
|
|
require.Equal(t, bigObj[i], res2.Object)
|
|
}
|
|
|
|
testPut := func(t *testing.T, b *BlobStor, i int) {
|
|
var prm common.PutPrm
|
|
prm.Object = smallObj[i]
|
|
_, err := b.Put(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
|
|
prm = common.PutPrm{}
|
|
prm.Object = bigObj[i]
|
|
_, err = b.Put(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Put and Get uncompressed object
|
|
blobStor := newBlobStor(t, false)
|
|
testPut(t, blobStor, 0)
|
|
testGet(t, blobStor, 0)
|
|
require.NoError(t, blobStor.Close())
|
|
|
|
blobStor = newBlobStor(t, true)
|
|
testGet(t, blobStor, 0) // get uncompressed object with compress enabled
|
|
testPut(t, blobStor, 1)
|
|
testGet(t, blobStor, 1)
|
|
require.NoError(t, blobStor.Close())
|
|
|
|
blobStor = newBlobStor(t, false)
|
|
testGet(t, blobStor, 0) // get old uncompressed object
|
|
testGet(t, blobStor, 1) // get compressed object with compression disabled
|
|
testPut(t, blobStor, 2)
|
|
testGet(t, blobStor, 2)
|
|
require.NoError(t, blobStor.Close())
|
|
}
|
|
|
|
func TestBlobstor_needsCompression(t *testing.T) {
|
|
const smallSizeLimit = 512
|
|
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
|
|
dir := t.TempDir()
|
|
|
|
bs := New(
|
|
WithCompressObjects(compress),
|
|
WithUncompressableContentTypes(ct),
|
|
WithStorages([]SubStorage{
|
|
{
|
|
Storage: blobovniczatree.NewBlobovniczaTree(
|
|
blobovniczatree.WithRootPath(filepath.Join(dir, "blobovnicza")),
|
|
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
return uint64(len(data)) < smallSizeLimit
|
|
},
|
|
},
|
|
{
|
|
Storage: fstree.New(fstree.WithPath(dir)),
|
|
},
|
|
}))
|
|
require.NoError(t, bs.Open(false))
|
|
require.NoError(t, bs.Init())
|
|
return bs
|
|
}
|
|
|
|
newObjectWithCt := func(contentType string) *objectSDK.Object {
|
|
obj := testObject(smallSizeLimit + 1)
|
|
if contentType != "" {
|
|
var a objectSDK.Attribute
|
|
a.SetKey(objectSDK.AttributeContentType)
|
|
a.SetValue(contentType)
|
|
obj.SetAttributes(a)
|
|
}
|
|
return obj
|
|
}
|
|
|
|
t.Run("content-types specified", func(t *testing.T) {
|
|
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
|
|
|
|
obj := newObjectWithCt("video/mpeg")
|
|
require.False(t, b.NeedsCompression(obj))
|
|
|
|
obj = newObjectWithCt("audio/aiff")
|
|
require.False(t, b.NeedsCompression(obj))
|
|
|
|
obj = newObjectWithCt("application/x-midi")
|
|
require.False(t, b.NeedsCompression(obj))
|
|
|
|
obj = newObjectWithCt("text/plain")
|
|
require.True(t, b.NeedsCompression(obj))
|
|
|
|
obj = newObjectWithCt("")
|
|
require.True(t, b.NeedsCompression(obj))
|
|
})
|
|
t.Run("content-types omitted", func(t *testing.T) {
|
|
b := newBlobStor(t, true)
|
|
obj := newObjectWithCt("video/mpeg")
|
|
require.True(t, b.NeedsCompression(obj))
|
|
})
|
|
t.Run("compress disabled", func(t *testing.T) {
|
|
b := newBlobStor(t, false, "video/mpeg")
|
|
|
|
obj := newObjectWithCt("video/mpeg")
|
|
require.False(t, b.NeedsCompression(obj))
|
|
|
|
obj = newObjectWithCt("text/plain")
|
|
require.False(t, b.NeedsCompression(obj))
|
|
})
|
|
}
|
|
|
|
func TestConcurrentPut(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
const (
|
|
smallSizeLimit = 512
|
|
|
|
// concurrentPutCount is fstree implementation specific
|
|
concurrentPutCount = 5
|
|
)
|
|
|
|
blobStor := New(
|
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
|
require.NoError(t, blobStor.Open(false))
|
|
require.NoError(t, blobStor.Init())
|
|
|
|
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
|
res, err := b.Get(common.GetPrm{Address: object.AddressOf(obj)})
|
|
require.NoError(t, err)
|
|
require.Equal(t, obj, res.Object)
|
|
}
|
|
|
|
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
|
var prm common.PutPrm
|
|
prm.Object = obj
|
|
_, err := b.Put(prm)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
testPutFileExistsError := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
|
var prm common.PutPrm
|
|
prm.Object = obj
|
|
if _, err := b.Put(prm); err != nil {
|
|
require.ErrorContains(t, err, "file exists")
|
|
}
|
|
}
|
|
|
|
t.Run("put the same big object", func(t *testing.T) {
|
|
bigObj := testObject(smallSizeLimit * 2)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < concurrentPutCount; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
testPut(t, blobStor, bigObj)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
testGet(t, blobStor, bigObj)
|
|
})
|
|
|
|
t.Run("put the same big object with error", func(t *testing.T) {
|
|
bigObj := testObject(smallSizeLimit * 2)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < concurrentPutCount+1; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
testPutFileExistsError(t, blobStor, bigObj)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
testGet(t, blobStor, bigObj)
|
|
})
|
|
|
|
t.Run("put the same small object", func(t *testing.T) {
|
|
smallObj := testObject(smallSizeLimit / 2)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < concurrentPutCount; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
testPut(t, blobStor, smallObj)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
testGet(t, blobStor, smallObj)
|
|
})
|
|
}
|
|
|
|
func TestConcurrentDelete(t *testing.T) {
|
|
dir := t.TempDir()
|
|
|
|
const smallSizeLimit = 512
|
|
|
|
blobStor := New(
|
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
|
require.NoError(t, blobStor.Open(false))
|
|
require.NoError(t, blobStor.Init())
|
|
|
|
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
|
var prm common.PutPrm
|
|
prm.Object = obj
|
|
_, err := b.Put(prm)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
testDelete := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
|
var prm common.DeletePrm
|
|
prm.Address = object.AddressOf(obj)
|
|
if _, err := b.Delete(prm); err != nil {
|
|
require.ErrorContains(t, err, "object not found")
|
|
}
|
|
}
|
|
|
|
testDeletedExists := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
|
var prm common.ExistsPrm
|
|
prm.Address = object.AddressOf(obj)
|
|
res, err := b.Exists(prm)
|
|
require.NoError(t, err)
|
|
require.False(t, res.Exists)
|
|
}
|
|
|
|
t.Run("delete the same big object", func(t *testing.T) {
|
|
bigObj := testObject(smallSizeLimit * 2)
|
|
testPut(t, blobStor, bigObj)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 2; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
testDelete(t, blobStor, bigObj)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
testDeletedExists(t, blobStor, bigObj)
|
|
})
|
|
|
|
t.Run("delete the same small object", func(t *testing.T) {
|
|
smallObj := testObject(smallSizeLimit / 2)
|
|
testPut(t, blobStor, smallObj)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 2; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
testDelete(t, blobStor, smallObj)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
testDeletedExists(t, blobStor, smallObj)
|
|
})
|
|
}
|