Use LockPrm
and LockRes
to store Lock operation parameters and results #1556
14 changed files with 198 additions and 50 deletions
|
@ -481,7 +481,12 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
|
func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {
|
||||||
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
|
var prm engine.LockPrm
|
||||||
|
prm.WithContainer(locker.Container())
|
||||||
|
prm.WithTarget(locker.Object(), toLock...)
|
||||||
|
|
||||||
|
_, err := e.engine.Lock(ctx, prm)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexedContainer bool) error {
|
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexedContainer bool) error {
|
||||||
|
|
|
@ -153,6 +153,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
|
||||||
)...),
|
)...),
|
||||||
}
|
}
|
||||||
}).prepare(t).engine
|
}).prepare(t).engine
|
||||||
|
defer func() { require.NoError(t, engine.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,28 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// LockPrm contains parameters for Lock operation.
|
||||||
|
type LockPrm struct {
|
||||||
|
cnt cid.ID
|
||||||
|
lock oid.ID
|
||||||
|
locked []oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithContainer sets the container ID for both the lock and locked objects
|
||||||
|
// since they must belong to the same container.
|
||||||
|
func (p *LockPrm) WithContainer(cnt cid.ID) {
|
||||||
|
p.cnt = cnt
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithTarget sets lock object ID and IDs of objects to be locked.
|
||||||
|
func (p *LockPrm) WithTarget(lock oid.ID, locked ...oid.ID) {
|
||||||
|
p.lock = lock
|
||||||
|
p.locked = locked
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockPrm contains results for Lock operation.
|
||||||
|
type LockRes struct{}
|
||||||
|
|
||||||
var errLockFailed = errors.New("lock operation failed")
|
var errLockFailed = errors.New("lock operation failed")
|
||||||
|
|
||||||
// Lock marks objects as locked with another object. All objects from the
|
// Lock marks objects as locked with another object. All objects from the
|
||||||
|
@ -24,18 +46,18 @@ var errLockFailed = errors.New("lock operation failed")
|
||||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (e *StorageEngine) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (e *StorageEngine) Lock(ctx context.Context, prm LockPrm) (LockRes, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Lock",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Lock",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("container_id", idCnr.EncodeToString()),
|
attribute.String("container_id", prm.cnt.EncodeToString()),
|
||||||
attribute.String("locker", locker.EncodeToString()),
|
attribute.String("locker", prm.lock.EncodeToString()),
|
||||||
attribute.Int("locked_count", len(locked)),
|
attribute.Int("locked_count", len(prm.locked)),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
defer elapsed("Lock", e.metrics.AddMethodDuration)()
|
defer elapsed("Lock", e.metrics.AddMethodDuration)()
|
||||||
|
|
||||||
return e.execIfNotBlocked(func() error {
|
return LockRes{}, e.execIfNotBlocked(func() error {
|
||||||
return e.lock(ctx, idCnr, locker, locked)
|
return e.lock(ctx, prm.cnt, prm.lock, prm.locked)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +84,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
|
||||||
// - 1: locking irregular object
|
// - 1: locking irregular object
|
||||||
// - 2: ok
|
// - 2: ok
|
||||||
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
||||||
|
var lockPrm shard.LockPrm
|
||||||
|
lockPrm.SetContainer(idCnr)
|
||||||
|
|
||||||
// code is pretty similar to inhumeAddr, maybe unify?
|
// code is pretty similar to inhumeAddr, maybe unify?
|
||||||
root := false
|
root := false
|
||||||
var addrLocked oid.Address
|
var addrLocked oid.Address
|
||||||
|
@ -95,7 +120,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
}
|
}
|
||||||
eclocked = append(eclocked, objID)
|
eclocked = append(eclocked, objID)
|
||||||
}
|
}
|
||||||
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
lockPrm.SetTarget(locker, eclocked...)
|
||||||
|
_, err = sh.Lock(ctx, lockPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
|
@ -120,7 +146,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
|
lockPrm.SetTarget(locker, locked)
|
||||||
|
_, err := sh.Lock(ctx, lockPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
|
|
|
@ -106,7 +106,11 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
err = Put(context.Background(), e, lockerObj, false)
|
err = Put(context.Background(), e, lockerObj, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = e.Lock(context.Background(), cnr, lockerID, []oid.ID{id})
|
var lockPrm LockPrm
|
||||||
|
lockPrm.WithContainer(cnr)
|
||||||
|
lockPrm.WithTarget(lockerID, id)
|
||||||
|
|
||||||
|
_, err = e.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// 3.
|
// 3.
|
||||||
|
@ -191,7 +195,11 @@ func TestLockExpiration(t *testing.T) {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
idLock, _ := lock.ID()
|
idLock, _ := lock.ID()
|
||||||
|
|
||||||
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id})
|
var lockPrm LockPrm
|
||||||
|
lockPrm.WithContainer(cnr)
|
||||||
|
lockPrm.WithTarget(idLock, id)
|
||||||
|
|
||||||
|
_, err = e.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
|
@ -262,7 +270,11 @@ func TestLockForceRemoval(t *testing.T) {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
idLock, _ := lock.ID()
|
idLock, _ := lock.ID()
|
||||||
|
|
||||||
err = e.Lock(context.Background(), cnr, idLock, []oid.ID{id})
|
var lockPrm LockPrm
|
||||||
|
lockPrm.WithContainer(cnr)
|
||||||
|
lockPrm.WithTarget(idLock, id)
|
||||||
|
|
||||||
|
_, err = e.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// 3.
|
// 3.
|
||||||
|
@ -306,6 +318,7 @@ func TestLockExpiredRegularObject(t *testing.T) {
|
||||||
)...),
|
)...),
|
||||||
}
|
}
|
||||||
}).prepare(t).engine
|
}).prepare(t).engine
|
||||||
|
defer func() { require.NoError(t, engine.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
|
@ -326,11 +339,12 @@ func TestLockExpiredRegularObject(t *testing.T) {
|
||||||
require.ErrorAs(t, err, &errNotFound)
|
require.ErrorAs(t, err, &errNotFound)
|
||||||
|
|
||||||
t.Run("lock expired regular object", func(t *testing.T) {
|
t.Run("lock expired regular object", func(t *testing.T) {
|
||||||
engine.Lock(context.Background(),
|
var lockPrm LockPrm
|
||||||
address.Container(),
|
lockPrm.WithContainer(address.Container())
|
||||||
oidtest.ID(),
|
lockPrm.WithTarget(oidtest.ID(), address.Object())
|
||||||
[]oid.ID{address.Object()},
|
|
||||||
)
|
_, err := engine.Lock(context.Background(), lockPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
res, err := engine.IsLocked(context.Background(), objectcore.AddressOf(object))
|
res, err := engine.IsLocked(context.Background(), objectcore.AddressOf(object))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -111,7 +111,11 @@ func TestInhumeLocked(t *testing.T) {
|
||||||
|
|
||||||
locked := oidtest.Address()
|
locked := oidtest.Address()
|
||||||
|
|
||||||
err := db.Lock(context.Background(), locked.Container(), oidtest.ID(), []oid.ID{locked.Object()})
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(locked.Container())
|
||||||
|
lockPrm.SetTarget(oidtest.ID(), locked.Object())
|
||||||
|
|
||||||
|
_, err := db.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var prm meta.InhumePrm
|
var prm meta.InhumePrm
|
||||||
|
|
|
@ -35,9 +35,14 @@ func TestDB_IterateExpired(t *testing.T) {
|
||||||
|
|
||||||
expiredLocked := putWithExpiration(t, db, objectSDK.TypeRegular, epoch-1)
|
expiredLocked := putWithExpiration(t, db, objectSDK.TypeRegular, epoch-1)
|
||||||
|
|
||||||
require.NoError(t, db.Lock(context.Background(), expiredLocked.Container(), oidtest.ID(), []oid.ID{expiredLocked.Object()}))
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(expiredLocked.Container())
|
||||||
|
lockPrm.SetTarget(oidtest.ID(), expiredLocked.Object())
|
||||||
|
|
||||||
err := db.IterateExpired(context.Background(), epoch, func(exp *meta.ExpiredObject) error {
|
_, err := db.Lock(context.Background(), lockPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.IterateExpired(context.Background(), epoch, func(exp *meta.ExpiredObject) error {
|
||||||
if addr, ok := mAlive[exp.Type()]; ok {
|
if addr, ok := mAlive[exp.Type()]; ok {
|
||||||
require.NotEqual(t, addr, exp.Address())
|
require.NotEqual(t, addr, exp.Address())
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,13 +30,35 @@ func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(idCnr, lockersPrefix, key)
|
return bucketName(idCnr, lockersPrefix, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LockPrm contains parameters for Lock operation.
|
||||||
|
type LockPrm struct {
|
||||||
|
cnt cid.ID
|
||||||
|
lock oid.ID
|
||||||
|
locked []oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetContainer sets the container ID for both the lock and locked objects
|
||||||
|
// since they must belong to the same container.
|
||||||
|
func (p *LockPrm) SetContainer(cnt cid.ID) {
|
||||||
|
p.cnt = cnt
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTarget sets lock object ID and IDs of objects to be locked.
|
||||||
|
func (p *LockPrm) SetTarget(lock oid.ID, locked ...oid.ID) {
|
||||||
|
p.lock = lock
|
||||||
|
p.locked = locked
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockPrm contains results for Lock operation.
|
||||||
|
type LockRes struct{}
|
||||||
|
|
||||||
// Lock marks objects as locked with another object. All objects are from the
|
// Lock marks objects as locked with another object. All objects are from the
|
||||||
// specified container.
|
// specified container.
|
||||||
//
|
//
|
||||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (db *DB) Lock(ctx context.Context, prm LockPrm) (LockRes, error) {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
|
@ -47,9 +69,9 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.Lock",
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.Lock",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("container_id", cnr.EncodeToString()),
|
attribute.String("container_id", prm.cnt.EncodeToString()),
|
||||||
attribute.String("locker", locker.EncodeToString()),
|
attribute.String("locker", prm.lock.EncodeToString()),
|
||||||
attribute.Int("locked_count", len(locked)),
|
attribute.Int("locked_count", len(prm.locked)),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -57,18 +79,18 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
if db.mode.NoMetabase() {
|
||||||
return ErrDegradedMode
|
return LockRes{}, ErrDegradedMode
|
||||||
} else if db.mode.ReadOnly() {
|
} else if db.mode.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return LockRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(locked) == 0 {
|
if len(prm.locked) == 0 {
|
||||||
panic("empty locked list")
|
panic("empty locked list")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.lockInternal(locked, cnr, locker)
|
err := db.lockInternal(prm.locked, prm.cnt, prm.lock)
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return err
|
return LockRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
||||||
|
|
|
@ -23,9 +23,12 @@ func TestDB_Lock(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||||
|
|
||||||
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
|
||||||
t.Run("empty locked list", func(t *testing.T) {
|
t.Run("empty locked list", func(t *testing.T) {
|
||||||
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, nil) })
|
lockPrm.SetTarget(oid.ID{})
|
||||||
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, []oid.ID{}) })
|
require.Panics(t, func() { _, _ = db.Lock(context.Background(), lockPrm) })
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("(ir)regular", func(t *testing.T) {
|
t.Run("(ir)regular", func(t *testing.T) {
|
||||||
|
@ -47,7 +50,8 @@ func TestDB_Lock(t *testing.T) {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
|
|
||||||
// try to lock it
|
// try to lock it
|
||||||
err = db.Lock(context.Background(), cnr, oidtest.ID(), []oid.ID{id})
|
lockPrm.SetTarget(oidtest.ID(), id)
|
||||||
|
_, err = db.Lock(context.Background(), lockPrm)
|
||||||
if typ == objectSDK.TypeRegular {
|
if typ == objectSDK.TypeRegular {
|
||||||
require.NoError(t, err, typ)
|
require.NoError(t, err, typ)
|
||||||
} else {
|
} else {
|
||||||
|
@ -198,7 +202,12 @@ func TestDB_Lock_Expired(t *testing.T) {
|
||||||
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
||||||
|
|
||||||
// lock the obj
|
// lock the obj
|
||||||
require.NoError(t, db.Lock(context.Background(), addr.Container(), oidtest.ID(), []oid.ID{addr.Object()}))
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(addr.Container())
|
||||||
|
lockPrm.SetTarget(oidtest.ID(), addr.Object())
|
||||||
|
|
||||||
|
_, err = db.Lock(context.Background(), lockPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// object is expired but locked, thus, must be available
|
// object is expired but locked, thus, must be available
|
||||||
_, err = metaGet(db, addr, false)
|
_, err = metaGet(db, addr, false)
|
||||||
|
@ -277,7 +286,11 @@ func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*objectSDK
|
||||||
err := putBig(db, lockObj)
|
err := putBig(db, lockObj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = db.Lock(context.Background(), cnr, lockID, lockedObjIDs)
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(lockID, lockedObjIDs...)
|
||||||
|
|
||||||
|
_, err = db.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
return lockedObjs, lockObj
|
return lockedObjs, lockObj
|
||||||
|
|
|
@ -326,7 +326,12 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
|
||||||
|
|
||||||
cnr, _ := obj.ContainerID()
|
cnr, _ := obj.ContainerID()
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
err := s.metaBase.Lock(ctx, cnr, id, locked)
|
|
||||||
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(id, locked...)
|
||||||
|
|
||||||
|
_, err := s.metaBase.Lock(ctx, lockPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not lock objects: %w", err)
|
return fmt.Errorf("could not lock objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -283,7 +283,13 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
lockID, _ := lockObj.ID()
|
lockID, _ := lockObj.ID()
|
||||||
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
|
|
||||||
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnrLocked)
|
||||||
|
lockPrm.SetTarget(lockID, locked...)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
|
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
|
||||||
|
|
|
@ -61,7 +61,11 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(lockID, objID)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
@ -150,7 +154,11 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID))
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(lockID, append(childIDs, parentID, linkID)...)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
|
|
@ -12,19 +12,41 @@ import (
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// LockPrm contains parameters for Lock operation.
|
||||||
|
type LockPrm struct {
|
||||||
|
cnt cid.ID
|
||||||
|
lock oid.ID
|
||||||
|
locked []oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetContainer sets the container ID for both the lock and locked objects
|
||||||
|
// since they must belong to the same container.
|
||||||
|
func (p *LockPrm) SetContainer(cnt cid.ID) {
|
||||||
|
p.cnt = cnt
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTarget sets lock object ID and IDs of objects to be locked.
|
||||||
|
func (p *LockPrm) SetTarget(lock oid.ID, locked ...oid.ID) {
|
||||||
|
p.lock = lock
|
||||||
|
p.locked = locked
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockPrm contains results for Lock operation.
|
||||||
|
type LockRes struct{}
|
||||||
|
|
||||||
// Lock marks objects as locked with another object. All objects from the
|
// Lock marks objects as locked with another object. All objects from the
|
||||||
// specified container.
|
// specified container.
|
||||||
//
|
//
|
||||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (s *Shard) Lock(ctx context.Context, prm LockPrm) (LockRes, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Lock",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Lock",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard_id", s.ID().String()),
|
attribute.String("shard_id", s.ID().String()),
|
||||||
attribute.String("container_id", idCnr.EncodeToString()),
|
attribute.String("container_id", prm.cnt.EncodeToString()),
|
||||||
attribute.String("locker", locker.EncodeToString()),
|
attribute.String("locker", prm.lock.EncodeToString()),
|
||||||
attribute.Int("locked_count", len(locked)),
|
attribute.Int("locked_count", len(prm.locked)),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -33,17 +55,21 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
|
||||||
|
|
||||||
m := s.info.Mode
|
m := s.info.Mode
|
||||||
if m.ReadOnly() {
|
if m.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return LockRes{}, ErrReadOnlyMode
|
||||||
} else if m.NoMetabase() {
|
} else if m.NoMetabase() {
|
||||||
return ErrDegradedMode
|
return LockRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.metaBase.Lock(ctx, idCnr, locker, locked)
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(prm.cnt)
|
||||||
|
lockPrm.SetTarget(prm.lock, prm.locked...)
|
||||||
|
|
||||||
|
_, err := s.metaBase.Lock(ctx, lockPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("metabase lock: %w", err)
|
return LockRes{}, fmt.Errorf("metabase lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return LockRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsLocked checks object locking relation of the provided object. Not found object is
|
// IsLocked checks object locking relation of the provided object. Not found object is
|
||||||
|
|
|
@ -82,7 +82,11 @@ func TestShard_Lock(t *testing.T) {
|
||||||
|
|
||||||
// lock the object
|
// lock the object
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(lockID, objID)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
@ -173,8 +177,12 @@ func TestShard_IsLocked(t *testing.T) {
|
||||||
require.False(t, locked)
|
require.False(t, locked)
|
||||||
|
|
||||||
// locked object is locked
|
// locked object is locked
|
||||||
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnrID)
|
||||||
|
lockPrm.SetTarget(lockID, objID)
|
||||||
|
|
||||||
require.NoError(t, sh.Lock(context.Background(), cnrID, lockID, []oid.ID{objID}))
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
locked, err = sh.IsLocked(context.Background(), objectcore.AddressOf(obj))
|
locked, err = sh.IsLocked(context.Background(), objectcore.AddressOf(obj))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -253,7 +253,11 @@ func PopulateLocked(
|
||||||
lockerOID, _ := locker.ID()
|
lockerOID, _ := locker.ID()
|
||||||
|
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
if err := db.Lock(ctx, lockerCID, lockerOID, []oid.ID{id}); err != nil {
|
var lockPrm meta.LockPrm
|
||||||
|
lockPrm.SetContainer(lockerCID)
|
||||||
|
lockPrm.SetTarget(lockerOID, id)
|
||||||
|
|
||||||
|
if _, err := db.Lock(ctx, lockPrm); err != nil {
|
||||||
return fmt.Errorf("couldn't lock an object: %w", err)
|
return fmt.Errorf("couldn't lock an object: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue