From 84f545dacccaba104d43af54bf96ed882cef8b8c Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Sat, 12 Nov 2022 15:27:54 +0300 Subject: [PATCH] [#1502] engine: Check all shards for `LOCK`'ing before inhuming It allows keeping all the locked objects safe after metabase resynchronization. Currently, all `LOCK` objects are broadcast to all nodes in a container, it guarantees `LOCK` object presence in a regular situation. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/engine/inhume.go | 36 +++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 2a3bdef9a3..a81a3243d8 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -9,6 +9,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" ) // InhumePrm encapsulates parameters for inhume operation. @@ -79,6 +80,18 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) { } for i := range prm.addrs { + if !prm.forceRemoval { + locked, err := e.isLocked(prm.addrs[i]) + if err != nil { + e.log.Warn("removing an object without full locking check", + zap.Error(err), + zap.Stringer("addr", prm.addrs[i])) + } else if locked { + var lockedErr apistatus.ObjectLocked + return InhumeRes{}, lockedErr + } + } + if prm.tombstone != nil { shPrm.SetTarget(*prm.tombstone, prm.addrs[i]) } else { @@ -166,6 +179,29 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE return ok, retErr } +func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { + var locked bool + var err error + var outErr error + + e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { + locked, err = h.Shard.IsLocked(addr) + if err != nil { + e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr)) + outErr = err + return false + } + + return locked + }) + + if locked { + return locked, nil + } + + return locked, outErr +} + func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(addrs)