frostfs-node/pkg/local_object_storage/engine/inhume_test.go
Aleksey Savchuk 487cb34c5d
[#1689] engine/test: Refactor TestInhumeIfObjectDoesntExist test
- Use the same storage engine in multiple parallel tests
- Move `Lock`, `Inhume`, `Head` calls to separate functions

Change-Id: I00849c1f068f0ab8d92061719d67d6fe786200db
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2025-04-21 15:32:04 +03:00

318 lines
9.1 KiB
Go

package engine
import (
"context"
"fmt"
"strconv"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
// TestStorageEngine_Inhume verifies that inhuming the parent address with a
// tombstone leaves Select (with the root filter) returning no addresses.
//
// Two scenarios are covered:
//   - "delete small object": the parent object itself is put into a
//     single-shard engine;
//   - "delete big object": only the child and the link objects are stored,
//     each on its own shard of a two-shard engine.
func TestStorageEngine_Inhume(t *testing.T) {
	containerID := cidtest.ID()
	split := objectSDK.NewSplitID()

	// Filter passed to Select after every removal.
	filters := objectSDK.SearchFilters{}
	filters.AddRootFilter()

	tombstoneAddr := object.AddressOf(testutil.GenerateObjectWithCID(containerID))

	parent := testutil.GenerateObjectWithCID(containerID)
	parentID, _ := parent.ID()
	parentAddr := object.AddressOf(parent)

	// Child part of the split object.
	child := testutil.GenerateObjectWithCID(containerID)
	child.SetParent(parent)
	child.SetParentID(parentID)
	child.SetSplitID(split)
	childID, _ := child.ID()

	// Link object that references the parent and lists the child.
	link := testutil.GenerateObjectWithCID(containerID)
	link.SetParent(parent)
	link.SetParentID(parentID)
	link.SetChildren(childID)
	link.SetSplitID(split)

	t.Run("delete small object", func(t *testing.T) {
		t.Parallel()
		ctx := context.Background()

		e := testNewEngine(t).setShardsNum(t, 1).prepare(t).engine
		defer func() { require.NoError(t, e.Close(context.Background())) }()

		require.NoError(t, Put(ctx, e, parent, false))

		var prm InhumePrm
		prm.WithTarget(tombstoneAddr, parentAddr)
		require.NoError(t, e.Inhume(ctx, prm))

		found, err := Select(ctx, e, containerID, false, filters)
		require.NoError(t, err)
		require.Empty(t, found)
	})

	t.Run("delete big object", func(t *testing.T) {
		t.Parallel()
		ctx := context.Background()

		wrapper := testNewEngine(t).setShardsNum(t, 2).prepare(t)
		e := wrapper.engine
		defer func() { require.NoError(t, e.Close(context.Background())) }()

		firstShard, secondShard := wrapper.shards[0], wrapper.shards[1]

		// The child goes to the first shard...
		var childPrm shard.PutPrm
		childPrm.SetObject(child)
		_, err := firstShard.Put(ctx, childPrm)
		require.NoError(t, err)

		// ...and the link goes to the second one.
		var linkPrm shard.PutPrm
		linkPrm.SetObject(link)
		_, err = secondShard.Put(ctx, linkPrm)
		require.NoError(t, err)

		var prm InhumePrm
		prm.WithTarget(tombstoneAddr, parentAddr)
		require.NoError(t, e.Inhume(ctx, prm))

		found, err := Select(ctx, e, containerID, false, filters)
		require.NoError(t, err)
		require.Empty(t, found)
	})
}
// TestStorageEngine_ECInhume puts one EC chunk and a tombstone object into a
// five-shard engine, inhumes the EC parent address with that tombstone, and
// then expects Put of both the already stored chunk and a not yet stored
// chunk of the same parent to fail with ObjectAlreadyRemoved.
func TestStorageEngine_ECInhume(t *testing.T) {
	ctx := context.Background()

	parentAddr := oidtest.Address()
	cnr := parentAddr.Container()
	parentInfo := objectSDK.ECParentInfo{
		ID: parentAddr.Object(),
	}

	// NOTE(review): the numeric header arguments presumably are the chunk
	// index (0 and 1) and the total number of chunks (4) — confirm against
	// the SDK.
	firstChunk := testutil.GenerateObjectWithCID(cnr)
	firstChunk.SetECHeader(objectSDK.NewECHeader(parentInfo, 0, 4, []byte{}, 0))

	secondChunk := testutil.GenerateObjectWithCID(cnr)
	secondChunk.SetECHeader(objectSDK.NewECHeader(parentInfo, 1, 4, []byte{}, 0))

	// Tombstone payload lists the EC parent as its only member.
	ts := objectSDK.NewTombstone()
	ts.SetMembers([]oid.ID{parentAddr.Object()})
	tsPayload, err := ts.Marshal()
	require.NoError(t, err)

	tsObject := testutil.GenerateObjectWithCID(cnr)
	tsObject.SetType(objectSDK.TypeTombstone)
	tsObject.SetPayload(tsPayload)
	tsAddr := object.AddressOf(tsObject)

	e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
	defer func() { require.NoError(t, e.Close(context.Background())) }()

	// Only the first chunk is stored before the removal.
	require.NoError(t, Put(ctx, e, firstChunk, false))
	require.NoError(t, Put(ctx, e, tsObject, false))

	var prm InhumePrm
	prm.WithTarget(tsAddr, parentAddr)
	require.NoError(t, e.Inhume(ctx, prm))

	var removed *apistatus.ObjectAlreadyRemoved
	require.ErrorAs(t, Put(ctx, e, firstChunk, false), &removed)
	require.ErrorAs(t, Put(ctx, e, secondChunk, false), &removed)
}
// TestInhumeExpiredRegularObject checks that Inhume succeeds for an object
// whose expiration epoch attribute (currEpoch - 1) is lower than the epoch
// reported to the metabase (currEpoch), both when a tombstone address is
// supplied and when the object is only marked as garbage.
//
// The single shard is created with GC disabled.
func TestInhumeExpiredRegularObject(t *testing.T) {
	t.Parallel()

	const currEpoch = 42
	const objectExpiresAfter = currEpoch - 1

	engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
		return []shard.Option{
			shard.WithDisabledGC(),
			shard.WithMetaBaseOptions(append(
				testGetDefaultMetabaseOptions(t),
				meta.WithEpochState(epochState{currEpoch}),
			)...),
		}
	}).prepare(t).engine
	// Release the engine once the test and all of its subtests are done,
	// like the other tests in this file do.
	t.Cleanup(func() { require.NoError(t, engine.Close(context.Background())) })

	cnr := cidtest.ID()

	// generateAndPutObject stores a new object carrying the expiration
	// attribute and returns it. It takes the calling (sub)test's t because
	// require may call FailNow, which must run on the goroutine of the test
	// it belongs to.
	generateAndPutObject := func(t *testing.T) *objectSDK.Object {
		t.Helper()

		obj := testutil.GenerateObjectWithCID(cnr)
		testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(objectExpiresAfter))

		var putPrm PutPrm
		putPrm.Object = obj
		require.NoError(t, engine.Put(context.Background(), putPrm))
		return obj
	}

	t.Run("inhume with tombstone", func(t *testing.T) {
		obj := generateAndPutObject(t)

		// Tombstone address in the same container as the object.
		ts := oidtest.Address()
		ts.SetContainer(cnr)

		var prm InhumePrm
		prm.WithTarget(ts, object.AddressOf(obj))
		err := engine.Inhume(context.Background(), prm)
		require.NoError(t, err)
	})

	t.Run("inhume without tombstone", func(t *testing.T) {
		obj := generateAndPutObject(t)

		var prm InhumePrm
		prm.MarkAsGarbage(object.AddressOf(obj))
		err := engine.Inhume(context.Background(), prm)
		require.NoError(t, err)
	})
}
// BenchmarkInhumeMultipart runs benchmarkInhumeMultipart as sub-benchmarks
// named "objects=N" for N = 1, 10, 100, 1000 and 10000 objects removed by a
// single Inhume call.
func BenchmarkInhumeMultipart(b *testing.B) {
	// The number of shards has only a minor effect on the result, so it is
	// not a benchmark parameter; it is simply fixed at a large enough value.
	const shardCount = 100

	for _, objectCount := range []int{1, 10, 100, 1000, 10000} {
		name := fmt.Sprintf("objects=%d", objectCount)
		b.Run(name, func(b *testing.B) {
			benchmarkInhumeMultipart(b, shardCount, objectCount)
		})
	}
}
// benchmarkInhumeMultipart measures a single Inhume call that targets
// numObjects addresses on an engine with numShards shards.
//
// The timer is stopped on entry and runs only around the Inhume call, so
// engine setup and the concurrent Put calls that populate the engine on each
// iteration are excluded from the measurement.
func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
	b.StopTimer()

	ctx := context.Background()

	engine := testNewEngine(b).setShardsNum(b, numShards).prepare(b).engine
	defer func() { require.NoError(b, engine.Close(context.Background())) }()

	containerID := cidtest.ID()

	var puts errgroup.Group

	for range b.N {
		targets := make([]oid.Address, 0, numObjects)

		for range numObjects {
			obj := objecttest.Object().Parent()
			obj.SetContainerID(containerID)
			obj.SetType(objectSDK.TypeRegular)
			targets = append(targets, object.AddressOf(obj))

			// Every object is stored by its own goroutine.
			putPrm := PutPrm{Object: obj}
			puts.Go(func() error {
				return engine.Put(ctx, putPrm)
			})
		}
		require.NoError(b, puts.Wait())

		// Tombstone address placed in the same container as the targets.
		tombstone := oidtest.Address()
		tombstone.SetContainer(containerID)

		var inhumePrm InhumePrm
		inhumePrm.WithTarget(tombstone, targets...)

		b.StartTimer()
		err := engine.Inhume(ctx, inhumePrm)
		require.NoError(b, err)
		b.StopTimer()
	}
}
// TestInhumeIfObjectDoesntExist runs the locked-object inhume scenarios
// against one four-shard engine shared by all subtests. The engine is closed
// through t.Cleanup, i.e. after every subtest has finished.
func TestInhumeIfObjectDoesntExist(t *testing.T) {
	const numShards = 4

	engine := testNewEngine(t).setShardsNum(t, numShards).prepare(t).engine
	t.Cleanup(func() { require.NoError(t, engine.Close(context.Background())) })

	t.Run("object is locked", func(t *testing.T) {
		scenarios := []struct {
			name          string
			withTombstone bool
			withForce     bool
		}{
			{name: "inhume without tombstone"},
			{name: "inhume with tombstone", withTombstone: true},
			{name: "force inhume", withForce: true},
		}

		for _, sc := range scenarios {
			t.Run(sc.name, func(t *testing.T) {
				testInhumeLockedIfObjectDoesntExist(t, engine, sc.withTombstone, sc.withForce)
			})
		}
	})
}
// testInhumeLockedIfObjectDoesntExist locks a freshly generated address
// (this helper never puts an object for it into e) and then tries to inhume
// it.
//
// Without withForce the inhume must fail with ObjectLocked. With withForce it
// must succeed, after which Head is expected to report ObjectNotFound, or
// ObjectAlreadyRemoved when a tombstone was used.
//
// The helper marks the calling test as parallel.
func testInhumeLockedIfObjectDoesntExist(t *testing.T, e *StorageEngine, withTombstone, withForce bool) {
	t.Parallel()

	// Named addr rather than object so that the imported package "object"
	// is not shadowed inside this function.
	addr := oidtest.Address()
	require.NoError(t, testLockObject(e, addr))

	err := testInhumeObject(t, e, addr, withTombstone, withForce)
	if !withForce {
		var errLocked *apistatus.ObjectLocked
		require.ErrorAs(t, err, &errLocked)
		return
	}
	require.NoError(t, err)

	err = testHeadObject(e, addr)
	if withTombstone {
		// Not reachable at the moment: testInhumeObject fails the test when
		// both withTombstone and withForce are set.
		require.True(t, client.IsErrObjectAlreadyRemoved(err))
	} else {
		require.True(t, client.IsErrObjectNotFound(err))
	}
}
// testLockObject calls e.Lock for the object ID of obj within obj's
// container and returns the resulting error. A freshly generated ID is
// passed as the second ID argument of Lock (presumably the ID of the lock
// object — confirm against StorageEngine.Lock).
func testLockObject(e *StorageEngine, obj oid.Address) error {
	generatedID := oidtest.ID()
	lockedIDs := []oid.ID{obj.Object()}

	return e.Lock(context.Background(), obj.Container(), generatedID, lockedIDs)
}
// testInhumeObject inhumes obj on e and returns the error of the Inhume call.
//
// When withTombstone is set, obj is targeted with a generated tombstone
// address placed in obj's container; otherwise obj is marked as garbage.
// withForce additionally enables force removal. Setting both flags fails the
// test immediately.
func testInhumeObject(t testing.TB, e *StorageEngine, obj oid.Address, withTombstone, withForce bool) error {
	// The test design allows passing both options at once, but removal with
	// a tombstone and force removal are mutually exclusive.
	require.False(t, withTombstone && withForce)

	var prm InhumePrm

	if !withTombstone {
		prm.MarkAsGarbage(obj)
	} else {
		ts := oidtest.Address()
		ts.SetContainer(obj.Container())
		prm.WithTarget(ts, obj)
	}

	if withForce {
		prm.WithForceRemoval()
	}

	return e.Inhume(context.Background(), prm)
}
// testHeadObject requests the header of obj from e and returns only the
// error of the Head call; the result itself is discarded.
func testHeadObject(e *StorageEngine, obj oid.Address) error {
	prm := HeadPrm{}
	prm.WithAddress(obj)

	if _, err := e.Head(context.Background(), prm); err != nil {
		return err
	}
	return nil
}