[#1445] policer: Make policer remove orphan locks
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 1m34s
DCO action / DCO (pull_request) Successful in 1m48s
Vulncheck / Vulncheck (pull_request) Successful in 2m13s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m30s
Build / Build Components (pull_request) Successful in 2m47s
Tests and linters / Staticcheck (pull_request) Successful in 2m57s
Tests and linters / gopls check (pull_request) Successful in 3m1s
Tests and linters / Lint (pull_request) Successful in 3m57s
Tests and linters / Tests (pull_request) Successful in 4m42s
Tests and linters / Tests with -race (pull_request) Successful in 6m7s
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 1m34s
DCO action / DCO (pull_request) Successful in 1m48s
Vulncheck / Vulncheck (pull_request) Successful in 2m13s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m30s
Build / Build Components (pull_request) Successful in 2m47s
Tests and linters / Staticcheck (pull_request) Successful in 2m57s
Tests and linters / gopls check (pull_request) Successful in 3m1s
Tests and linters / Lint (pull_request) Successful in 3m57s
Tests and linters / Tests (pull_request) Successful in 4m42s
Tests and linters / Tests with -race (pull_request) Successful in 6m7s
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
aa5816bdc6
commit
0013294c1b
4 changed files with 22 additions and 2 deletions
|
@ -287,8 +287,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
|||
var inhumePrm engine.InhumePrm
|
||||
inhumePrm.MarkAsGarbage(addr)
|
||||
|
||||
_, err := ls.Inhume(ctx, inhumePrm)
|
||||
if err != nil {
|
||||
if _, err := ls.Inhume(ctx, inhumePrm); err != nil {
|
||||
c.log.Warn(logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
@ -297,6 +296,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
|||
policer.WithPool(c.cfgObject.pool.replication),
|
||||
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
||||
policer.WithKeyStorage(keyStorage),
|
||||
policer.WithRemoveOrphanLocksFunc(ls.RemoveOrphanLocks),
|
||||
)
|
||||
|
||||
c.workers = append(c.workers, worker{
|
||||
|
|
|
@ -50,6 +50,7 @@ const (
|
|||
PolicerFailureAtObjectSelectForReplication = "failure at object select for replication"
|
||||
PolicerPoolSubmission = "pool submission"
|
||||
PolicerUnableToProcessObj = "unable to process object"
|
||||
PolicerUnableToRemoveOrphanLocks = "unable to remove orphan locks"
|
||||
ReplicatorFinishWork = "finish work"
|
||||
ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage"
|
||||
ReplicatorCouldNotReplicateObject = "could not replicate object"
|
||||
|
|
|
@ -51,6 +51,8 @@ type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address)
|
|||
|
||||
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
||||
|
||||
type RemoveOrphanLocksFunc func(context.Context, oid.Address) error
|
||||
|
||||
type cfg struct {
|
||||
headTimeout time.Duration
|
||||
|
||||
|
@ -87,6 +89,8 @@ type cfg struct {
|
|||
localObject LocalObjectGetFunc
|
||||
|
||||
keyStorage *util.KeyStorage
|
||||
|
||||
removeOrphanLocksFn RemoveOrphanLocksFunc
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -212,3 +216,9 @@ func WithKeyStorage(ks *util.KeyStorage) Option {
|
|||
c.keyStorage = ks
|
||||
}
|
||||
}
|
||||
|
||||
func WithRemoveOrphanLocksFunc(v RemoveOrphanLocksFunc) Option {
|
||||
return func(c *cfg) {
|
||||
c.removeOrphanLocksFn = v
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,6 +75,15 @@ func (p *Policer) submitPolicerTask(ctx context.Context, addr object.Info, skipM
|
|||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
if addr.IsLocked {
|
||||
if err := p.removeOrphanLocksFn(ctx, addr.Address); err != nil {
|
||||
p.log.Warn(logs.PolicerUnableToRemoveOrphanLocks,
|
||||
zap.Stringer("object", addr.Address),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
p.cache.Add(addr.Address, time.Now())
|
||||
p.objsInWork.remove(addr.Address)
|
||||
p.metrics.IncProcessedObjects()
|
||||
|
|
Loading…
Reference in a new issue