diff --git a/CHANGELOG.md b/CHANGELOG.md index 4941572667..8f8dc25e8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Changelog for NeoFS Node - Basic income transfer's incorrect log message (#1374) - Listen to subnet removal events in notary-enabled env (#1224) - Update/remove nodes whose subnet has been removed (#1162) +- Potential removal of local object when policy isn't complied (#1335) ## [0.28.1] - 2022-05-05 diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index e687cb6880..a92e80e292 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -2,12 +2,12 @@ package policer import ( "context" - "strings" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" + "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" "go.uber.org/zap" @@ -50,6 +50,9 @@ func (p *Policer) processObject(ctx context.Context, addr *addressSDK.Address) { } replicas := policy.Replicas() + c := &processPlacementContext{ + Context: ctx, + } for i := range nn { select { @@ -58,19 +61,32 @@ func (p *Policer) processObject(ctx context.Context, addr *addressSDK.Address) { default: } - p.processNodes(ctx, addr, nn[i], replicas[i].Count()) + p.processNodes(c, addr, nn[i], replicas[i].Count()) + } + + if !c.needLocalCopy { + p.log.Info("redundant local object copy detected", + zap.Stringer("object", addr), + ) + + p.cbRedundantCopy(addr) } } -func (p *Policer) processNodes(ctx context.Context, addr *addressSDK.Address, nodes netmap.Nodes, shortage uint32) { +type processPlacementContext struct { + context.Context + + needLocalCopy bool +} + +func (p *Policer) processNodes(ctx *processPlacementContext, addr *addressSDK.Address, nodes netmap.Nodes, shortage uint32) { log := p.log.With( zap.Stringer("object", addr), ) prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) - redundantLocalCopy := false - for i := 0; i < len(nodes); i++ { + for i := 0; shortage > 0 && i < len(nodes); i++ { select { case <-ctx.Done(): return @@ -78,35 +94,29 @@ func (p *Policer) processNodes(ctx context.Context, addr *addressSDK.Address, no } if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) { - if shortage == 0 { - // we can call the redundant copy callback - // here to slightly improve the performance - // instead of readability. - redundantLocalCopy = true - break - } else { - shortage-- - } - } else if shortage > 0 { - callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) + ctx.needLocalCopy = true - _, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i].NodeInfo)) + shortage-- - cancel() + continue + } + callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) + + _, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i].NodeInfo)) + + cancel() + + if !client.IsErrObjectNotFound(err) { if err != nil { - if strings.Contains(err.Error(), headsvc.ErrNotFound.Error()) { - continue - } else { - log.Debug("could not receive object header", - zap.String("error", err.Error()), - ) - - continue - } + log.Error("receive object header to check policy compliance", + zap.String("error", err.Error()), + ) } else { shortage-- } + + continue } nodes = append(nodes[:i], nodes[i+1:]...) @@ -118,14 +128,12 @@ func (p *Policer) processNodes(ctx context.Context, addr *addressSDK.Address, no zap.Uint32("shortage", shortage), ) + // TODO(@cthulhu-rider): replace to processObject in order to prevent repetitions + // in nodes for single object, see #1410 p.replicator.HandleTask(ctx, new(replicator.Task). WithObjectAddress(addr). WithNodes(nodes). WithCopiesNumber(shortage), ) - } else if redundantLocalCopy { - log.Info("redundant local object copy detected") - - p.cbRedundantCopy(addr) } }