frostfs-node/pkg/local_object_storage/engine/lock_test.go
Aleksey Savchuk 432042c534
[#1527] engine: Add tests for handling expired objects on inhume and lock
Currently, it's allowed to inhume or lock an expired object.
Consider the following scenario:

1) A user inhumes or locks an object
2) The object expires
3) GC hasn't yet deleted the object
4) The node loses the associated tombstone or lock
5) Another node replicates tombstone or lock to the first node

In this case, the second node succeeds, which is the desired behavior.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-12-03 12:29:45 +03:00

347 lines
9.1 KiB
Go

package engine
import (
"context"
"strconv"
"testing"
"time"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
// tss is a minimal stub used by the tests in this file; it is handed to
// shards through shard.WithTombstoneSource.
type tss struct {
// expEpoch is the last epoch (inclusive) for which IsTombstoneAvailable
// reports true.
expEpoch uint64
}
// IsTombstoneAvailable reports whether the given epoch has not yet gone past
// the configured expEpoch. The context and the address are not consulted.
func (t tss) IsTombstoneAvailable(ctx context.Context, _ oid.Address, epoch uint64) bool {
	return epoch <= t.expEpoch
}
func TestLockUserScenario(t *testing.T) {
t.Parallel()
// Tested user actions:
// 1. stores some object
// 2. locks the object
// 3. tries to inhume the object with tombstone and expects failure
// 4. saves tombstone for LOCK-object and receives error
// 5. waits for an epoch after the lock expiration one
// 6. tries to inhume the object and expects success
const lockerExpiresAfter = 13
cnr := cidtest.ID()
tombObj := testutil.GenerateObjectWithCID(cnr)
tombForLockID := oidtest.ID()
tombObj.SetID(tombForLockID)
testEngine := testNewEngine(t).
setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
return []shard.Option{
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithTombstoneSource(tss{lockerExpiresAfter}),
}
}).
prepare(t)
e := testEngine.engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
lockerID := oidtest.ID()
tombID := oidtest.ID()
var err error
var objAddr oid.Address
objAddr.SetContainer(cnr)
var tombAddr oid.Address
tombAddr.SetContainer(cnr)
tombAddr.SetObject(tombID)
var lockerAddr oid.Address
lockerAddr.SetContainer(cnr)
lockerAddr.SetObject(lockerID)
var a objectSDK.Attribute
a.SetKey(objectV2.SysAttributeExpEpoch)
a.SetValue(strconv.Itoa(lockerExpiresAfter))
lockerObj := testutil.GenerateObjectWithCID(cnr)
lockerObj.SetID(lockerID)
lockerObj.SetAttributes(a)
var tombForLockAddr oid.Address
tombForLockAddr.SetContainer(cnr)
tombForLockAddr.SetObject(tombForLockID)
// 1.
obj := testutil.GenerateObjectWithCID(cnr)
id, _ := obj.ID()
objAddr.SetObject(id)
err = Put(context.Background(), e, obj, false)
require.NoError(t, err)
// 2.
var locker objectSDK.Lock
locker.WriteMembers([]oid.ID{id})
objectSDK.WriteLock(lockerObj, locker)
err = Put(context.Background(), e, lockerObj, false)
require.NoError(t, err)
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id})
require.NoError(t, err)
// 3.
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombAddr, objAddr)
var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
// 4.
tombObj.SetType(objectSDK.TypeTombstone)
tombObj.SetID(tombForLockID)
tombObj.SetAttributes(a)
err = Put(context.Background(), e, tombObj, false)
require.NoError(t, err)
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
_, err = e.Inhume(context.Background(), inhumePrm)
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
// 5.
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
inhumePrm.WithTarget(tombAddr, objAddr)
require.Eventually(t, func() bool {
_, err = e.Inhume(context.Background(), inhumePrm)
return err == nil
}, 30*time.Second, time.Second)
}
// TestLockExpiration verifies on a two-shard engine that an object stops
// being protected once the epoch following its lock's expiration arrives.
//
// Tested scenario:
//  1. some object is stored
//  2. lock object for it is stored, and the object is locked
//  3. lock expiration epoch is coming
//  4. after some delay the object is not locked anymore
func TestLockExpiration(t *testing.T) {
	t.Parallel()

	const lockerExpiresAfter = 13

	ctx := context.Background()

	testEngine := testNewEngine(t).
		setShardsNumAdditionalOpts(t, 2, func(_ int) []shard.Option {
			return []shard.Option{
				shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
					pool, err := ants.NewPool(sz)
					require.NoError(t, err)
					return pool
				}),
			}
		}).
		prepare(t)
	e := testEngine.engine
	defer func() { require.NoError(t, e.Close(ctx)) }()

	cnr := cidtest.ID()

	// newTombAddr yields a fresh random tombstone address inside cnr.
	newTombAddr := func() oid.Address {
		addr := oidtest.Address()
		addr.SetContainer(cnr)
		return addr
	}

	// 1.
	obj := testutil.GenerateObjectWithCID(cnr)
	require.NoError(t, Put(ctx, e, obj, false))

	// 2.
	var expAttr objectSDK.Attribute
	expAttr.SetKey(objectV2.SysAttributeExpEpoch)
	expAttr.SetValue(strconv.Itoa(lockerExpiresAfter))

	lock := testutil.GenerateObjectWithCID(cnr)
	lock.SetType(objectSDK.TypeLock)
	lock.SetAttributes(expAttr)
	require.NoError(t, Put(ctx, e, lock, false))

	id, _ := obj.ID()
	idLock, _ := lock.ID()
	require.NoError(t, e.Lock(ctx, cnr, idLock, []oid.ID{id}))

	// While the lock is alive, inhuming the object must be rejected.
	objAddr := objectcore.AddressOf(obj)

	var inhumePrm InhumePrm
	inhumePrm.WithTarget(newTombAddr(), objAddr)

	var objLockedErr *apistatus.ObjectLocked
	_, err := e.Inhume(ctx, inhumePrm)
	require.ErrorAs(t, err, &objLockedErr)

	// 3.
	e.HandleNewEpoch(ctx, lockerExpiresAfter+1)

	// 4.
	inhumePrm.WithTarget(newTombAddr(), objAddr)
	require.Eventually(t, func() bool {
		_, inhumeErr := e.Inhume(ctx, inhumePrm)
		return inhumeErr == nil
	}, 30*time.Second, time.Second)
}
// TestLockForceRemoval verifies on a two-shard engine that force-deleting a
// LOCK object releases the object it was protecting.
//
// Tested scenario:
//  1. some object is stored
//  2. lock object for it is stored, and the object is locked
//  3. try to remove lock object and get error
//  4. force lock object removal
//  5. the object is not locked anymore
func TestLockForceRemoval(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	// NOTE(review): e is assigned only once the whole builder chain below has
	// returned, so if the options closure is invoked inside that chain,
	// e.processDeletedLocks is bound to a nil *StorageEngine — confirm that
	// this option is intentional.
	var e *StorageEngine
	e = testNewEngine(t).
		setShardsNumAdditionalOpts(t, 2, func(_ int) []shard.Option {
			return []shard.Option{
				shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
					pool, err := ants.NewPool(sz)
					require.NoError(t, err)
					return pool
				}),
				shard.WithDeletedLockCallback(e.processDeletedLocks),
			}
		}).
		prepare(t).engine
	defer func() { require.NoError(t, e.Close(ctx)) }()

	cnr := cidtest.ID()

	// 1.
	obj := testutil.GenerateObjectWithCID(cnr)
	require.NoError(t, Put(ctx, e, obj, false))

	// 2.
	lock := testutil.GenerateObjectWithCID(cnr)
	lock.SetType(objectSDK.TypeLock)
	require.NoError(t, Put(ctx, e, lock, false))

	id, _ := obj.ID()
	idLock, _ := lock.ID()
	require.NoError(t, e.Lock(ctx, cnr, idLock, []oid.ID{id}))

	objAddr := objectcore.AddressOf(obj)

	// 3.
	// Both flavours of inhume (garbage mark and tombstone) must be refused
	// while the object is locked.
	var inhumePrm InhumePrm
	var objLockedErr *apistatus.ObjectLocked

	inhumePrm.MarkAsGarbage(objAddr)
	_, err := e.Inhume(ctx, inhumePrm)
	require.ErrorAs(t, err, &objLockedErr)

	inhumePrm.WithTarget(oidtest.Address(), objAddr)
	_, err = e.Inhume(ctx, inhumePrm)
	require.ErrorAs(t, err, &objLockedErr)

	// 4.
	var deletePrm DeletePrm
	deletePrm.WithAddress(objectcore.AddressOf(lock))
	deletePrm.WithForceRemoval()
	_, err = e.Delete(ctx, deletePrm)
	require.NoError(t, err)

	// 5.
	inhumePrm.MarkAsGarbage(objAddr)
	_, err = e.Inhume(ctx, inhumePrm)
	require.NoError(t, err)
}
// TestLockExpiredRegularObject checks that a regular object which has already
// expired can still be locked, and that it becomes readable again once locked.
//
// The engine runs a single shard with GC disabled and a metabase epoch state
// fixed at currEpoch, while the object carries an expiration epoch of
// currEpoch-1. Before locking, Get must fail with ObjectNotFound.
func TestLockExpiredRegularObject(t *testing.T) {
	const currEpoch = 42
	const objectExpiresAfter = currEpoch - 1

	ctx := context.Background()

	engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
		return []shard.Option{
			shard.WithDisabledGC(),
			shard.WithMetaBaseOptions(append(
				testGetDefaultMetabaseOptions(t),
				meta.WithEpochState(epochState{currEpoch}),
			)...),
		}
	}).prepare(t).engine
	// Release engine resources like the other tests in this file do. The
	// subtests below are not parallel, so they finish before this runs.
	defer func() { require.NoError(t, engine.Close(ctx)) }()

	cnr := cidtest.ID()
	object := testutil.GenerateObjectWithCID(cnr)
	testutil.AddAttribute(object, objectV2.SysAttributeExpEpoch, strconv.Itoa(objectExpiresAfter))
	address := objectcore.AddressOf(object)

	var putPrm PutPrm
	putPrm.Object = object
	require.NoError(t, engine.Put(ctx, putPrm))

	// The object is already expired, so a plain Get must not find it.
	var getPrm GetPrm
	var errNotFound *apistatus.ObjectNotFound
	getPrm.WithAddress(address)
	_, err := engine.Get(ctx, getPrm)
	require.ErrorAs(t, err, &errNotFound)

	t.Run("lock expired regular object", func(t *testing.T) {
		// Surface a Lock failure directly instead of letting it show up only
		// as a failed IsLocked assertion.
		require.NoError(t, engine.Lock(ctx,
			address.Container(),
			oidtest.ID(),
			[]oid.ID{address.Object()},
		))

		res, err := engine.IsLocked(ctx, address)
		require.NoError(t, err)
		require.True(t, res)
	})

	t.Run("get expired and locked regular object", func(t *testing.T) {
		getPrm.WithAddress(address)

		res, err := engine.Get(ctx, getPrm)
		require.NoError(t, err)
		// Expected value first, so a failure message labels the sides correctly.
		require.Equal(t, object, res.Object())
	})
}