diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 629f79207..f124f3cda 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -285,8 +285,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl var inhumePrm engine.InhumePrm inhumePrm.MarkAsGarbage(addr) - _, err := ls.Inhume(ctx, inhumePrm) - if err != nil { + if _, err := ls.Inhume(ctx, inhumePrm); err != nil { c.log.Warn(logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage, zap.String("error", err.Error()), ) @@ -295,6 +294,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl policer.WithPool(c.cfgObject.pool.replication), policer.WithMetrics(c.metricsCollector.PolicerMetrics()), policer.WithKeyStorage(keyStorage), + policer.WithRemoveOrphanLocksFunc(ls.RemoveOrphanLocks), ) c.workers = append(c.workers, worker{ diff --git a/internal/logs/logs.go b/internal/logs/logs.go index d0bac4d11..4e1f29604 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -50,6 +50,7 @@ const ( PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" PolicerPoolSubmission = "pool submission" PolicerUnableToProcessObj = "unable to process object" + PolicerUnableToRemoveOrphanLocks = "unable to remove orphan locks" ReplicatorFinishWork = "finish work" ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" ReplicatorCouldNotReplicateObject = "could not replicate object" diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 336f7a0ab..33be1cd93 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -51,6 +51,8 @@ type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error) +type RemoveOrphanLocksFunc func(context.Context, oid.Address) error + type cfg struct { headTimeout time.Duration @@ -87,6 +89,8 @@ type cfg struct { localObject LocalObjectGetFunc keyStorage *util.KeyStorage + + removeOrphanLocksFn RemoveOrphanLocksFunc } func defaultCfg() *cfg { @@ -212,3 +216,9 @@ func WithKeyStorage(ks *util.KeyStorage) Option { c.keyStorage = ks } } + +func WithRemoveOrphanLocksFunc(v RemoveOrphanLocksFunc) Option { + return func(c *cfg) { + c.removeOrphanLocksFn = v + } +} diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 2fa87c40f..bd1018263 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -75,6 +75,15 @@ func (p *Policer) submitPolicerTask(ctx context.Context, addr object.Info, skipM zap.String("error", err.Error())) } + if addr.IsLocked { + if err := p.removeOrphanLocksFn(ctx, addr.Address); err != nil { + p.log.Warn(logs.PolicerUnableToRemoveOrphanLocks, + zap.Stringer("object", addr.Address), + zap.Error(err), + ) + } + } + p.cache.Add(addr.Address, time.Now()) p.objsInWork.remove(addr.Address) p.metrics.IncProcessedObjects()