[] meta: Add lock tests

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-06-06 15:17:21 +03:00 committed by fyrchik
parent c6cf8e5c0b
commit eaf96bccf7

View file

@ -52,52 +52,150 @@ func TestDB_Lock(t *testing.T) {
}
})
t.Run("removing lock object", func(t *testing.T) {
objs, lockObj := putAndLockObj(t, db, 1)
objAddr := objectcore.AddressOf(objs[0])
lockAddr := objectcore.AddressOf(lockObj)
var inhumePrm meta.InhumePrm
inhumePrm.WithGCMark()
// check locking relation
inhumePrm.WithAddresses(objAddr)
_, err := db.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
inhumePrm.WithTombstoneAddress(oidtest.Address())
_, err = db.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
// try to remove lock object
inhumePrm.WithAddresses(lockAddr)
_, err = db.Inhume(inhumePrm)
require.Error(t, err)
// check that locking relation has not been
// dropped
inhumePrm.WithAddresses(objAddr)
_, err = db.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
inhumePrm.WithTombstoneAddress(oidtest.Address())
_, err = db.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
})
t.Run("lock-unlock scenario", func(t *testing.T) {
cnr := cidtest.ID()
objs, lockObj := putAndLockObj(t, db, 1)
obj := generateObjectWithCID(t, cnr)
var err error
err = putBig(db, obj)
require.NoError(t, err)
lockID := oidtest.ID()
id, _ := obj.ID()
// lock the object
err = db.Lock(cnr, lockID, []oid.ID{id})
require.NoError(t, err)
var tombAddr oid.Address
tombAddr.SetContainer(cnr)
tombAddr.SetObject(lockID)
objAddr := objectcore.AddressOf(objs[0])
lockAddr := objectcore.AddressOf(lockObj)
// try to inhume locked object using tombstone
err = meta.Inhume(db, objectcore.AddressOf(obj), tombAddr)
err := meta.Inhume(db, objAddr, lockAddr)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
// free locked object
var inhumePrm meta.InhumePrm
inhumePrm.WithAddresses(tombAddr)
inhumePrm.WithGCMark()
inhumePrm.WithAddresses(lockAddr)
inhumePrm.WithForceGCMark()
inhumePrm.WithLockObjectHandling()
_, err = db.Inhume(inhumePrm)
res, err := db.Inhume(inhumePrm)
require.NoError(t, err)
var lockAddr oid.Address
lockAddr.SetObject(lockID)
lockAddr.SetContainer(cnr)
require.Len(t, res.DeletedLockObjects(), 1)
require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0])
err = db.FreeLockedBy([]oid.Address{lockAddr})
require.NoError(t, err)
inhumePrm.WithAddresses(tombAddr)
inhumePrm.WithAddresses(objAddr)
inhumePrm.WithGCMark()
// now we can inhume the object
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
})
t.Run("force removing lock objects", func(t *testing.T) {
const objsNum = 3
// put and lock `objsNum` objects
objs, lockObj := putAndLockObj(t, db, objsNum)
// force remove objects
var inhumePrm meta.InhumePrm
inhumePrm.WithForceGCMark()
inhumePrm.WithAddresses(objectcore.AddressOf(lockObj))
inhumePrm.WithLockObjectHandling()
res, err := db.Inhume(inhumePrm)
require.NoError(t, err)
require.Len(t, res.DeletedLockObjects(), 1)
require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0])
// unlock just objects that were locked by
// just removed locker
err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
require.NoError(t, err)
// removing objects after unlock
inhumePrm.WithGCMark()
for i := 0; i < objsNum; i++ {
inhumePrm.WithAddresses(objectcore.AddressOf(objs[i]))
res, err = db.Inhume(inhumePrm)
require.NoError(t, err)
require.Len(t, res.DeletedLockObjects(), 0)
}
})
t.Run("skipping lock object handling", func(t *testing.T) {
_, lockObj := putAndLockObj(t, db, 1)
var inhumePrm meta.InhumePrm
inhumePrm.WithForceGCMark()
inhumePrm.WithAddresses(objectcore.AddressOf(lockObj))
res, err := db.Inhume(inhumePrm)
require.NoError(t, err)
require.Len(t, res.DeletedLockObjects(), 0)
})
}
// putAndLockObj puts object, returns it and its locker.
func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*object.Object, *object.Object) {
cnr := cidtest.ID()
lockedObjs := make([]*object.Object, 0, numOfLockedObjs)
lockedObjIDs := make([]oid.ID, 0, numOfLockedObjs)
for i := 0; i < numOfLockedObjs; i++ {
obj := generateObjectWithCID(t, cnr)
err := putBig(db, obj)
require.NoError(t, err)
id, _ := obj.ID()
lockedObjs = append(lockedObjs, obj)
lockedObjIDs = append(lockedObjIDs, id)
}
lockObj := generateObjectWithCID(t, cnr)
lockID, _ := lockObj.ID()
lockObj.SetType(object.TypeLock)
err := putBig(db, lockObj)
require.NoError(t, err)
err = db.Lock(cnr, lockID, lockedObjIDs)
require.NoError(t, err)
return lockedObjs, lockObj
}