package blobstor import ( "context" "path/filepath" "sync" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/stretchr/testify/require" ) func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) { smallFileStorage := teststore.New(teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree( blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")), blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init )) largeFileStorage := teststore.New(teststore.WithSubstorage(fstree.New(fstree.WithPath(p)))) return []SubStorage{ { Storage: smallFileStorage, Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) <= smallSizeLimit }, }, { Storage: largeFileStorage, }, }, smallFileStorage, largeFileStorage } func defaultStorages(p string, smallSizeLimit uint64) []SubStorage { storages, _, _ := defaultTestStorages(p, smallSizeLimit) return storages } func TestCompression(t *testing.T) { dir := t.TempDir() const ( smallSizeLimit = 512 objCount = 4 ) newBlobStor := func(t *testing.T, compress bool) *BlobStor { bs := New( WithCompressObjects(compress), WithStorages(defaultStorages(dir, smallSizeLimit))) require.NoError(t, bs.Open(false)) require.NoError(t, bs.Init()) return bs } bigObj := make([]*objectSDK.Object, objCount) smallObj := make([]*objectSDK.Object, objCount) for i := 0; i < objCount; i++ { bigObj[i] = testObject(smallSizeLimit * 2) smallObj[i] = testObject(smallSizeLimit / 2) } testGet := func(t *testing.T, b *BlobStor, i int) { res1, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(smallObj[i])}) require.NoError(t, err) require.Equal(t, smallObj[i], res1.Object) res2, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(bigObj[i])}) require.NoError(t, err) require.Equal(t, bigObj[i], res2.Object) } testPut := func(t *testing.T, b *BlobStor, i int) { var prm common.PutPrm prm.Object = smallObj[i] _, err := b.Put(context.Background(), prm) require.NoError(t, err) prm = common.PutPrm{} prm.Object = bigObj[i] _, err = b.Put(context.Background(), prm) require.NoError(t, err) } // Put and Get uncompressed object blobStor := newBlobStor(t, false) testPut(t, blobStor, 0) testGet(t, blobStor, 0) require.NoError(t, blobStor.Close()) blobStor = newBlobStor(t, true) testGet(t, blobStor, 0) // get uncompressed object with compress enabled testPut(t, blobStor, 1) testGet(t, blobStor, 1) require.NoError(t, blobStor.Close()) blobStor = newBlobStor(t, false) testGet(t, blobStor, 0) // get old uncompressed object testGet(t, blobStor, 1) // get compressed object with compression disabled testPut(t, blobStor, 2) testGet(t, blobStor, 2) require.NoError(t, blobStor.Close()) } func TestBlobstor_needsCompression(t *testing.T) { const smallSizeLimit = 512 newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor { dir := t.TempDir() bs := New( WithCompressObjects(compress), WithUncompressableContentTypes(ct), WithStorages([]SubStorage{ { Storage: blobovniczatree.NewBlobovniczaTree( blobovniczatree.WithRootPath(filepath.Join(dir, "blobovnicza")), blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) < smallSizeLimit }, }, { Storage: fstree.New(fstree.WithPath(dir)), }, })) require.NoError(t, bs.Open(false)) require.NoError(t, bs.Init()) return bs } newObjectWithCt := func(contentType string) *objectSDK.Object { obj := testObject(smallSizeLimit + 1) if contentType != "" { var a objectSDK.Attribute a.SetKey(objectSDK.AttributeContentType) a.SetValue(contentType) obj.SetAttributes(a) } return obj } t.Run("content-types specified", func(t *testing.T) { b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi") obj := newObjectWithCt("video/mpeg") require.False(t, b.compression.NeedsCompression(obj)) obj = newObjectWithCt("audio/aiff") require.False(t, b.compression.NeedsCompression(obj)) obj = newObjectWithCt("application/x-midi") require.False(t, b.compression.NeedsCompression(obj)) obj = newObjectWithCt("text/plain") require.True(t, b.compression.NeedsCompression(obj)) obj = newObjectWithCt("") require.True(t, b.compression.NeedsCompression(obj)) }) t.Run("content-types omitted", func(t *testing.T) { b := newBlobStor(t, true) obj := newObjectWithCt("video/mpeg") require.True(t, b.compression.NeedsCompression(obj)) }) t.Run("compress disabled", func(t *testing.T) { b := newBlobStor(t, false, "video/mpeg") obj := newObjectWithCt("video/mpeg") require.False(t, b.compression.NeedsCompression(obj)) obj = newObjectWithCt("text/plain") require.False(t, b.compression.NeedsCompression(obj)) }) } func TestConcurrentPut(t *testing.T) { dir := t.TempDir() const ( smallSizeLimit = 512 // concurrentPutCount is fstree implementation specific concurrentPutCount = 5 ) blobStor := New( WithStorages(defaultStorages(dir, smallSizeLimit))) require.NoError(t, blobStor.Open(false)) require.NoError(t, blobStor.Init()) testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { res, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(obj)}) require.NoError(t, err) require.Equal(t, obj, res.Object) } testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { var prm common.PutPrm prm.Object = obj _, err := b.Put(context.Background(), prm) require.NoError(t, err) } testPutFileExistsError := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { var prm common.PutPrm prm.Object = obj if _, err := b.Put(context.Background(), prm); err != nil { require.ErrorContains(t, err, "file exists") } } t.Run("put the same big object", func(t *testing.T) { bigObj := testObject(smallSizeLimit * 2) var wg sync.WaitGroup for i := 0; i < concurrentPutCount; i++ { wg.Add(1) go func() { testPut(t, blobStor, bigObj) wg.Done() }() } wg.Wait() testGet(t, blobStor, bigObj) }) t.Run("put the same big object with error", func(t *testing.T) { bigObj := testObject(smallSizeLimit * 2) var wg sync.WaitGroup for i := 0; i < concurrentPutCount+1; i++ { wg.Add(1) go func() { testPutFileExistsError(t, blobStor, bigObj) wg.Done() }() } wg.Wait() testGet(t, blobStor, bigObj) }) t.Run("put the same small object", func(t *testing.T) { smallObj := testObject(smallSizeLimit / 2) var wg sync.WaitGroup for i := 0; i < concurrentPutCount; i++ { wg.Add(1) go func() { testPut(t, blobStor, smallObj) wg.Done() }() } wg.Wait() testGet(t, blobStor, smallObj) }) } func TestConcurrentDelete(t *testing.T) { dir := t.TempDir() const smallSizeLimit = 512 blobStor := New( WithStorages(defaultStorages(dir, smallSizeLimit))) require.NoError(t, blobStor.Open(false)) require.NoError(t, blobStor.Init()) testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { var prm common.PutPrm prm.Object = obj _, err := b.Put(context.Background(), prm) require.NoError(t, err) } testDelete := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { var prm common.DeletePrm prm.Address = object.AddressOf(obj) if _, err := b.Delete(context.Background(), prm); err != nil { require.ErrorContains(t, err, "object not found") } } testDeletedExists := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { var prm common.ExistsPrm prm.Address = object.AddressOf(obj) res, err := b.Exists(context.Background(), prm) require.NoError(t, err) require.False(t, res.Exists) } t.Run("delete the same big object", func(t *testing.T) { bigObj := testObject(smallSizeLimit * 2) testPut(t, blobStor, bigObj) var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go func() { testDelete(t, blobStor, bigObj) wg.Done() }() } wg.Wait() testDeletedExists(t, blobStor, bigObj) }) t.Run("delete the same small object", func(t *testing.T) { smallObj := testObject(smallSizeLimit / 2) testPut(t, blobStor, smallObj) var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go func() { testDelete(t, blobStor, smallObj) wg.Done() }() } wg.Wait() testDeletedExists(t, blobStor, smallObj) }) }