[#1445] local_object_storage: Append expiration epoch to locks
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
17d2dc5341
commit
d70e1d29da
17 changed files with 219 additions and 69 deletions
|
@ -480,8 +480,8 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
|
func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID, expEpoch uint64) error {
|
||||||
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
|
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock, expEpoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexedContainer bool) error {
|
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexedContainer bool) error {
|
||||||
|
|
|
@ -256,15 +256,17 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
|
||||||
return locked, outErr
|
return locked, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Lock = shard.Lock
|
||||||
|
|
||||||
// GetLocks return lock id's if object is locked according to StorageEngine's state.
|
// GetLocks return lock id's if object is locked according to StorageEngine's state.
|
||||||
func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
|
func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]Lock, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocks",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocks",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", addr.EncodeToString()),
|
attribute.String("address", addr.EncodeToString()),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
var allLocks []oid.ID
|
var allLocks []Lock
|
||||||
var outErr error
|
var outErr error
|
||||||
|
|
||||||
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||||
|
|
|
@ -24,7 +24,7 @@ var errLockFailed = errors.New("lock operation failed")
|
||||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (e *StorageEngine) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (e *StorageEngine) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID, expEpoch uint64) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Lock",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Lock",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("container_id", idCnr.EncodeToString()),
|
attribute.String("container_id", idCnr.EncodeToString()),
|
||||||
|
@ -35,17 +35,17 @@ func (e *StorageEngine) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
|
||||||
defer elapsed("Lock", e.metrics.AddMethodDuration)()
|
defer elapsed("Lock", e.metrics.AddMethodDuration)()
|
||||||
|
|
||||||
return e.execIfNotBlocked(func() error {
|
return e.execIfNotBlocked(func() error {
|
||||||
return e.lock(ctx, idCnr, locker, locked)
|
return e.lock(ctx, idCnr, locker, locked, expEpoch)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID, expEpoch uint64) error {
|
||||||
for i := range locked {
|
for i := range locked {
|
||||||
switch e.lockSingle(ctx, idCnr, locker, locked[i], true) {
|
switch e.lockSingle(ctx, idCnr, locker, locked[i], expEpoch, true) {
|
||||||
case 1:
|
case 1:
|
||||||
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
||||||
case 0:
|
case 0:
|
||||||
switch e.lockSingle(ctx, idCnr, locker, locked[i], false) {
|
switch e.lockSingle(ctx, idCnr, locker, locked[i], expEpoch, false) {
|
||||||
case 1:
|
case 1:
|
||||||
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
||||||
case 0:
|
case 0:
|
||||||
|
@ -61,7 +61,7 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
|
||||||
// - 0: fail
|
// - 0: fail
|
||||||
// - 1: locking irregular object
|
// - 1: locking irregular object
|
||||||
// - 2: ok
|
// - 2: ok
|
||||||
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, expEpoch uint64, checkExists bool) (status uint8) {
|
||||||
// code is pretty similar to inhumeAddr, maybe unify?
|
// code is pretty similar to inhumeAddr, maybe unify?
|
||||||
root := false
|
root := false
|
||||||
var addrLocked oid.Address
|
var addrLocked oid.Address
|
||||||
|
@ -95,7 +95,7 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
}
|
}
|
||||||
eclocked = append(eclocked, objID)
|
eclocked = append(eclocked, objID)
|
||||||
}
|
}
|
||||||
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
err = sh.Lock(ctx, idCnr, locker, eclocked, expEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
|
@ -120,7 +120,7 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
|
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked}, expEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
|
|
|
@ -108,7 +108,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
err = Put(context.Background(), e, lockerObj, false)
|
err = Put(context.Background(), e, lockerObj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id})
|
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id}, lockerExpiresAfter)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// 3.
|
// 3.
|
||||||
|
@ -194,7 +194,7 @@ func TestLockExpiration(t *testing.T) {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
idLock, _ := lock.ID()
|
idLock, _ := lock.ID()
|
||||||
|
|
||||||
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id})
|
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id}, lockerExpiresAfter)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
|
@ -265,7 +265,7 @@ func TestLockForceRemoval(t *testing.T) {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
idLock, _ := lock.ID()
|
idLock, _ := lock.ID()
|
||||||
|
|
||||||
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id})
|
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id}, rand.Uint64())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// 3.
|
// 3.
|
||||||
|
@ -299,6 +299,7 @@ func TestLockForceRemoval(t *testing.T) {
|
||||||
func TestLockExpiredRegularObject(t *testing.T) {
|
func TestLockExpiredRegularObject(t *testing.T) {
|
||||||
const currEpoch = 42
|
const currEpoch = 42
|
||||||
const objectExpiresAfter = currEpoch - 1
|
const objectExpiresAfter = currEpoch - 1
|
||||||
|
const lockExpiresAfter = currEpoch + 1
|
||||||
|
|
||||||
engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
|
engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
|
||||||
return []shard.Option{
|
return []shard.Option{
|
||||||
|
@ -333,6 +334,7 @@ func TestLockExpiredRegularObject(t *testing.T) {
|
||||||
address.Container(),
|
address.Container(),
|
||||||
oidtest.ID(),
|
oidtest.ID(),
|
||||||
[]oid.ID{address.Object()},
|
[]oid.ID{address.Object()},
|
||||||
|
lockExpiresAfter,
|
||||||
)
|
)
|
||||||
|
|
||||||
res, err := engine.IsLocked(context.Background(), objectcore.AddressOf(object))
|
res, err := engine.IsLocked(context.Background(), objectcore.AddressOf(object))
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, locker := range lockers {
|
for _, locker := range lockers {
|
||||||
err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
|
err = e.lock(ctx, addr.Container(), locker.ID, []oid.ID{addr.Object()}, locker.ExpEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,7 +112,7 @@ func TestInhumeLocked(t *testing.T) {
|
||||||
|
|
||||||
locked := oidtest.Address()
|
locked := oidtest.Address()
|
||||||
|
|
||||||
err := db.Lock(context.Background(), locked.Container(), oidtest.ID(), []oid.ID{locked.Object()})
|
err := db.Lock(context.Background(), locked.Container(), oidtest.ID(), []oid.ID{locked.Object()}, rand.Uint64())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var prm meta.InhumePrm
|
var prm meta.InhumePrm
|
||||||
|
|
|
@ -20,6 +20,7 @@ func TestDB_IterateExpired(t *testing.T) {
|
||||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||||
|
|
||||||
const epoch = 13
|
const epoch = 13
|
||||||
|
const lockExpiresAfter = 1000
|
||||||
|
|
||||||
mAlive := map[objectSDK.Type]oid.Address{}
|
mAlive := map[objectSDK.Type]oid.Address{}
|
||||||
mExpired := map[objectSDK.Type]oid.Address{}
|
mExpired := map[objectSDK.Type]oid.Address{}
|
||||||
|
@ -35,7 +36,7 @@ func TestDB_IterateExpired(t *testing.T) {
|
||||||
|
|
||||||
expiredLocked := putWithExpiration(t, db, objectSDK.TypeRegular, epoch-1)
|
expiredLocked := putWithExpiration(t, db, objectSDK.TypeRegular, epoch-1)
|
||||||
|
|
||||||
require.NoError(t, db.Lock(context.Background(), expiredLocked.Container(), oidtest.ID(), []oid.ID{expiredLocked.Object()}))
|
require.NoError(t, db.Lock(context.Background(), expiredLocked.Container(), oidtest.ID(), []oid.ID{expiredLocked.Object()}, lockExpiresAfter))
|
||||||
|
|
||||||
err := db.IterateExpired(context.Background(), epoch, func(exp *meta.ExpiredObject) error {
|
err := db.IterateExpired(context.Background(), epoch, func(exp *meta.ExpiredObject) error {
|
||||||
if addr, ok := mAlive[exp.Type()]; ok {
|
if addr, ok := mAlive[exp.Type()]; ok {
|
||||||
|
|
|
@ -36,7 +36,7 @@ func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
|
||||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.ID, expEpoch uint64) error {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
|
@ -66,12 +66,12 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.
|
||||||
panic("empty locked list")
|
panic("empty locked list")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.lockInternal(locked, cnr, locker)
|
err := db.lockInternal(locked, cnr, locker, expEpoch)
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID, expEpoch uint64) error {
|
||||||
bucketKeysLocked := make([][]byte, len(locked))
|
bucketKeysLocked := make([][]byte, len(locked))
|
||||||
for i := range locked {
|
for i := range locked {
|
||||||
bucketKeysLocked[i] = objectKey(locked[i], make([]byte, objectKeySize))
|
bucketKeysLocked[i] = objectKey(locked[i], make([]byte, objectKeySize))
|
||||||
|
@ -91,29 +91,44 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
||||||
return fmt.Errorf("create container bucket for locked objects %v: %w", cnr, err)
|
return fmt.Errorf("create container bucket for locked objects %v: %w", cnr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
keyLocker := objectKey(locker, key)
|
var newLock lockWithExpEpoch
|
||||||
var exLockers [][]byte
|
newLock.encode(locker, expEpoch)
|
||||||
var updLockers []byte
|
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for i := range bucketKeysLocked {
|
for i := range bucketKeysLocked {
|
||||||
exLockers, err = decodeList(bucketLockedContainer.Get(bucketKeysLocked[i]))
|
locks, err := decodeLockWithExpEpochList(bucketLockedContainer.Get(bucketKeysLocked[i]))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decode list of object lockers: %w", err)
|
return fmt.Errorf("decode list of object lockers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range exLockers {
|
var lockID oid.ID
|
||||||
if bytes.Equal(exLockers[i], keyLocker) {
|
var lockExpEpoch uint64
|
||||||
continue loop
|
var foundLock bool
|
||||||
|
for j := range locks {
|
||||||
|
if locks[j].id == newLock.id {
|
||||||
|
if err := locks[j].decode(&lockID, &lockExpEpoch); err != nil {
|
||||||
|
return fmt.Errorf("decode lock: %w", err)
|
||||||
|
}
|
||||||
|
if lockExpEpoch != NoExpirationEpoch {
|
||||||
|
continue loop
|
||||||
|
}
|
||||||
|
// Add expiration epoch to lock because it doesn't have it.
|
||||||
|
locks[j].encode(lockID, expEpoch)
|
||||||
|
foundLock = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
updLockers, err = encodeList(append(exLockers, keyLocker))
|
if !foundLock {
|
||||||
|
locks = append(locks, newLock)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := encodeLockWithExpEpochList(locks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("encode list of object lockers: %w", err)
|
return fmt.Errorf("encode list of object lockers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = bucketLockedContainer.Put(bucketKeysLocked[i], updLockers)
|
err = bucketLockedContainer.Put(bucketKeysLocked[i], data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("update list of object lockers: %w", err)
|
return fmt.Errorf("update list of object lockers: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -176,24 +191,26 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// return `LOCK` id's if specified object is locked in the specified container.
|
// return `LOCK` id's if specified object is locked in the specified container.
|
||||||
func getLocks(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
func getLocks(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]Lock, error) {
|
||||||
var lockers []oid.ID
|
var lockers []Lock
|
||||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||||
if bucketLocked != nil {
|
if bucketLocked != nil {
|
||||||
key := make([]byte, cidSize)
|
key := make([]byte, cidSize)
|
||||||
idCnr.Encode(key)
|
idCnr.Encode(key)
|
||||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
bucketLockedContainer := bucketLocked.Bucket(key)
|
||||||
if bucketLockedContainer != nil {
|
if bucketLockedContainer != nil {
|
||||||
binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
|
binObjIDs, err := decodeLockWithExpEpochList(bucketLockedContainer.Get(objectKey(idObj, key)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("decode list of object lockers: %w", err)
|
return nil, fmt.Errorf("decode list of object lockers: %w", err)
|
||||||
}
|
}
|
||||||
for _, binObjID := range binObjIDs {
|
for _, binObjID := range binObjIDs {
|
||||||
var id oid.ID
|
var id oid.ID
|
||||||
if err = id.Decode(binObjID); err != nil {
|
var expEpoch uint64
|
||||||
|
|
||||||
|
if err = binObjID.decode(&id, &expEpoch); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
lockers = append(lockers, id)
|
lockers = append(lockers, Lock{ID: id, ExpEpoch: expEpoch})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -224,13 +241,13 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
|
||||||
keyLocker := objectKey(locker, key)
|
keyLocker := objectKey(locker, key)
|
||||||
updates := make([]keyValue, 0)
|
updates := make([]keyValue, 0)
|
||||||
err := bucketLockedContainer.ForEach(func(k, v []byte) error {
|
err := bucketLockedContainer.ForEach(func(k, v []byte) error {
|
||||||
keyLockers, err := decodeList(v)
|
keyLockers, err := decodeLockWithExpEpochList(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range keyLockers {
|
for i := range keyLockers {
|
||||||
if bytes.Equal(keyLockers[i], keyLocker) {
|
if bytes.Equal(keyLockers[i].id[:], keyLocker) {
|
||||||
if len(keyLockers) == 1 {
|
if len(keyLockers) == 1 {
|
||||||
updates = append(updates, keyValue{
|
updates = append(updates, keyValue{
|
||||||
Key: k,
|
Key: k,
|
||||||
|
@ -238,8 +255,7 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
|
||||||
})
|
})
|
||||||
|
|
||||||
var id oid.ID
|
var id oid.ID
|
||||||
err = id.Decode(k)
|
if err := id.Decode(k); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("decode unlocked object id error: %w", err)
|
return fmt.Errorf("decode unlocked object id error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,7 +268,7 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
|
||||||
// exclude locker
|
// exclude locker
|
||||||
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
||||||
|
|
||||||
v, err = encodeList(keyLockers)
|
v, err = encodeLockWithExpEpochList(keyLockers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("encode updated list of lockers: %w", err)
|
return fmt.Errorf("encode updated list of lockers: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -351,11 +367,16 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Lock struct {
|
||||||
|
ID oid.ID
|
||||||
|
ExpEpoch uint64
|
||||||
|
}
|
||||||
|
|
||||||
// GetLocks return `LOCK` id's if provided object is locked by any `LOCK`. Not found
|
// GetLocks return `LOCK` id's if provided object is locked by any `LOCK`. Not found
|
||||||
// object is considered as non-locked.
|
// object is considered as non-locked.
|
||||||
//
|
//
|
||||||
// Returns only non-logical errors related to underlying database.
|
// Returns only non-logical errors related to underlying database.
|
||||||
func (db *DB) GetLocks(ctx context.Context, addr oid.Address) (res []oid.ID, err error) {
|
func (db *DB) GetLocks(ctx context.Context, addr oid.Address) (res []Lock, err error) {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
|
|
|
@ -25,8 +25,8 @@ func TestDB_Lock(t *testing.T) {
|
||||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||||
|
|
||||||
t.Run("empty locked list", func(t *testing.T) {
|
t.Run("empty locked list", func(t *testing.T) {
|
||||||
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, nil) })
|
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, nil, rand.Uint64()) })
|
||||||
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, []oid.ID{}) })
|
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, []oid.ID{}, rand.Uint64()) })
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("(ir)regular", func(t *testing.T) {
|
t.Run("(ir)regular", func(t *testing.T) {
|
||||||
|
@ -48,7 +48,7 @@ func TestDB_Lock(t *testing.T) {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
|
|
||||||
// try to lock it
|
// try to lock it
|
||||||
err = db.Lock(context.Background(), cnr, oidtest.ID(), []oid.ID{id})
|
err = db.Lock(context.Background(), cnr, oidtest.ID(), []oid.ID{id}, rand.Uint64())
|
||||||
if typ == objectSDK.TypeRegular {
|
if typ == objectSDK.TypeRegular {
|
||||||
require.NoError(t, err, typ)
|
require.NoError(t, err, typ)
|
||||||
} else {
|
} else {
|
||||||
|
@ -185,21 +185,26 @@ func TestDB_Lock(t *testing.T) {
|
||||||
func TestDB_Lock_Expired(t *testing.T) {
|
func TestDB_Lock_Expired(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
es := &epochState{e: 123}
|
const currEpoch = 123
|
||||||
|
const objectExpiresAfter = currEpoch + 1
|
||||||
|
const lockExpiresAfter = currEpoch + 1000
|
||||||
|
const nextEpoch = objectExpiresAfter + 1
|
||||||
|
|
||||||
|
es := &epochState{e: currEpoch}
|
||||||
|
|
||||||
db := newDB(t, meta.WithEpochState(es))
|
db := newDB(t, meta.WithEpochState(es))
|
||||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||||
|
|
||||||
// put an object
|
// put an object
|
||||||
addr := putWithExpiration(t, db, objectSDK.TypeRegular, 124)
|
addr := putWithExpiration(t, db, objectSDK.TypeRegular, objectExpiresAfter)
|
||||||
|
|
||||||
// expire the obj
|
// expire the obj
|
||||||
es.e = 125
|
es.e = nextEpoch
|
||||||
_, err := metaGet(db, addr, false)
|
_, err := metaGet(db, addr, false)
|
||||||
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
||||||
|
|
||||||
// lock the obj
|
// lock the obj
|
||||||
require.NoError(t, db.Lock(context.Background(), addr.Container(), oidtest.ID(), []oid.ID{addr.Object()}))
|
require.NoError(t, db.Lock(context.Background(), addr.Container(), oidtest.ID(), []oid.ID{addr.Object()}, lockExpiresAfter))
|
||||||
|
|
||||||
// object is expired but locked, thus, must be available
|
// object is expired but locked, thus, must be available
|
||||||
_, err = metaGet(db, addr, false)
|
_, err = metaGet(db, addr, false)
|
||||||
|
@ -278,7 +283,7 @@ func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*objectSDK
|
||||||
err := putBig(db, lockObj)
|
err := putBig(db, lockObj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = db.Lock(context.Background(), cnr, lockID, lockedObjIDs)
|
err = db.Lock(context.Background(), cnr, lockID, lockedObjIDs, rand.Uint64())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
return lockedObjs, lockObj
|
return lockedObjs, lockObj
|
||||||
|
|
|
@ -363,3 +363,92 @@ func decodeTombstoneWithExpEpoch(addr *oid.Address, expEpoch *uint64, src []byte
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// lockWithExpEpoch contains the ID and expiration epoch of the lock.
|
||||||
|
type lockWithExpEpoch struct {
|
||||||
|
id [objectKeySize]byte
|
||||||
|
expEpoch [epochSize]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// decode decodes the ID and expiration epoch of the lock.
|
||||||
|
//
|
||||||
|
// If the lock has no expiration epoch, uses [NoExpirationEpoch] instead.
|
||||||
|
func (l lockWithExpEpoch) decode(id *oid.ID, expEpoch *uint64) error {
|
||||||
|
if err := id.Decode(l.id[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*expEpoch = binary.LittleEndian.Uint64(l.expEpoch[:])
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// encode encodes the ID and expiration epoch of the lock.
|
||||||
|
func (l *lockWithExpEpoch) encode(id oid.ID, expEpoch uint64) {
|
||||||
|
id.Encode(l.id[:])
|
||||||
|
binary.LittleEndian.PutUint64(l.expEpoch[:], expEpoch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// decodeLockWithExpEpochList decodes the lock list encoded with
|
||||||
|
// [encodeLockWithExpEpochList].
|
||||||
|
//
|
||||||
|
// If some locks have no expiration epoch, uses [NoExpirationEpoch] instead.
|
||||||
|
func decodeLockWithExpEpochList(data []byte) (locks []lockWithExpEpoch, err error) {
|
||||||
|
xs, err := decodeList(data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't decode list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var id []byte
|
||||||
|
var idMatched bool
|
||||||
|
|
||||||
|
for _, x := range xs {
|
||||||
|
switch size := len(x); {
|
||||||
|
case size == objectKeySize && !idMatched:
|
||||||
|
id = x
|
||||||
|
idMatched = true
|
||||||
|
case size == objectKeySize && idMatched:
|
||||||
|
var lock lockWithExpEpoch
|
||||||
|
copy(lock.id[:], id)
|
||||||
|
locks = append(locks, lock)
|
||||||
|
id = x
|
||||||
|
case size == epochSize && idMatched:
|
||||||
|
var lock lockWithExpEpoch
|
||||||
|
copy(lock.id[:], id)
|
||||||
|
copy(lock.expEpoch[:], x)
|
||||||
|
locks = append(locks, lock)
|
||||||
|
idMatched = false
|
||||||
|
case size == epochSize && !idMatched:
|
||||||
|
return nil, errors.New("found expiration epoch but expected lock")
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected list element size %d", size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if idMatched {
|
||||||
|
var lock lockWithExpEpoch
|
||||||
|
copy(lock.id[:], id)
|
||||||
|
locks = append(locks, lock)
|
||||||
|
}
|
||||||
|
|
||||||
|
return locks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// encodeLockWithExpEpochList encodes the lock list.
|
||||||
|
//
|
||||||
|
// If some locks have [NoExpirationEpoch], encodes only their IDs.
|
||||||
|
func encodeLockWithExpEpochList(locks []lockWithExpEpoch) (data []byte, err error) {
|
||||||
|
var noEpoch [epochSize]byte
|
||||||
|
|
||||||
|
var xs [][]byte
|
||||||
|
|
||||||
|
for _, lock := range locks {
|
||||||
|
xs = append(xs, lock.id[:])
|
||||||
|
if lock.expEpoch != noEpoch {
|
||||||
|
xs = append(xs, lock.expEpoch[:])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if data, err = encodeList(xs); err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't encode list: %w", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
@ -316,6 +316,14 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
has, expEpoch, err := object.GetExpirationEpoch(obj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get lock %s expiration epoch: %w", object.AddressOf(obj), err)
|
||||||
|
}
|
||||||
|
if !has {
|
||||||
|
return fmt.Errorf("lock %s has no expiration epoch", object.AddressOf(obj))
|
||||||
|
}
|
||||||
|
|
||||||
var lock objectSDK.Lock
|
var lock objectSDK.Lock
|
||||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
||||||
|
@ -326,7 +334,7 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
|
||||||
|
|
||||||
cnr, _ := obj.ContainerID()
|
cnr, _ := obj.ContainerID()
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
err := s.metaBase.Lock(ctx, cnr, id, locked)
|
err = s.metaBase.Lock(ctx, cnr, id, locked, expEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not lock objects: %w", err)
|
return fmt.Errorf("could not lock objects: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -361,6 +369,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
var inhumePrm meta.InhumePrm
|
||||||
|
|
||||||
|
inhumePrm.SetTombstoneAddress(tombAddr, expEpoch)
|
||||||
inhumePrm.SetTombstoneAddress(tombAddr, expEpoch)
|
inhumePrm.SetTombstoneAddress(tombAddr, expEpoch)
|
||||||
inhumePrm.SetAddresses(tombMembers...)
|
inhumePrm.SetAddresses(tombMembers...)
|
||||||
|
|
||||||
|
|
|
@ -294,7 +294,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
lockID, _ := lockObj.ID()
|
lockID, _ := lockObj.ID()
|
||||||
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
|
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked, uint64(expirationEpoch)))
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(object.AddressOf(tombObj), uint64(expirationEpoch), tombMembers...)
|
inhumePrm.SetTarget(object.AddressOf(tombObj), uint64(expirationEpoch), tombMembers...)
|
||||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -24,8 +25,13 @@ import (
|
||||||
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
const currEpoch = 100
|
||||||
|
const objectExpiresAfter = 101
|
||||||
|
const lockExpiresAfter = 103
|
||||||
|
const nextEpoch = 105
|
||||||
|
|
||||||
epoch := &epochState{
|
epoch := &epochState{
|
||||||
Value: 100,
|
Value: currEpoch,
|
||||||
}
|
}
|
||||||
|
|
||||||
sh := newCustomShard(t, false, shardOptions{
|
sh := newCustomShard(t, false, shardOptions{
|
||||||
|
@ -40,7 +46,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
|
|
||||||
var objExpirationAttr objectSDK.Attribute
|
var objExpirationAttr objectSDK.Attribute
|
||||||
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||||
objExpirationAttr.SetValue("101")
|
objExpirationAttr.SetValue(strconv.Itoa(objectExpiresAfter))
|
||||||
|
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
obj.SetAttributes(objExpirationAttr)
|
obj.SetAttributes(objExpirationAttr)
|
||||||
|
@ -48,7 +54,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
|
|
||||||
var lockExpirationAttr objectSDK.Attribute
|
var lockExpirationAttr objectSDK.Attribute
|
||||||
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||||
lockExpirationAttr.SetValue("103")
|
lockExpirationAttr.SetValue(strconv.Itoa(lockExpiresAfter))
|
||||||
|
|
||||||
lock := testutil.GenerateObjectWithCID(cnr)
|
lock := testutil.GenerateObjectWithCID(cnr)
|
||||||
lock.SetType(objectSDK.TypeLock)
|
lock.SetType(objectSDK.TypeLock)
|
||||||
|
@ -61,14 +67,14 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
|
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID}, lockExpiresAfter)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
_, err = sh.Put(context.Background(), putPrm)
|
_, err = sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
epoch.Value = 105
|
epoch.Value = nextEpoch
|
||||||
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
||||||
|
|
||||||
var getPrm GetPrm
|
var getPrm GetPrm
|
||||||
|
@ -80,8 +86,13 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
const currEpoch = 100
|
||||||
|
const objectExpiresAfter = 101
|
||||||
|
const lockExpiresAfter = 103
|
||||||
|
const nextEpoch = 105
|
||||||
|
|
||||||
epoch := &epochState{
|
epoch := &epochState{
|
||||||
Value: 100,
|
Value: currEpoch,
|
||||||
}
|
}
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
@ -90,11 +101,11 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
|
|
||||||
var objExpirationAttr objectSDK.Attribute
|
var objExpirationAttr objectSDK.Attribute
|
||||||
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||||
objExpirationAttr.SetValue("101")
|
objExpirationAttr.SetValue(strconv.Itoa(objectExpiresAfter))
|
||||||
|
|
||||||
var lockExpirationAttr objectSDK.Attribute
|
var lockExpirationAttr objectSDK.Attribute
|
||||||
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||||
lockExpirationAttr.SetValue("103")
|
lockExpirationAttr.SetValue(strconv.Itoa(lockExpiresAfter))
|
||||||
|
|
||||||
parent := testutil.GenerateObjectWithCID(cnr)
|
parent := testutil.GenerateObjectWithCID(cnr)
|
||||||
parent.SetID(parentID)
|
parent.SetID(parentID)
|
||||||
|
@ -150,7 +161,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID))
|
err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID), lockExpiresAfter)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
@ -164,7 +175,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
|
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
|
||||||
|
|
||||||
epoch.Value = 105
|
epoch.Value = nextEpoch
|
||||||
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
||||||
|
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
|
|
|
@ -18,7 +18,7 @@ import (
|
||||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID, expEpoch uint64) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Lock",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Lock",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard_id", s.ID().String()),
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
@ -38,7 +38,7 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.metaBase.Lock(ctx, idCnr, locker, locked)
|
err := s.metaBase.Lock(ctx, idCnr, locker, locked, expEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("metabase lock: %w", err)
|
return fmt.Errorf("metabase lock: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -72,9 +72,11 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
|
||||||
return res.Locked(), nil
|
return res.Locked(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Lock = meta.Lock
|
||||||
|
|
||||||
// GetLocks return lock id's of the provided object. Not found object is
|
// GetLocks return lock id's of the provided object. Not found object is
|
||||||
// considered as not locked. Requires healthy metabase, returns ErrDegradedMode otherwise.
|
// considered as not locked. Requires healthy metabase, returns ErrDegradedMode otherwise.
|
||||||
func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
|
func (s *Shard) GetLocks(ctx context.Context, addr oid.Address) ([]Lock, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocks",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocks",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard_id", s.ID().String()),
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
|
|
@ -83,7 +83,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
|
|
||||||
// lock the object
|
// lock the object
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
|
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID}, rand.Uint64())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
@ -175,7 +175,7 @@ func TestShard_IsLocked(t *testing.T) {
|
||||||
|
|
||||||
// locked object is locked
|
// locked object is locked
|
||||||
|
|
||||||
require.NoError(t, sh.Lock(context.Background(), cnrID, lockID, []oid.ID{objID}))
|
require.NoError(t, sh.Lock(context.Background(), cnrID, lockID, []oid.ID{objID}, rand.Uint64()))
|
||||||
|
|
||||||
locked, err = sh.IsLocked(context.Background(), objectcore.AddressOf(obj))
|
locked, err = sh.IsLocked(context.Background(), objectcore.AddressOf(obj))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -22,7 +22,7 @@ type ObjectStorage interface {
|
||||||
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error
|
Delete(ctx context.Context, tombstone oid.Address, toDelete []oid.ID, expEpoch uint64) error
|
||||||
// Lock must lock passed objects
|
// Lock must lock passed objects
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error
|
Lock(ctx context.Context, locker oid.Address, toLock []oid.ID, expEpoch uint64) error
|
||||||
// IsLocked must clarify object's lock status.
|
// IsLocked must clarify object's lock status.
|
||||||
IsLocked(context.Context, oid.Address) (bool, error)
|
IsLocked(context.Context, oid.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,15 @@ func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
|
||||||
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
||||||
}
|
}
|
||||||
case objectSDK.TypeLock:
|
case objectSDK.TypeLock:
|
||||||
err := t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
|
has, expEpoch, err := objectCore.GetExpirationEpoch(obj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get expiration epoch: %w", err)
|
||||||
|
}
|
||||||
|
if !has {
|
||||||
|
return errors.New("lock has no expiration epoch")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects(), expEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
|
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,7 +253,7 @@ func PopulateLocked(
|
||||||
lockerOID, _ := locker.ID()
|
lockerOID, _ := locker.ID()
|
||||||
|
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
if err := db.Lock(ctx, lockerCID, lockerOID, []oid.ID{id}); err != nil {
|
if err := db.Lock(ctx, lockerCID, lockerOID, []oid.ID{id}, rand.Uint64()); err != nil {
|
||||||
return fmt.Errorf("couldn't lock an object: %w", err)
|
return fmt.Errorf("couldn't lock an object: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue