frostfs-node/pkg/local_object_storage/engine/lock_test.go

348 lines
9.1 KiB
Go
Raw Normal View History

package engine
import (
"context"
"strconv"
"testing"
"time"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
type tss struct {
expEpoch uint64
}
func (t tss) IsTombstoneAvailable(ctx context.Context, _ oid.Address, epoch uint64) bool {
return t.expEpoch >= epoch
}
func TestLockUserScenario(t *testing.T) {
t.Parallel()
// Tested user actions:
// 1. stores some object
// 2. locks the object
// 3. tries to inhume the object with tombstone and expects failure
// 4. saves tombstone for LOCK-object and receives error
// 5. waits for an epoch after the lock expiration one
// 6. tries to inhume the object and expects success
const lockerExpiresAfter = 13
cnr := cidtest.ID()
tombObj := testutil.GenerateObjectWithCID(cnr)
tombForLockID := oidtest.ID()
tombObj.SetID(tombForLockID)
testEngine := testNewEngine(t).
setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
return []shard.Option{
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithTombstoneSource(tss{lockerExpiresAfter}),
}
}).
prepare(t)
e := testEngine.engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
lockerID := oidtest.ID()
tombID := oidtest.ID()
var err error
var objAddr oid.Address
objAddr.SetContainer(cnr)
var tombAddr oid.Address
tombAddr.SetContainer(cnr)
tombAddr.SetObject(tombID)
var lockerAddr oid.Address
lockerAddr.SetContainer(cnr)
lockerAddr.SetObject(lockerID)
var a objectSDK.Attribute
a.SetKey(objectV2.SysAttributeExpEpoch)
a.SetValue(strconv.Itoa(lockerExpiresAfter))
lockerObj := testutil.GenerateObjectWithCID(cnr)
lockerObj.SetID(lockerID)
lockerObj.SetAttributes(a)
var tombForLockAddr oid.Address
tombForLockAddr.SetContainer(cnr)
tombForLockAddr.SetObject(tombForLockID)
// 1.
obj := testutil.GenerateObjectWithCID(cnr)
id, _ := obj.ID()
objAddr.SetObject(id)
err = Put(context.Background(), e, obj, false)
require.NoError(t, err)
// 2.
var locker objectSDK.Lock
locker.WriteMembers([]oid.ID{id})
objectSDK.WriteLock(lockerObj, locker)
err = Put(context.Background(), e, lockerObj, false)
require.NoError(t, err)
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id})
require.NoError(t, err)
// 3.
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombAddr, objAddr)
var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
// 4.
tombObj.SetType(objectSDK.TypeTombstone)
tombObj.SetID(tombForLockID)
tombObj.SetAttributes(a)
err = Put(context.Background(), e, tombObj, false)
require.NoError(t, err)
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
_, err = e.Inhume(context.Background(), inhumePrm)
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
// 5.
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
inhumePrm.WithTarget(tombAddr, objAddr)
require.Eventually(t, func() bool {
_, err = e.Inhume(context.Background(), inhumePrm)
return err == nil
}, 30*time.Second, time.Second)
}
func TestLockExpiration(t *testing.T) {
t.Parallel()
// Tested scenario:
// 1. some object is stored
// 2. lock object for it is stored, and the object is locked
// 3. lock expiration epoch is coming
// 4. after some delay the object is not locked anymore
testEngine := testNewEngine(t).
setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
return []shard.Option{
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
}).
prepare(t)
e := testEngine.engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
const lockerExpiresAfter = 13
cnr := cidtest.ID()
var err error
// 1.
obj := testutil.GenerateObjectWithCID(cnr)
err = Put(context.Background(), e, obj, false)
require.NoError(t, err)
// 2.
var a objectSDK.Attribute
a.SetKey(objectV2.SysAttributeExpEpoch)
a.SetValue(strconv.Itoa(lockerExpiresAfter))
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
lock.SetAttributes(a)
err = Put(context.Background(), e, lock, false)
require.NoError(t, err)
id, _ := obj.ID()
idLock, _ := lock.ID()
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id})
require.NoError(t, err)
var inhumePrm InhumePrm
tombAddr := oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
// 3.
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
// 4.
tombAddr = oidtest.Address()
tombAddr.SetContainer(cnr)
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
require.Eventually(t, func() bool {
_, err = e.Inhume(context.Background(), inhumePrm)
return err == nil
}, 30*time.Second, time.Second)
}
func TestLockForceRemoval(t *testing.T) {
t.Parallel()
// Tested scenario:
// 1. some object is stored
// 2. lock object for it is stored, and the object is locked
// 3. try to remove lock object and get error
// 4. force lock object removal
// 5. the object is not locked anymore
var e *StorageEngine
e = testNewEngine(t).
setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
return []shard.Option{
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
shard.WithDeletedLockCallback(e.processDeletedLocks),
}
}).
prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
cnr := cidtest.ID()
var err error
// 1.
obj := testutil.GenerateObjectWithCID(cnr)
err = Put(context.Background(), e, obj, false)
require.NoError(t, err)
// 2.
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
err = Put(context.Background(), e, lock, false)
require.NoError(t, err)
id, _ := obj.ID()
idLock, _ := lock.ID()
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id})
require.NoError(t, err)
// 3.
var inhumePrm InhumePrm
inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
_, err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr)
// 4.
var deletePrm DeletePrm
deletePrm.WithAddress(objectcore.AddressOf(lock))
deletePrm.WithForceRemoval()
_, err = e.Delete(context.Background(), deletePrm)
require.NoError(t, err)
// 5.
inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
_, err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
}
func TestLockExpiredRegularObject(t *testing.T) {
const currEpoch = 42
const objectExpiresAfter = currEpoch - 1
engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
return []shard.Option{
shard.WithDisabledGC(),
shard.WithMetaBaseOptions(append(
testGetDefaultMetabaseOptions(t),
meta.WithEpochState(epochState{currEpoch}),
)...),
}
}).prepare(t).engine
cnr := cidtest.ID()
object := testutil.GenerateObjectWithCID(cnr)
testutil.AddAttribute(object, objectV2.SysAttributeExpEpoch, strconv.Itoa(objectExpiresAfter))
address := objectcore.AddressOf(object)
var putPrm PutPrm
putPrm.Object = object
require.NoError(t, engine.Put(context.Background(), putPrm))
var getPrm GetPrm
var errNotFound *apistatus.ObjectNotFound
getPrm.WithAddress(address)
_, err := engine.Get(context.Background(), getPrm)
require.ErrorAs(t, err, &errNotFound)
t.Run("lock expired regular object", func(t *testing.T) {
engine.Lock(context.Background(),
address.Container(),
oidtest.ID(),
[]oid.ID{address.Object()},
)
res, err := engine.IsLocked(context.Background(), objectcore.AddressOf(object))
require.NoError(t, err)
require.True(t, res)
})
t.Run("get expired and locked regular object", func(t *testing.T) {
getPrm.WithAddress(objectcore.AddressOf(object))
res, err := engine.Get(context.Background(), getPrm)
require.NoError(t, err)
require.Equal(t, res.Object(), object)
})
}