[#118] node: add unit concurrent tests for blobstor
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
Signed-off-by: Airat Arifullin a.arifullin@yadro.com
This commit is contained in:
parent
56282edf02
commit
1478cf631a
1 changed files with 154 additions and 0 deletions
|
@ -2,6 +2,7 @@ package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -173,3 +174,156 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
||||||
require.False(t, b.NeedsCompression(obj))
|
require.False(t, b.NeedsCompression(obj))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrentPut(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
const (
|
||||||
|
smallSizeLimit = 512
|
||||||
|
|
||||||
|
// concurrentPutCount is fstree implementation specific
|
||||||
|
concurrentPutCount = 5
|
||||||
|
)
|
||||||
|
|
||||||
|
blobStor := New(
|
||||||
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||||
|
require.NoError(t, blobStor.Open(false))
|
||||||
|
require.NoError(t, blobStor.Init())
|
||||||
|
|
||||||
|
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
res, err := b.Get(common.GetPrm{Address: object.AddressOf(obj)})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, obj, res.Object)
|
||||||
|
}
|
||||||
|
|
||||||
|
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Object = obj
|
||||||
|
_, err := b.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
testPutFileExistsError := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Object = obj
|
||||||
|
if _, err := b.Put(prm); err != nil {
|
||||||
|
require.ErrorContains(t, err, "file exists")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("put the same big object", func(t *testing.T) {
|
||||||
|
bigObj := testObject(smallSizeLimit * 2)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < concurrentPutCount; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
testPut(t, blobStor, bigObj)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
testGet(t, blobStor, bigObj)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("put the same big object with error", func(t *testing.T) {
|
||||||
|
bigObj := testObject(smallSizeLimit * 2)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < concurrentPutCount+1; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
testPutFileExistsError(t, blobStor, bigObj)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
testGet(t, blobStor, bigObj)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("put the same small object", func(t *testing.T) {
|
||||||
|
smallObj := testObject(smallSizeLimit / 2)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < concurrentPutCount; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
testPut(t, blobStor, smallObj)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
testGet(t, blobStor, smallObj)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcurrentDelete(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
const smallSizeLimit = 512
|
||||||
|
|
||||||
|
blobStor := New(
|
||||||
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||||
|
require.NoError(t, blobStor.Open(false))
|
||||||
|
require.NoError(t, blobStor.Init())
|
||||||
|
|
||||||
|
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Object = obj
|
||||||
|
_, err := b.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
testDelete := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
var prm common.DeletePrm
|
||||||
|
prm.Address = object.AddressOf(obj)
|
||||||
|
if _, err := b.Delete(prm); err != nil {
|
||||||
|
require.ErrorContains(t, err, "object not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
testDeletedExists := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
var prm common.ExistsPrm
|
||||||
|
prm.Address = object.AddressOf(obj)
|
||||||
|
res, err := b.Exists(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, res.Exists)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("delete the same big object", func(t *testing.T) {
|
||||||
|
bigObj := testObject(smallSizeLimit * 2)
|
||||||
|
testPut(t, blobStor, bigObj)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
testDelete(t, blobStor, bigObj)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
testDeletedExists(t, blobStor, bigObj)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("delete the same small object", func(t *testing.T) {
|
||||||
|
smallObj := testObject(smallSizeLimit / 2)
|
||||||
|
testPut(t, blobStor, smallObj)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
testDelete(t, blobStor, smallObj)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
testDeletedExists(t, blobStor, smallObj)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue