From 256165045b01e931dccb5d4056512ff5605f1b63 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 9 Jun 2022 22:11:35 +0300 Subject: [PATCH] [#1508] node: Do not replicate object twice If the placement contained two vectors with intersecting nodes, it was possible to send the object to the same node twice. Also optimize requests: do not ask the same node twice whether it stores the object. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 2 ++ pkg/services/policer/check.go | 35 ++++++++++++++++++++++++++---- pkg/services/replicator/process.go | 13 ++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c5d5fe56a..f294cb7b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ Changelog for NeoFS Node ### Fixed +- Do not replicate object twice to the same node (#1410) + ### Removed ### Updated diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 35202c35e..cb8e179f9 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -14,6 +14,12 @@ import ( "go.uber.org/zap" ) +type nodeCache map[uint64]bool + +func (n nodeCache) SubmitSuccessfulReplication(id uint64) { + n[id] = true +} + func (p *Policer) processObject(ctx context.Context, addr oid.Address) { idCnr := addr.Container() @@ -55,6 +61,14 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) { Context: ctx, } + var numOfContainerNodes int + for i := range nn { + numOfContainerNodes += len(nn[i]) + } + + // cached info about already checked nodes + var checkedNodes nodeCache = make(map[uint64]bool, numOfContainerNodes) + for i := range nn { select { case <-ctx.Done(): @@ -62,7 +76,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) { default: } - p.processNodes(c, addr, nn[i], replicas[i].Count()) + p.processNodes(c, addr, nn[i], replicas[i].Count(), checkedNodes) } if !c.needLocalCopy { @@ -80,7 +94,8 @@ type processPlacementContext struct { needLocalCopy bool } -func 
(p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, nodes netmap.Nodes, shortage uint32) { +func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, + nodes netmap.Nodes, shortage uint32, checkedNodes nodeCache) { prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) for i := 0; shortage > 0 && i < len(nodes); i++ { @@ -95,6 +110,17 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, n shortage-- } else { + if hasReplica, checked := checkedNodes[nodes[i].ID]; checked { + if hasReplica { + // node already contains replica, no need to replicate + nodes = append(nodes[:i], nodes[i+1:]...) + i-- + shortage-- + } + + continue + } + callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) _, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i].NodeInfo)) @@ -107,6 +133,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, n } if client.IsErrObjectNotFound(err) { + checkedNodes[nodes[i].ID] = false continue } @@ -117,6 +144,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, n ) } else { shortage-- + checkedNodes[nodes[i].ID] = true } } @@ -130,12 +158,11 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, n zap.Uint32("shortage", shortage), ) - // TODO(@cthulhu-rider): replace to processObject in order to prevent repetitions - // in nodes for single object, see #1410 p.replicator.HandleTask(ctx, new(replicator.Task). WithObjectAddress(addr). WithNodes(nodes). WithCopiesNumber(shortage), + checkedNodes, ) } } diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index e29af250c..033f4fdc5 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -9,8 +9,17 @@ import ( "go.uber.org/zap" ) +// TaskResult is a replication result interface. 
+type TaskResult interface { + // SubmitSuccessfulReplication must save the successful + // replication result. id is the netmap identifier + // of the node that accepted the replica. + SubmitSuccessfulReplication(id uint64) +} + // HandleTask executes replication task inside invoking goroutine. -func (p *Replicator) HandleTask(ctx context.Context, task *Task) { +// Passes all the nodes that accepted the replication to the TaskResult. +func (p *Replicator) HandleTask(ctx context.Context, task *Task, res TaskResult) { defer func() { p.log.Debug("finish work", zap.Uint32("amount of unfinished replicas", task.quantity), @@ -55,6 +64,8 @@ func (p *Replicator) HandleTask(ctx context.Context, task *Task) { log.Debug("object successfully replicated") task.quantity-- + + res.SubmitSuccessfulReplication(task.nodes[i].ID) } } }