[#1421] shard: Use LockPrm to store Lock operation parameters

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2024-12-11 16:06:18 +03:00
parent 0b7860100f
commit 0c8f031080
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639
5 changed files with 66 additions and 17 deletions

View file

@ -62,6 +62,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
// - 1: locking irregular object
// - 2: ok
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
var lockPrm shard.LockPrm
lockPrm.SetContainer(idCnr)
// code is pretty similar to inhumeAddr, maybe unify?
root := false
var addrLocked oid.Address
@ -95,7 +98,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
}
eclocked = append(eclocked, objID)
}
err = sh.Lock(ctx, idCnr, locker, eclocked)
lockPrm.SetTarget(locker, eclocked...)
_, err = sh.Lock(ctx, lockPrm)
if err != nil {
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
@ -120,7 +124,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
}
}
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
lockPrm.SetTarget(locker, locked)
_, err := sh.Lock(ctx, lockPrm)
if err != nil {
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))

View file

@ -283,7 +283,13 @@ func TestRefillMetabase(t *testing.T) {
require.NoError(t, err)
lockID, _ := lockObj.ID()
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
var lockPrm LockPrm
lockPrm.SetContainer(cnrLocked)
lockPrm.SetTarget(lockID, locked...)
_, err = sh.Lock(context.Background(), lockPrm)
require.NoError(t, err)
var inhumePrm InhumePrm
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)

View file

@ -61,7 +61,11 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
var lockPrm LockPrm
lockPrm.SetContainer(cnr)
lockPrm.SetTarget(lockID, objID)
_, err = sh.Lock(context.Background(), lockPrm)
require.NoError(t, err)
putPrm.SetObject(lock)
@ -150,7 +154,11 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID))
var lockPrm LockPrm
lockPrm.SetContainer(cnr)
lockPrm.SetTarget(lockID, append(childIDs, parentID, linkID)...)
_, err = sh.Lock(context.Background(), lockPrm)
require.NoError(t, err)
putPrm.SetObject(lock)

View file

@ -12,19 +12,41 @@ import (
"go.opentelemetry.io/otel/trace"
)
// LockPrm contains parameters for Lock operation.
type LockPrm struct {
cnt cid.ID
lock oid.ID
locked []oid.ID
}
// SetContainer sets the container ID for both the lock and locked objects
// since they must belong to the same container.
func (p *LockPrm) SetContainer(cnt cid.ID) {
p.cnt = cnt
}
// SetTarget sets lock object ID and IDs of objects to be locked.
func (p *LockPrm) SetTarget(lock oid.ID, locked ...oid.ID) {
p.lock = lock
p.locked = locked
}
// LockPrm contains results for Lock operation.
type LockRes struct{}
// Lock marks objects as locked with another object. All objects from the
// specified container.
//
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
//
// Locked list should be unique. Panics if it is empty.
func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
func (s *Shard) Lock(ctx context.Context, prm LockPrm) (LockRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Lock",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", idCnr.EncodeToString()),
attribute.String("locker", locker.EncodeToString()),
attribute.Int("locked_count", len(locked)),
attribute.String("container_id", prm.cnt.EncodeToString()),
attribute.String("locker", prm.lock.EncodeToString()),
attribute.Int("locked_count", len(prm.locked)),
))
defer span.End()
@ -33,21 +55,21 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
m := s.info.Mode
if m.ReadOnly() {
return ErrReadOnlyMode
return LockRes{}, ErrReadOnlyMode
} else if m.NoMetabase() {
return ErrDegradedMode
return LockRes{}, ErrDegradedMode
}
var lockPrm meta.LockPrm
lockPrm.SetContainer(idCnr)
lockPrm.SetTarget(locker, locked...)
lockPrm.SetContainer(prm.cnt)
lockPrm.SetTarget(prm.lock, prm.locked...)
_, err := s.metaBase.Lock(ctx, lockPrm)
if err != nil {
return fmt.Errorf("metabase lock: %w", err)
return LockRes{}, fmt.Errorf("metabase lock: %w", err)
}
return nil
return LockRes{}, nil
}
// IsLocked checks object locking relation of the provided object. Not found object is

View file

@ -82,7 +82,11 @@ func TestShard_Lock(t *testing.T) {
// lock the object
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
var lockPrm LockPrm
lockPrm.SetContainer(cnr)
lockPrm.SetTarget(lockID, objID)
_, err = sh.Lock(context.Background(), lockPrm)
require.NoError(t, err)
putPrm.SetObject(lock)
@ -173,8 +177,12 @@ func TestShard_IsLocked(t *testing.T) {
require.False(t, locked)
// locked object is locked
var lockPrm LockPrm
lockPrm.SetContainer(cnrID)
lockPrm.SetTarget(lockID, objID)
require.NoError(t, sh.Lock(context.Background(), cnrID, lockID, []oid.ID{objID}))
_, err = sh.Lock(context.Background(), lockPrm)
require.NoError(t, err)
locked, err = sh.IsLocked(context.Background(), objectcore.AddressOf(obj))
require.NoError(t, err)