From 277e3ca20a2dc48a4e79d63cd2c39c2e679e29f3 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 19 Feb 2021 18:21:46 +0300 Subject: [PATCH] [#217] policer: Handler redundant local copy of the object Detect redundant local copy of the object in Object Policer. Add redundant copy callback (`WithRedundantCopyCallback` option). Pass address of the redundant copy to callback. Signed-off-by: Leonard Lyubich --- pkg/services/policer/check.go | 14 +++++++++++--- pkg/services/policer/policer.go | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 837b547cd..6bef137c8 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -48,8 +48,9 @@ func (p *Policer) processObject(ctx context.Context, addr *object.Address) { func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes netmap.Nodes, shortage uint32) { prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) + redundantLocalCopy := false - for i := 0; shortage > 0 && i < len(nodes); i++ { + for i := 0; i < len(nodes); i++ { select { case <-ctx.Done(): return @@ -68,8 +69,13 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes } if network.IsLocalAddress(p.localAddrSrc, node) { - shortage-- - } else { + if shortage == 0 { + redundantLocalCopy = true + break + } else { + shortage-- + } + } else if shortage > 0 { callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) _, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node)) @@ -107,5 +113,7 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes WithNodes(nodes). WithCopiesNumber(shortage), ) + } else if redundantLocalCopy { + p.cbRedundantCopy(addr) } } diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index cd92bfdc4..c091a7d79 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network" @@ -25,6 +26,10 @@ type Policer struct { // Option is an option for Policer constructor. type Option func(*cfg) +// RedundantCopyCallback is a callback to pass +// the redundant local copy of the object. +type RedundantCopyCallback func(*object.Address) + type cfg struct { headTimeout time.Duration @@ -45,6 +50,8 @@ type cfg struct { localAddrSrc network.LocalAddressSource replicator *replicator.Replicator + + cbRedundantCopy RedundantCopyCallback } func defaultCfg() *cfg { @@ -148,3 +155,12 @@ func WithReplicator(v *replicator.Replicator) Option { c.replicator = v } } + +// WithRedundantCopyCallback returns option to set +// callback to pass redundant local object copies +// detected by Policer. +func WithRedundantCopyCallback(cb RedundantCopyCallback) Option { + return func(c *cfg) { + c.cbRedundantCopy = cb + } +}