[#217] policer: Handler redundant local copy of the object

Detect redundant local copy of the object in Object Policer. Add redundant
copy callback (`WithRedundantCopyCallback` option). Pass address of the
redundant copy to callback.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-02-19 18:21:46 +03:00 committed by Alex Vanin
parent 35073fb61b
commit 277e3ca20a
2 changed files with 27 additions and 3 deletions

View file

@ -48,8 +48,9 @@ func (p *Policer) processObject(ctx context.Context, addr *object.Address) {
func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes netmap.Nodes, shortage uint32) { func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes netmap.Nodes, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
redundantLocalCopy := false
for i := 0; shortage > 0 && i < len(nodes); i++ { for i := 0; i < len(nodes); i++ {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -68,8 +69,13 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes
} }
if network.IsLocalAddress(p.localAddrSrc, node) { if network.IsLocalAddress(p.localAddrSrc, node) {
shortage-- if shortage == 0 {
redundantLocalCopy = true
break
} else { } else {
shortage--
}
} else if shortage > 0 {
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node)) _, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node))
@ -107,5 +113,7 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes
WithNodes(nodes). WithNodes(nodes).
WithCopiesNumber(shortage), WithCopiesNumber(shortage),
) )
} else if redundantLocalCopy {
p.cbRedundantCopy(addr)
} }
} }

View file

@ -4,6 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network"
@ -25,6 +26,10 @@ type Policer struct {
// Option is an option for Policer constructor. // Option is an option for Policer constructor.
type Option func(*cfg) type Option func(*cfg)
// RedundantCopyCallback is a callback to pass
// the redundant local copy of the object.
type RedundantCopyCallback func(*object.Address)
type cfg struct { type cfg struct {
headTimeout time.Duration headTimeout time.Duration
@ -45,6 +50,8 @@ type cfg struct {
localAddrSrc network.LocalAddressSource localAddrSrc network.LocalAddressSource
replicator *replicator.Replicator replicator *replicator.Replicator
cbRedundantCopy RedundantCopyCallback
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -148,3 +155,12 @@ func WithReplicator(v *replicator.Replicator) Option {
c.replicator = v c.replicator = v
} }
} }
// WithRedundantCopyCallback returns option to set
// callback to pass redundant local object copies
// detected by Policer.
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
return func(c *cfg) {
c.cbRedundantCopy = cb
}
}