From 1478cf631a7270e2165d7fd49cddc8bd1482a088 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 10 Apr 2023 21:46:03 +0300 Subject: [PATCH] [#118] node: add unit concurrent tests for blobstor Signed-off-by: Airat Arifullin a.arifullin@yadro.com --- .../blobstor/blobstor_test.go | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/pkg/local_object_storage/blobstor/blobstor_test.go b/pkg/local_object_storage/blobstor/blobstor_test.go index df001a365..0b008e088 100644 --- a/pkg/local_object_storage/blobstor/blobstor_test.go +++ b/pkg/local_object_storage/blobstor/blobstor_test.go @@ -2,6 +2,7 @@ package blobstor import ( "path/filepath" + "sync" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -173,3 +174,156 @@ func TestBlobstor_needsCompression(t *testing.T) { require.False(t, b.NeedsCompression(obj)) }) } + +func TestConcurrentPut(t *testing.T) { + dir := t.TempDir() + + const ( + smallSizeLimit = 512 + + // concurrentPutCount is fstree implementation specific + concurrentPutCount = 5 + ) + + blobStor := New( + WithStorages(defaultStorages(dir, smallSizeLimit))) + require.NoError(t, blobStor.Open(false)) + require.NoError(t, blobStor.Init()) + + testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { + res, err := b.Get(common.GetPrm{Address: object.AddressOf(obj)}) + require.NoError(t, err) + require.Equal(t, obj, res.Object) + } + + testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { + var prm common.PutPrm + prm.Object = obj + _, err := b.Put(prm) + require.NoError(t, err) + } + + testPutFileExistsError := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { + var prm common.PutPrm + prm.Object = obj + if _, err := b.Put(prm); err != nil { + require.ErrorContains(t, err, "file exists") + } + } + + t.Run("put the same big object", func(t *testing.T) { + bigObj := testObject(smallSizeLimit * 2) + + var wg sync.WaitGroup + for i := 0; i < concurrentPutCount; i++ { + wg.Add(1) + go func() { + testPut(t, blobStor, bigObj) + wg.Done() + }() + } + wg.Wait() + + testGet(t, blobStor, bigObj) + }) + + t.Run("put the same big object with error", func(t *testing.T) { + bigObj := testObject(smallSizeLimit * 2) + + var wg sync.WaitGroup + for i := 0; i < concurrentPutCount+1; i++ { + wg.Add(1) + go func() { + testPutFileExistsError(t, blobStor, bigObj) + wg.Done() + }() + } + wg.Wait() + + testGet(t, blobStor, bigObj) + }) + + t.Run("put the same small object", func(t *testing.T) { + smallObj := testObject(smallSizeLimit / 2) + + var wg sync.WaitGroup + for i := 0; i < concurrentPutCount; i++ { + wg.Add(1) + go func() { + testPut(t, blobStor, smallObj) + wg.Done() + }() + } + wg.Wait() + + testGet(t, blobStor, smallObj) + }) +} + +func TestConcurrentDelete(t *testing.T) { + dir := t.TempDir() + + const smallSizeLimit = 512 + + blobStor := New( + WithStorages(defaultStorages(dir, smallSizeLimit))) + require.NoError(t, blobStor.Open(false)) + require.NoError(t, blobStor.Init()) + + testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { + var prm common.PutPrm + prm.Object = obj + _, err := b.Put(prm) + require.NoError(t, err) + } + + testDelete := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { + var prm common.DeletePrm + prm.Address = object.AddressOf(obj) + if _, err := b.Delete(prm); err != nil { + require.ErrorContains(t, err, "object not found") + } + } + + testDeletedExists := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { + var prm common.ExistsPrm + prm.Address = object.AddressOf(obj) + res, err := b.Exists(prm) + require.NoError(t, err) + require.False(t, res.Exists) + } + + t.Run("delete the same big object", func(t *testing.T) { + bigObj := testObject(smallSizeLimit * 2) + testPut(t, blobStor, bigObj) + + var wg sync.WaitGroup + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + testDelete(t, blobStor, bigObj) + wg.Done() + }() + } + wg.Wait() + + testDeletedExists(t, blobStor, bigObj) + }) + + t.Run("delete the same small object", func(t *testing.T) { + smallObj := testObject(smallSizeLimit / 2) + testPut(t, blobStor, smallObj) + + var wg sync.WaitGroup + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + testDelete(t, blobStor, smallObj) + wg.Done() + }() + } + wg.Wait() + + testDeletedExists(t, blobStor, smallObj) + }) +}