Use LockPrm
and LockRes
to store Lock operation parameters and results #1556
7 changed files with 77 additions and 20 deletions
|
@ -111,7 +111,11 @@ func TestInhumeLocked(t *testing.T) {
|
|||
|
||||
locked := oidtest.Address()
|
||||
|
||||
err := db.Lock(context.Background(), locked.Container(), oidtest.ID(), []oid.ID{locked.Object()})
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(locked.Container())
|
||||
lockPrm.SetTarget(oidtest.ID(), locked.Object())
|
||||
|
||||
_, err := db.Lock(context.Background(), lockPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var prm meta.InhumePrm
|
||||
|
|
|
@ -35,9 +35,14 @@ func TestDB_IterateExpired(t *testing.T) {
|
|||
|
||||
expiredLocked := putWithExpiration(t, db, objectSDK.TypeRegular, epoch-1)
|
||||
|
||||
require.NoError(t, db.Lock(context.Background(), expiredLocked.Container(), oidtest.ID(), []oid.ID{expiredLocked.Object()}))
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(expiredLocked.Container())
|
||||
lockPrm.SetTarget(oidtest.ID(), expiredLocked.Object())
|
||||
|
||||
err := db.IterateExpired(context.Background(), epoch, func(exp *meta.ExpiredObject) error {
|
||||
_, err := db.Lock(context.Background(), lockPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.IterateExpired(context.Background(), epoch, func(exp *meta.ExpiredObject) error {
|
||||
if addr, ok := mAlive[exp.Type()]; ok {
|
||||
require.NotEqual(t, addr, exp.Address())
|
||||
}
|
||||
|
|
|
@ -30,13 +30,35 @@ func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
|
|||
return bucketName(idCnr, lockersPrefix, key)
|
||||
}
|
||||
|
||||
// LockPrm contains parameters for Lock operation.
|
||||
type LockPrm struct {
|
||||
cnt cid.ID
|
||||
lock oid.ID
|
||||
locked []oid.ID
|
||||
}
|
||||
|
||||
// SetContainer sets the container ID for both the lock and locked objects
|
||||
// since they must belong to the same container.
|
||||
func (p *LockPrm) SetContainer(cnt cid.ID) {
|
||||
p.cnt = cnt
|
||||
}
|
||||
|
||||
// SetTarget sets lock object ID and IDs of objects to be locked.
|
||||
func (p *LockPrm) SetTarget(lock oid.ID, locked ...oid.ID) {
|
||||
p.lock = lock
|
||||
|
||||
p.locked = locked
|
||||
}
|
||||
|
||||
// LockPrm contains results for Lock operation.
|
||||
type LockRes struct{}
|
||||
|
||||
// Lock marks objects as locked with another object. All objects are from the
|
||||
// specified container.
|
||||
//
|
||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||
//
|
||||
// Locked list should be unique. Panics if it is empty.
|
||||
func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
||||
func (db *DB) Lock(ctx context.Context, prm LockPrm) (LockRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
|
@ -47,9 +69,9 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.
|
|||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.Lock",
|
||||
trace.WithAttributes(
|
||||
attribute.String("container_id", cnr.EncodeToString()),
|
||||
attribute.String("locker", locker.EncodeToString()),
|
||||
attribute.Int("locked_count", len(locked)),
|
||||
attribute.String("container_id", prm.cnt.EncodeToString()),
|
||||
attribute.String("locker", prm.lock.EncodeToString()),
|
||||
attribute.Int("locked_count", len(prm.locked)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
|
@ -57,18 +79,18 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.
|
|||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
return LockRes{}, ErrDegradedMode
|
||||
} else if db.mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
return LockRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
if len(locked) == 0 {
|
||||
if len(prm.locked) == 0 {
|
||||
panic("empty locked list")
|
||||
}
|
||||
|
||||
err := db.lockInternal(locked, cnr, locker)
|
||||
err := db.lockInternal(prm.locked, prm.cnt, prm.lock)
|
||||
success = err == nil
|
||||
return err
|
||||
return LockRes{}, err
|
||||
}
|
||||
|
||||
func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
||||
|
|
|
@ -23,9 +23,12 @@ func TestDB_Lock(t *testing.T) {
|
|||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(cnr)
|
||||
|
||||
t.Run("empty locked list", func(t *testing.T) {
|
||||
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, nil) })
|
||||
require.Panics(t, func() { _ = db.Lock(context.Background(), cnr, oid.ID{}, []oid.ID{}) })
|
||||
lockPrm.SetTarget(oid.ID{})
|
||||
require.Panics(t, func() { _, _ = db.Lock(context.Background(), lockPrm) })
|
||||
})
|
||||
|
||||
t.Run("(ir)regular", func(t *testing.T) {
|
||||
|
@ -47,7 +50,8 @@ func TestDB_Lock(t *testing.T) {
|
|||
id, _ := obj.ID()
|
||||
|
||||
// try to lock it
|
||||
err = db.Lock(context.Background(), cnr, oidtest.ID(), []oid.ID{id})
|
||||
lockPrm.SetTarget(oidtest.ID(), id)
|
||||
_, err = db.Lock(context.Background(), lockPrm)
|
||||
if typ == objectSDK.TypeRegular {
|
||||
require.NoError(t, err, typ)
|
||||
} else {
|
||||
|
@ -198,7 +202,12 @@ func TestDB_Lock_Expired(t *testing.T) {
|
|||
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
||||
|
||||
// lock the obj
|
||||
require.NoError(t, db.Lock(context.Background(), addr.Container(), oidtest.ID(), []oid.ID{addr.Object()}))
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(addr.Container())
|
||||
lockPrm.SetTarget(oidtest.ID(), addr.Object())
|
||||
|
||||
_, err = db.Lock(context.Background(), lockPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
// object is expired but locked, thus, must be available
|
||||
_, err = metaGet(db, addr, false)
|
||||
|
@ -277,7 +286,11 @@ func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*objectSDK
|
|||
err := putBig(db, lockObj)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.Lock(context.Background(), cnr, lockID, lockedObjIDs)
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(cnr)
|
||||
lockPrm.SetTarget(lockID, lockedObjIDs...)
|
||||
|
||||
_, err = db.Lock(context.Background(), lockPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
return lockedObjs, lockObj
|
||||
|
|
|
@ -326,7 +326,12 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
|
|||
|
||||
cnr, _ := obj.ContainerID()
|
||||
id, _ := obj.ID()
|
||||
err := s.metaBase.Lock(ctx, cnr, id, locked)
|
||||
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(cnr)
|
||||
lockPrm.SetTarget(id, locked...)
|
||||
|
||||
_, err := s.metaBase.Lock(ctx, lockPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock objects: %w", err)
|
||||
}
|
||||
|
|
|
@ -38,7 +38,11 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := s.metaBase.Lock(ctx, idCnr, locker, locked)
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(idCnr)
|
||||
lockPrm.SetTarget(locker, locked...)
|
||||
|
||||
_, err := s.metaBase.Lock(ctx, lockPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("metabase lock: %w", err)
|
||||
}
|
||||
|
|
|
@ -253,7 +253,11 @@ func PopulateLocked(
|
|||
lockerOID, _ := locker.ID()
|
||||
|
||||
group.Go(func() error {
|
||||
if err := db.Lock(ctx, lockerCID, lockerOID, []oid.ID{id}); err != nil {
|
||||
var lockPrm meta.LockPrm
|
||||
lockPrm.SetContainer(lockerCID)
|
||||
lockPrm.SetTarget(lockerOID, id)
|
||||
|
||||
if _, err := db.Lock(ctx, lockPrm); err != nil {
|
||||
return fmt.Errorf("couldn't lock an object: %w", err)
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue
But there is no check if list is empty and no panic...