package engine

import (
	"context"
	"math/rand"
	"strconv"
	"testing"
	"time"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/panjf2000/ants/v2"
	"github.com/stretchr/testify/require"
)

// tss is a stub tombstone source for tests. It is handed to
// shard.WithTombstoneSource and answers availability queries based solely on
// a fixed epoch.
type tss struct {
	expEpoch uint64
}

// IsTombstoneAvailable reports whether the stub's expEpoch is not less than
// the requested epoch. The context and the address are ignored.
func (t tss) IsTombstoneAvailable(_ context.Context, _ oid.Address, epoch uint64) bool {
	return t.expEpoch >= epoch
}

// TestLockUserScenario walks through the typical user flow around a locked
// object: store, lock, fail to inhume, fail to remove the LOCK object itself,
// and finally inhume successfully after the lock expiration epoch has passed.
func TestLockUserScenario(t *testing.T) {
	t.Parallel()

	// Tested user actions:
	//   1. stores some object
	//   2. locks the object
	//   3. tries to inhume the object with tombstone and expects failure
	//   4. saves tombstone for LOCK-object and receives error
	//   5. waits for an epoch after the lock expiration one
	//   6. tries to inhume the object and expects success
	const lockerExpiresAfter = 13
	const tombstoneExpiresAfter = 1000

	cnr := cidtest.ID()
	tombObj := testutil.GenerateObjectWithCID(cnr)
	tombForLockID := oidtest.ID()
	tombObj.SetID(tombForLockID)

	testEngine := testNewEngine(t).
		setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
			return []shard.Option{
				shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
					pool, err := ants.NewPool(sz)
					require.NoError(t, err)

					return pool
				}),
				shard.WithTombstoneSource(tss{lockerExpiresAfter}),
			}
		}).
		prepare(t)
	e := testEngine.engine
	defer func() { require.NoError(t, e.Close(context.Background())) }()

	lockerID := oidtest.ID()
	tombID := oidtest.ID()
	var err error

	var objAddr oid.Address
	objAddr.SetContainer(cnr)

	var tombAddr oid.Address
	tombAddr.SetContainer(cnr)
	tombAddr.SetObject(tombID)

	var lockerAddr oid.Address
	lockerAddr.SetContainer(cnr)
	lockerAddr.SetObject(lockerID)

	// Expiration attribute shared by the LOCK object and its tombstone.
	var a objectSDK.Attribute
	a.SetKey(objectV2.SysAttributeExpEpoch)
	a.SetValue(strconv.Itoa(lockerExpiresAfter))

	lockerObj := testutil.GenerateObjectWithCID(cnr)
	lockerObj.SetID(lockerID)
	lockerObj.SetAttributes(a)

	var tombForLockAddr oid.Address
	tombForLockAddr.SetContainer(cnr)
	tombForLockAddr.SetObject(tombForLockID)

	// 1.
	obj := testutil.GenerateObjectWithCID(cnr)

	id, _ := obj.ID()
	objAddr.SetObject(id)

	err = Put(context.Background(), e, obj, false)
	require.NoError(t, err)

	// 2.
	var locker objectSDK.Lock
	locker.WriteMembers([]oid.ID{id})
	objectSDK.WriteLock(lockerObj, locker)

	err = Put(context.Background(), e, lockerObj, false)
	require.NoError(t, err)

	err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id}, lockerExpiresAfter)
	require.NoError(t, err)

	// 3.
	var inhumePrm InhumePrm
	inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)

	var objLockedErr *apistatus.ObjectLocked
	_, err = e.Inhume(context.Background(), inhumePrm)
	require.ErrorAs(t, err, &objLockedErr)

	// 4.
	// The tombstone object already carries tombForLockID (set right after
	// creation), so only its type and attributes are filled in here.
	tombObj.SetType(objectSDK.TypeTombstone)
	tombObj.SetAttributes(a)

	err = Put(context.Background(), e, tombObj, false)
	require.NoError(t, err)

	inhumePrm.WithTarget(tombForLockAddr, tombstoneExpiresAfter, lockerAddr)

	_, err = e.Inhume(context.Background(), inhumePrm)
	require.ErrorIs(t, err, meta.ErrLockObjectRemoval)

	// 5.
	e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)

	inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objAddr)

	// The new epoch is not applied synchronously from the test's point of
	// view, so poll until the inhume goes through.
	require.Eventually(t, func() bool {
		_, err = e.Inhume(context.Background(), inhumePrm)
		return err == nil
	}, 30*time.Second, time.Second)
}

// TestLockExpiration checks that a locked object cannot be inhumed before the
// lock expiration epoch and can be inhumed some time after that epoch has been
// announced to the engine.
func TestLockExpiration(t *testing.T) {
	t.Parallel()

	// Tested scenario:
	//   1. some object is stored
	//   2. lock object for it is stored, and the object is locked
	//   3. lock expiration epoch is coming
	//   4. after some delay the object is not locked anymore

	testEngine := testNewEngine(t).
		setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
			return []shard.Option{
				shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
					pool, err := ants.NewPool(sz)
					require.NoError(t, err)

					return pool
				}),
			}
		}).
		prepare(t)
	e := testEngine.engine
	defer func() { require.NoError(t, e.Close(context.Background())) }()

	const lockerExpiresAfter = 13
	const tombstoneExpiresAfter = 1000

	cnr := cidtest.ID()
	var err error

	// 1.
	obj := testutil.GenerateObjectWithCID(cnr)

	err = Put(context.Background(), e, obj, false)
	require.NoError(t, err)

	// 2.
	var a objectSDK.Attribute
	a.SetKey(objectV2.SysAttributeExpEpoch)
	a.SetValue(strconv.Itoa(lockerExpiresAfter))

	lock := testutil.GenerateObjectWithCID(cnr)
	lock.SetType(objectSDK.TypeLock)
	lock.SetAttributes(a)

	err = Put(context.Background(), e, lock, false)
	require.NoError(t, err)

	id, _ := obj.ID()
	idLock, _ := lock.ID()

	err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id}, lockerExpiresAfter)
	require.NoError(t, err)

	var inhumePrm InhumePrm
	tombAddr := oidtest.Address()
	tombAddr.SetContainer(cnr)
	inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))

	var objLockedErr *apistatus.ObjectLocked
	_, err = e.Inhume(context.Background(), inhumePrm)
	require.ErrorAs(t, err, &objLockedErr)

	// 3.
	e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)

	// 4.
	tombAddr = oidtest.Address()
	tombAddr.SetContainer(cnr)
	inhumePrm.WithTarget(tombAddr, tombstoneExpiresAfter, objectcore.AddressOf(obj))

	// Poll: the lock is expected to disappear only after some delay.
	require.Eventually(t, func() bool {
		_, err = e.Inhume(context.Background(), inhumePrm)
		return err == nil
	}, 30*time.Second, time.Second)
}

// TestLockForceRemoval checks that a locked object rejects both inhume
// flavours (garbage mark and tombstone), and that force-deleting the LOCK
// object makes the object removable again.
func TestLockForceRemoval(t *testing.T) {
	t.Parallel()

	// Tested scenario:
	//   1. some object is stored
	//   2. lock object for it is stored, and the object is locked
	//   3. try to remove lock object and get error
	//   4. force lock object removal
	//   5. the object is not locked anymore
	var e *StorageEngine

	e = testNewEngine(t).
		setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
			return []shard.Option{
				shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
					pool, err := ants.NewPool(sz)
					require.NoError(t, err)

					return pool
				}),
				// NOTE(review): e.processDeletedLocks is a method value, so the
				// receiver is captured when this closure runs. If the closure is
				// invoked before the assignment to e completes, the callback is
				// bound to a nil engine — confirm that is intended.
				shard.WithDeletedLockCallback(e.processDeletedLocks),
			}
		}).
		prepare(t).engine
	defer func() { require.NoError(t, e.Close(context.Background())) }()

	cnr := cidtest.ID()
	var err error

	// 1.
	obj := testutil.GenerateObjectWithCID(cnr)

	err = Put(context.Background(), e, obj, false)
	require.NoError(t, err)

	// 2.
	lock := testutil.GenerateObjectWithCID(cnr)
	lock.SetType(objectSDK.TypeLock)

	err = Put(context.Background(), e, lock, false)
	require.NoError(t, err)

	id, _ := obj.ID()
	idLock, _ := lock.ID()

	err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id}, rand.Uint64())
	require.NoError(t, err)

	// 3.
	var inhumePrm InhumePrm
	inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))

	var objLockedErr *apistatus.ObjectLocked
	_, err = e.Inhume(context.Background(), inhumePrm)
	require.ErrorAs(t, err, &objLockedErr)

	inhumePrm.WithTarget(oidtest.Address(), rand.Uint64(), objectcore.AddressOf(obj))

	_, err = e.Inhume(context.Background(), inhumePrm)
	require.ErrorAs(t, err, &objLockedErr)

	// 4.
	var deletePrm DeletePrm
	deletePrm.WithAddress(objectcore.AddressOf(lock))
	deletePrm.WithForceRemoval()

	_, err = e.Delete(context.Background(), deletePrm)
	require.NoError(t, err)

	// 5.
	inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))

	_, err = e.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)
}

// TestLockExpiredRegularObject stores an object whose expiration epoch is one
// less than the metabase's current epoch, verifies that Get reports it as not
// found, then locks it and verifies that the object is reported as locked and
// can be read again.
func TestLockExpiredRegularObject(t *testing.T) {
	const currEpoch = 42
	const objectExpiresAfter = currEpoch - 1
	const lockExpiresAfter = currEpoch + 1

	engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
		return []shard.Option{
			shard.WithDisabledGC(),
			shard.WithMetaBaseOptions(append(
				testGetDefaultMetabaseOptions(t),
				meta.WithEpochState(epochState{currEpoch}),
			)...),
		}
	}).prepare(t).engine
	// Release the engine like the other tests in this file do. The subtests
	// below are not parallel, so they finish before this deferred call runs.
	defer func() { require.NoError(t, engine.Close(context.Background())) }()

	cnr := cidtest.ID()

	object := testutil.GenerateObjectWithCID(cnr)
	testutil.AddAttribute(object, objectV2.SysAttributeExpEpoch, strconv.Itoa(objectExpiresAfter))

	address := objectcore.AddressOf(object)

	var putPrm PutPrm
	putPrm.Object = object
	require.NoError(t, engine.Put(context.Background(), putPrm))

	var getPrm GetPrm
	var errNotFound *apistatus.ObjectNotFound

	getPrm.WithAddress(address)
	_, err := engine.Get(context.Background(), getPrm)
	require.ErrorAs(t, err, &errNotFound)

	t.Run("lock expired regular object", func(t *testing.T) {
		// Fail here, with the real cause, if locking itself is rejected,
		// instead of surfacing it indirectly through IsLocked below.
		require.NoError(t, engine.Lock(context.Background(),
			address.Container(),
			oidtest.ID(),
			[]oid.ID{address.Object()},
			lockExpiresAfter,
		))

		res, err := engine.IsLocked(context.Background(), objectcore.AddressOf(object))
		require.NoError(t, err)
		require.True(t, res)
	})

	t.Run("get expired and locked regular object", func(t *testing.T) {
		getPrm.WithAddress(objectcore.AddressOf(object))

		res, err := engine.Get(context.Background(), getPrm)
		require.NoError(t, err)
		// Expected value first, actual second, so failure output is labelled correctly.
		require.Equal(t, object, res.Object())
	})
}