[#1421] shard: Use LockPrm
to store Lock operation parameters
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
010eb7ec33
commit
c09fd4a66b
5 changed files with 67 additions and 12 deletions
|
@ -62,6 +62,9 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
|
||||||
// - 1: locking irregular object
|
// - 1: locking irregular object
|
||||||
// - 2: ok
|
// - 2: ok
|
||||||
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
||||||
|
var lockPrm shard.LockPrm
|
||||||
|
lockPrm.SetContainer(idCnr)
|
||||||
|
|
||||||
// code is pretty similar to inhumeAddr, maybe unify?
|
// code is pretty similar to inhumeAddr, maybe unify?
|
||||||
root := false
|
root := false
|
||||||
var addrLocked oid.Address
|
var addrLocked oid.Address
|
||||||
|
@ -95,7 +98,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
}
|
}
|
||||||
eclocked = append(eclocked, objID)
|
eclocked = append(eclocked, objID)
|
||||||
}
|
}
|
||||||
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
lockPrm.SetTarget(locker, eclocked...)
|
||||||
|
_, err = sh.Lock(ctx, lockPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
|
@ -120,7 +124,8 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sh.Lock(ctx, idCnr, locker, []oid.ID{locked})
|
lockPrm.SetTarget(locker, locked)
|
||||||
|
_, err := sh.Lock(ctx, lockPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
|
||||||
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
|
||||||
|
|
|
@ -283,7 +283,13 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
lockID, _ := lockObj.ID()
|
lockID, _ := lockObj.ID()
|
||||||
require.NoError(t, sh.Lock(context.Background(), cnrLocked, lockID, locked))
|
|
||||||
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnrLocked)
|
||||||
|
lockPrm.SetTarget(lockID, locked...)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
var inhumePrm InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
|
inhumePrm.SetTarget(object.AddressOf(tombObj), tombMembers...)
|
||||||
|
|
|
@ -61,7 +61,11 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(lockID, objID)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
@ -150,7 +154,11 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, append(childIDs, parentID, linkID))
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(lockID, append(childIDs, parentID, linkID)...)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
|
|
@ -12,13 +12,41 @@ import (
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// LockPrm contains parameters for Lock operation.
|
||||||
|
type LockPrm struct {
|
||||||
|
cnt cid.ID
|
||||||
|
lock oid.ID
|
||||||
|
locked []oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetContainer sets the container ID for both the lock and locked objects
|
||||||
|
// since they must belong to the same container.
|
||||||
|
func (p *LockPrm) SetContainer(cnt cid.ID) {
|
||||||
|
p.cnt = cnt
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTarget sets lock object ID and IDs of objects to be locked.
|
||||||
|
//
|
||||||
|
// The list of locked objects should be unique. Panics if the list is empty.
|
||||||
|
func (p *LockPrm) SetTarget(lock oid.ID, locked ...oid.ID) {
|
||||||
|
p.lock = lock
|
||||||
|
p.locked = locked
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockPrm contains results for Lock operation.
|
||||||
|
type LockRes struct{}
|
||||||
|
|
||||||
// Lock marks objects as locked with another object. All objects from the
|
// Lock marks objects as locked with another object. All objects from the
|
||||||
// specified container.
|
// specified container.
|
||||||
//
|
//
|
||||||
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (s *Shard) Lock(ctx context.Context, prm LockPrm) (LockRes, error) {
|
||||||
|
idCnr := prm.cnt
|
||||||
|
locker := prm.lock
|
||||||
|
locked := prm.locked
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Lock",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Lock",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard_id", s.ID().String()),
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
@ -33,9 +61,9 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
|
||||||
|
|
||||||
m := s.info.Mode
|
m := s.info.Mode
|
||||||
if m.ReadOnly() {
|
if m.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return LockRes{}, ErrReadOnlyMode
|
||||||
} else if m.NoMetabase() {
|
} else if m.NoMetabase() {
|
||||||
return ErrDegradedMode
|
return LockRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
var lockPrm meta.LockPrm
|
var lockPrm meta.LockPrm
|
||||||
|
@ -44,10 +72,10 @@ func (s *Shard) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []
|
||||||
|
|
||||||
_, err := s.metaBase.Lock(ctx, lockPrm)
|
_, err := s.metaBase.Lock(ctx, lockPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("metabase lock: %w", err)
|
return LockRes{}, fmt.Errorf("metabase lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return LockRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsLocked checks object locking relation of the provided object. Not found object is
|
// IsLocked checks object locking relation of the provided object. Not found object is
|
||||||
|
|
|
@ -82,7 +82,11 @@ func TestShard_Lock(t *testing.T) {
|
||||||
|
|
||||||
// lock the object
|
// lock the object
|
||||||
|
|
||||||
err = sh.Lock(context.Background(), cnr, lockID, []oid.ID{objID})
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnr)
|
||||||
|
lockPrm.SetTarget(lockID, objID)
|
||||||
|
|
||||||
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
putPrm.SetObject(lock)
|
putPrm.SetObject(lock)
|
||||||
|
@ -173,8 +177,12 @@ func TestShard_IsLocked(t *testing.T) {
|
||||||
require.False(t, locked)
|
require.False(t, locked)
|
||||||
|
|
||||||
// locked object is locked
|
// locked object is locked
|
||||||
|
var lockPrm LockPrm
|
||||||
|
lockPrm.SetContainer(cnrID)
|
||||||
|
lockPrm.SetTarget(lockID, objID)
|
||||||
|
|
||||||
require.NoError(t, sh.Lock(context.Background(), cnrID, lockID, []oid.ID{objID}))
|
_, err = sh.Lock(context.Background(), lockPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
locked, err = sh.IsLocked(context.Background(), objectcore.AddressOf(obj))
|
locked, err = sh.IsLocked(context.Background(), objectcore.AddressOf(obj))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
Loading…
Add table
Reference in a new issue