From 545861d1bcf43adec2285ecc3d164024e0256cca Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 16 Feb 2022 01:21:44 +0300 Subject: [PATCH] [#1175] storage engine: Implement Lock operation Implement `StorageEngine.Lock` method which works similar to `Inhume` but calls `Lock` on the processing shards. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/engine/lock.go | 106 ++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 pkg/local_object_storage/engine/lock.go diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go new file mode 100644 index 000000000..9b2ac7c62 --- /dev/null +++ b/pkg/local_object_storage/engine/lock.go @@ -0,0 +1,106 @@ +package engine + +import ( + "errors" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/neofs-sdk-go/object/address" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" +) + +var errLockFailed = errors.New("lock operation failed") + +// Lock marks objects as locked with another object. All objects from the +// specified container. +// +// Allows locking regular objects only (otherwise returns apistatus.IrregularObjectLock). +// +// Locked list should be unique. Panics if it is empty. +func (e *StorageEngine) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error { + return e.execIfNotBlocked(func() error { + return e.lock(idCnr, locker, locked) + }) +} + +func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error { + var addr address.Address + addr.SetContainerID(&idCnr) + + for i := range locked { + switch e.lockSingle(idCnr, locker, locked[i], true) { + case 1: + return apistatus.ObjectLocked{} + case 0: + switch e.lockSingle(idCnr, locker, locked[i], true) { + case 1: + return apistatus.ObjectLocked{} + case 0: + return errLockFailed + } + } + } + + return nil +} + +// Returns: +// 0 - fail +// 1 - object locked +// 2 - ok +func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) { + // code is pretty similar to inhumeAddr, maybe unify? + root := false + var errLocked apistatus.ObjectLocked + + var addrLocked address.Address + addrLocked.SetContainerID(&idCnr) + addrLocked.SetObjectID(&locked) + + e.iterateOverSortedShards(&addrLocked, func(_ int, sh hashedShard) (stop bool) { + defer func() { + // if object is root we continue since information about it + // can be presented in other shards + if checkExists && root { + stop = false + } + }() + + if checkExists { + exRes, err := sh.Exists(new(shard.ExistsPrm). + WithAddress(&addrLocked), + ) + if err != nil { + var siErr *objectSDK.SplitInfoError + if !errors.As(err, &siErr) { + e.reportShardError(sh, "could not check locked object for presence in shard", err) + return + } + + root = true + } else if !exRes.Exists() { + return + } + } + + err := sh.Lock(idCnr, locker, []oid.ID{locked}) + if err != nil { + e.reportShardError(sh, "could not lock object in shard", err) + + if errors.As(err, &errLocked) { + status = 1 + return true + } + + return false + } + + status = 2 + + return true + }) + + return +}