[#1508] node: Do not replicate object twice
If placement contains two vectors with intersecting nodes, it was possible to send the object to the same node twice. This also optimizes requests: a node is no longer asked twice whether it stores the object.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
parent df8a3807fe
commit 256165045b
3 changed files with 45 additions and 5 deletions
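The failure mode is easiest to see with a concrete placement. Below is a minimal sketch of the idea behind the fix, not NeoFS code: the placement vectors, node IDs, and the replicate step are hypothetical stand-ins.

```go
// Sketch only: two placement vectors that intersect on node 2.
// Replicating each vector independently would hit node 2 twice;
// a cache shared across vectors prevents that.
package main

import "fmt"

func main() {
	vectors := [][]uint64{{1, 2}, {2, 3}} // node 2 appears in both vectors

	replicated := make(map[uint64]bool) // shared across all vectors

	for _, vec := range vectors {
		for _, id := range vec {
			if replicated[id] {
				// Without the shared cache, node 2 would receive
				// the object a second time here.
				continue
			}
			fmt.Println("replicate object to node", id)
			replicated[id] = true
		}
	}
}
```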
@@ -9,6 +9,8 @@ Changelog for NeoFS Node
 
 ### Fixed
+
+- Do not replicate object twice to the same node (#1410)
 
 ### Removed
 
 ### Updated
@@ -14,6 +14,12 @@ import (
 	"go.uber.org/zap"
 )
 
+type nodeCache map[uint64]bool
+
+func (n nodeCache) SubmitSuccessfulReplication(id uint64) {
+	n[id] = true
+}
+
 func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
 	idCnr := addr.Container()
 
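The nodeCache map added above encodes three states per node ID: a missing key means the node has not been checked yet, false means it was checked and holds no replica, and true means a replica is confirmed there. A minimal standalone sketch of that tri-state use, assuming only what the diff shows (the node IDs are made up):

```go
// Sketch of the assumed nodeCache semantics: absent = unchecked,
// false = checked without replica, true = replica confirmed.
package main

import "fmt"

type nodeCache map[uint64]bool

func (n nodeCache) SubmitSuccessfulReplication(id uint64) { n[id] = true }

func main() {
	cache := make(nodeCache)
	cache[7] = false                      // HEAD returned "object not found"
	cache.SubmitSuccessfulReplication(42) // replica delivered to node 42

	for _, id := range []uint64{7, 42, 99} {
		// The comma-ok form distinguishes "checked, no replica"
		// from "never checked at all".
		hasReplica, checked := cache[id]
		fmt.Printf("node %d: checked=%v hasReplica=%v\n", id, checked, hasReplica)
	}
}
```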
@@ -55,6 +61,14 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
 		Context: ctx,
 	}
 
+	var numOfContainerNodes int
+	for i := range nn {
+		numOfContainerNodes += len(nn[i])
+	}
+
+	// cached info about already checked nodes
+	var checkedNodes nodeCache = make(map[uint64]bool, numOfContainerNodes)
+
 	for i := range nn {
 		select {
 		case <-ctx.Done():
@@ -62,7 +76,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
 		default:
 		}
 
-		p.processNodes(c, addr, nn[i], replicas[i].Count())
+		p.processNodes(c, addr, nn[i], replicas[i].Count(), checkedNodes)
 	}
 
 	if !c.needLocalCopy {
@@ -80,7 +94,8 @@ type processPlacementContext struct {
 	needLocalCopy bool
 }
 
-func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, nodes netmap.Nodes, shortage uint32) {
+func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
+	nodes netmap.Nodes, shortage uint32, checkedNodes nodeCache) {
 	prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
 
 	for i := 0; shortage > 0 && i < len(nodes); i++ {
@@ -95,6 +110,17 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, nodes netmap.Nodes, shortage uint32) {
 
 			shortage--
 		} else {
+			if hasReplica, checked := checkedNodes[nodes[i].ID]; checked {
+				if hasReplica {
+					// node already contains replica, no need to replicate
+					nodes = append(nodes[:i], nodes[i+1:]...)
+					i--
+					shortage--
+				}
+
+				continue
+			}
+
 			callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
 
 			_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i].NodeInfo))
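When the cache already says a node holds a replica, that node is cut out of the candidate slice in place, so it cannot be handed to the replicator as a target later; the `i--` steps the index back because removal shifts the remaining elements left. A standalone sketch of that removal idiom, with hypothetical IDs:

```go
// Sketch of the in-place removal used above: append(s[:i], s[i+1:]...)
// shifts later elements left, so the loop index must step back once.
package main

import "fmt"

func main() {
	nodes := []uint64{1, 2, 2, 3}     // node 2 appears in two vectors
	known := map[uint64]bool{2: true} // node 2 already holds a replica

	for i := 0; i < len(nodes); i++ {
		if known[nodes[i]] {
			nodes = append(nodes[:i], nodes[i+1:]...)
			i-- // index i now holds the next, not-yet-examined element
		}
	}

	fmt.Println(nodes) // [1 3] — confirmed holders are no longer candidates
}
```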
@@ -107,6 +133,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, nodes netmap.Nodes, shortage uint32) {
 			}
 
 			if client.IsErrObjectNotFound(err) {
+				checkedNodes[nodes[i].ID] = false
 				continue
 			}
 
@@ -117,6 +144,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, nodes netmap.Nodes, shortage uint32) {
 			)
 		} else {
 			shortage--
+			checkedNodes[nodes[i].ID] = true
 		}
 	}
 
@@ -130,12 +158,11 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, nodes netmap.Nodes, shortage uint32) {
 			zap.Uint32("shortage", shortage),
 		)
 
-		// TODO(@cthulhu-rider): replace to processObject in order to prevent repetitions
-		//  in nodes for single object, see #1410
 		p.replicator.HandleTask(ctx, new(replicator.Task).
 			WithObjectAddress(addr).
 			WithNodes(nodes).
 			WithCopiesNumber(shortage),
+			checkedNodes,
 		)
 	}
 }
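HandleTask now receives checkedNodes as its result sink, so every successful replication is recorded back into the same cache the policer consults on its next pass. A hypothetical compile-time assertion (not part of the diff; the package name and import path are assumptions) would document that nodeCache satisfies the replicator's interface:

```go
package policer // assumed package name

// Assumed import path for the replicator service.
import "github.com/nspcc-dev/neofs-node/pkg/services/replicator"

// Not in the diff: nodeCache has SubmitSuccessfulReplication(uint64),
// so it satisfies replicator.TaskResult; this line fails to compile
// if the interface and the map type ever drift apart.
var _ replicator.TaskResult = (nodeCache)(nil)
```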
@@ -9,8 +9,17 @@ import (
 	"go.uber.org/zap"
 )
 
+// TaskResult is a replication result interface.
+type TaskResult interface {
+	// SubmitSuccessfulReplication must save successful
+	// replication result. ID is a netmap identification
+	// of a node that accepted the replica.
+	SubmitSuccessfulReplication(id uint64)
+}
+
 // HandleTask executes replication task inside invoking goroutine.
-func (p *Replicator) HandleTask(ctx context.Context, task *Task) {
+// Passes all the nodes that accepted the replication to the TaskResult.
+func (p *Replicator) HandleTask(ctx context.Context, task *Task, res TaskResult) {
 	defer func() {
 		p.log.Debug("finish work",
 			zap.Uint32("amount of unfinished replicas", task.quantity),
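Because TaskResult is a single-method interface, any recording type can stand in for the policer's cache, which keeps HandleTask easy to exercise in isolation. A hypothetical stub, not from the repository:

```go
// Hypothetical test stub: any type with SubmitSuccessfulReplication(uint64)
// can serve as a TaskResult.
package main

import "fmt"

type TaskResult interface {
	SubmitSuccessfulReplication(id uint64)
}

// recorder collects the IDs of nodes that accepted a replica.
type recorder struct{ ids []uint64 }

func (r *recorder) SubmitSuccessfulReplication(id uint64) {
	r.ids = append(r.ids, id)
}

func main() {
	var res TaskResult = &recorder{}
	res.SubmitSuccessfulReplication(42) // as the replicator would on success
	fmt.Println(res.(*recorder).ids)    // [42]
}
```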
@@ -55,6 +64,8 @@ func (p *Replicator) HandleTask(ctx context.Context, task *Task) {
 			log.Debug("object successfully replicated")
 
 			task.quantity--
+
+			res.SubmitSuccessfulReplication(task.nodes[i].ID)
 		}
 	}
 }