[#1445] engine/test: Add tests for GC handling expired object

Since the GC behavior is changing drastically. These test are needed
to nsure that the GC corretly deletes expired tombstones and graves,
as well as, lock objects and lock. Tests use graves and locks of both
old and new formats.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2024-12-19 13:47:31 +03:00
parent 5296f64df0
commit 5c0d9c1c08
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639

View file

@ -0,0 +1,181 @@
package engine
import (
"context"
"strconv"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
func testPopulateWithObjects(t testing.TB, engine *StorageEngine, cnt cid.ID, objectCount int) (objects []*objectSDK.Object) {
require.Positive(t, objectCount)
var putPrm PutPrm
for range objectCount {
putPrm.Object = testutil.GenerateObjectWithCID(cnt)
require.NoError(t, engine.Put(context.Background(), putPrm))
objects = append(objects, putPrm.Object)
}
return objects
}
func testInhumeObjects(t testing.TB, engine *StorageEngine, objects []*objectSDK.Object, expEpoch uint64) (tombstone *objectSDK.Object) {
require.NotEmpty(t, objects)
cnt := objectCore.AddressOf(objects[0]).Container()
tombstone = testutil.GenerateObjectWithCID(cnt)
tombstone.SetType(objectSDK.TypeTombstone)
testutil.AddAttribute(tombstone, objectV2.SysAttributeExpEpoch, strconv.FormatUint(expEpoch, 10))
var putPrm PutPrm
putPrm.Object = tombstone
require.NoError(t, engine.Put(context.Background(), putPrm))
var addrs []oid.Address
for _, object := range objects {
addrs = append(addrs, objectCore.AddressOf(object))
}
tombstoneAddr := objectCore.AddressOf(tombstone)
pivot := len(objects) / 2
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneAddr, expEpoch, addrs[:pivot]...)
_, err := engine.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
inhumePrm.WithTarget(tombstoneAddr, meta.NoExpirationEpoch, addrs[pivot:]...)
_, err = engine.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
return
}
func testLockObjects(t testing.TB, engine *StorageEngine, objects []*objectSDK.Object, expEpoch uint64) (lockObject *objectSDK.Object) {
require.NotEmpty(t, objects)
require.NotEmpty(t, objects)
cnt := objectCore.AddressOf(objects[0]).Container()
lockObject = testutil.GenerateObjectWithCID(cnt)
lockObject.SetType(objectSDK.TypeLock)
testutil.AddAttribute(lockObject, objectV2.SysAttributeExpEpoch, strconv.FormatUint(expEpoch, 10))
var putPrm PutPrm
putPrm.Object = lockObject
require.NoError(t, engine.Put(context.Background(), putPrm))
var lockedIDs []oid.ID
for _, object := range objects {
lockedIDs = append(lockedIDs, objectCore.AddressOf(object).Object())
}
lockObjectID := objectCore.AddressOf(lockObject).Object()
pivot := len(objects) / 2
require.NoError(t, engine.Lock(context.Background(), cnt, lockObjectID, lockedIDs[:pivot], expEpoch))
require.NoError(t, engine.Lock(context.Background(), cnt, lockObjectID, lockedIDs[pivot:], meta.NoExpirationEpoch))
return
}
func TestGCHandleExpiredTombstones(t *testing.T) {
t.Parallel()
const (
numShards = 2
objectCount = 10 * numShards
expEpoch = 1
)
container := cidtest.ID()
engine := testNewEngine(t).
setShardsNumAdditionalOpts(t, numShards, func(_ int) []shard.Option {
return []shard.Option{
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithTombstoneSource(tss{expEpoch}),
}
}).prepare(t).engine
defer func() { require.NoError(t, engine.Close(context.Background())) }()
objects := testPopulateWithObjects(t, engine, container, objectCount)
tombstone := testInhumeObjects(t, engine, objects, expEpoch)
engine.HandleNewEpoch(context.Background(), expEpoch+1)
var headPrm HeadPrm
require.Eventually(t, func() bool {
for _, object := range objects {
headPrm.WithAddress(objectCore.AddressOf(object))
_, err := engine.Head(context.Background(), headPrm)
if !client.IsErrObjectNotFound(err) {
return false
}
}
headPrm.WithAddress(objectCore.AddressOf(tombstone))
_, err := engine.Head(context.Background(), headPrm)
return client.IsErrObjectNotFound(err)
}, 10*time.Second, 1*time.Second)
}
func TestGCHandleExpiredLockObjects(t *testing.T) {
t.Parallel()
const (
numShards = 2
objectCount = 10 * numShards
expEpoch = 1
)
container := cidtest.ID()
engine := testNewEngine(t).
setShardsNumAdditionalOpts(t, numShards, func(_ int) []shard.Option {
return []shard.Option{
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
}).prepare(t).engine
defer func() { require.NoError(t, engine.Close(context.Background())) }()
objects := testPopulateWithObjects(t, engine, container, objectCount)
lockObject := testLockObjects(t, engine, objects, expEpoch)
engine.HandleNewEpoch(context.Background(), expEpoch+1)
var headPrm HeadPrm
require.Eventually(t, func() bool {
for _, object := range objects {
locked, err := engine.IsLocked(context.Background(), objectCore.AddressOf(object))
if err != nil || locked {
return false
}
}
headPrm.WithAddress(objectCore.AddressOf(lockObject))
_, err := engine.Head(context.Background(), headPrm)
return client.IsErrObjectNotFound(err)
}, 10*time.Second, 1*time.Second)
}