[#118] node: add unit concurrent tests for blobstor #233
1 changed files with 154 additions and 0 deletions
|
@ -2,6 +2,7 @@ package blobstor
|
|||
|
||||
import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
|
@ -173,3 +174,156 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
|||
require.False(t, b.NeedsCompression(obj))
|
||||
})
|
||||
}
|
||||
|
||||
func TestConcurrentPut(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
const (
|
||||
smallSizeLimit = 512
|
||||
|
||||
// concurrentPutCount is fstree implementation specific
|
||||
concurrentPutCount = 5
|
||||
)
|
||||
|
||||
blobStor := New(
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
||||
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
res, err := b.Get(common.GetPrm{Address: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, res.Object)
|
||||
}
|
||||
|
||||
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
var prm common.PutPrm
|
||||
prm.Object = obj
|
||||
_, err := b.Put(prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
testPutFileExistsError := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
var prm common.PutPrm
|
||||
prm.Object = obj
|
||||
if _, err := b.Put(prm); err != nil {
|
||||
require.ErrorContains(t, err, "file exists")
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("put the same big object", func(t *testing.T) {
|
||||
bigObj := testObject(smallSizeLimit * 2)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < concurrentPutCount; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
testPut(t, blobStor, bigObj)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
testGet(t, blobStor, bigObj)
|
||||
})
|
||||
|
||||
t.Run("put the same big object with error", func(t *testing.T) {
|
||||
bigObj := testObject(smallSizeLimit * 2)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < concurrentPutCount+1; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
testPutFileExistsError(t, blobStor, bigObj)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
testGet(t, blobStor, bigObj)
|
||||
})
|
||||
|
||||
t.Run("put the same small object", func(t *testing.T) {
|
||||
smallObj := testObject(smallSizeLimit / 2)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < concurrentPutCount; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
testPut(t, blobStor, smallObj)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
testGet(t, blobStor, smallObj)
|
||||
})
|
||||
}
|
||||
|
||||
func TestConcurrentDelete(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
const smallSizeLimit = 512
|
||||
|
||||
blobStor := New(
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
||||
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
var prm common.PutPrm
|
||||
prm.Object = obj
|
||||
_, err := b.Put(prm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
testDelete := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
var prm common.DeletePrm
|
||||
prm.Address = object.AddressOf(obj)
|
||||
if _, err := b.Delete(prm); err != nil {
|
||||
require.ErrorContains(t, err, "object not found")
|
||||
}
|
||||
}
|
||||
|
||||
testDeletedExists := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
var prm common.ExistsPrm
|
||||
prm.Address = object.AddressOf(obj)
|
||||
res, err := b.Exists(prm)
|
||||
require.NoError(t, err)
|
||||
require.False(t, res.Exists)
|
||||
}
|
||||
|
||||
t.Run("delete the same big object", func(t *testing.T) {
|
||||
bigObj := testObject(smallSizeLimit * 2)
|
||||
testPut(t, blobStor, bigObj)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 2; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
testDelete(t, blobStor, bigObj)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
testDeletedExists(t, blobStor, bigObj)
|
||||
})
|
||||
|
||||
t.Run("delete the same small object", func(t *testing.T) {
|
||||
smallObj := testObject(smallSizeLimit / 2)
|
||||
testPut(t, blobStor, smallObj)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 2; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
testDelete(t, blobStor, smallObj)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
testDeletedExists(t, blobStor, smallObj)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue