policer: Simplify processRepNodes() checks #1605

Merged
fyrchik merged 2 commits from fyrchik/frostfs-node:refactor-policer into master 2025-01-21 05:34:55 +00:00
2 changed files with 55 additions and 60 deletions

View file

@ -110,6 +110,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
// Number of copies that are stored on maintenance nodes.
var uncheckedCopies int
var candidates []netmap.NodeInfo
for i := 0; shortage > 0 && i < len(nodes); i++ {
select {
case <-ctx.Done():
@ -117,75 +118,68 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
default:
}
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
var err error
st := checkedNodes.processStatus(nodes[i])
if !st.Processed() {
st, err = p.checkStatus(ctx, addr, nodes[i])
checkedNodes.set(nodes[i], st)
if st == nodeDoesNotHoldObject {
// 1. This is the first time the node is encountered (`!st.Processed()`).
// 2. The node does not hold object (`st == nodeDoesNotHoldObject`).
// So we need to try to put an object to it.
candidates = append(candidates, nodes[i])
continue
}
}
switch st {
case nodeIsLocal:
requirements.needLocalCopy = true
shortage--
} else if nodes[i].Status().IsMaintenance() {
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
} else {
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
if status == nodeHoldsObject {
shortage--
}
if status == nodeIsUnderMaintenance {
shortage--
uncheckedCopies++
}
nodes = append(nodes[:i], nodes[i+1:]...)
i--
continue
}
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
cancel()
if err == nil {
shortage--
checkedNodes.set(nodes[i], nodeHoldsObject)
} else {
if client.IsErrObjectNotFound(err) {
checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
continue
} else if client.IsErrNodeUnderMaintenance(err) {
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
} else {
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
zap.Stringer("object", addr),
zap.Error(err),
)
checkedNodes.set(nodes[i], nodeStatusUnknown)
}
}
}
nodes = append(nodes[:i], nodes[i+1:]...)
i--
}
p.handleProcessNodesResult(ctx, addr, requirements, nodes, checkedNodes, shortage, uncheckedCopies)
}
// handleMaintenance handles node in maintenance mode and returns new shortage and uncheckedCopies values
//
// consider remote nodes under maintenance as problem OK. Such
// nodes MAY not respond with object, however, this is how we
// prevent spam with new replicas.
// However, additional copies should not be removed in this case,
// because we can remove the only copy this way.
func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
checkedNodes.set(node, nodeIsUnderMaintenance)
case nodeIsUnderMaintenance:
shortage--
uncheckedCopies++

While we're here, is it possible to get rid of this nested if? For example, I can't understand what this condition means.

While we're here, is it possible to get rid of this nested `if`? For example, I can't understand what this condition means.

Why is it that when we receive the error "object not found" from a node (st == nodeDoesNotHoldObject), and we have not previously requested information on this node (false == cached), we do not remove the node from the list of nodes?

Why is it that when we receive the error "object not found" from a node (`st == nodeDoesNotHoldObject`), and we have not previously requested information on this node (`false == cached`), we do not remove the node from the list of nodes?

I don't like it too, have already tried get rid of this if.
Will try again harder, stay tuned.

I don't like it too, have already tried get rid of this `if`. Will try again harder, stay tuned.

@dstepanov-yadro I have updated the code, please take a look.

@dstepanov-yadro I have updated the code, please take a look.
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
zap.String("node", netmap.StringifyPublicKey(node)),
)
return shortage, uncheckedCopies
zap.String("node", netmap.StringifyPublicKey(nodes[i])))
case nodeHoldsObject:
shortage--
case nodeDoesNotHoldObject:
case nodeStatusUnknown:
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
zap.Stringer("object", addr),
zap.Error(err))
default:
panic("unreachable")
}
}
p.handleProcessNodesResult(ctx, addr, requirements, candidates, checkedNodes, shortage, uncheckedCopies)
}
func (p *Policer) checkStatus(ctx context.Context, addr oid.Address, node netmap.NodeInfo) (nodeProcessStatus, error) {
if p.netmapKeys.IsLocalKey(node.PublicKey()) {
return nodeIsLocal, nil
}
if node.Status().IsMaintenance() {
return nodeIsUnderMaintenance, nil
}
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err := p.remoteHeader(callCtx, node, addr, false)
cancel()
if err == nil {
return nodeHoldsObject, nil
}
if client.IsErrObjectNotFound(err) {
return nodeDoesNotHoldObject, nil
}
if client.IsErrNodeUnderMaintenance(err) {
return nodeIsUnderMaintenance, nil
}
return nodeStatusUnknown, err
}
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,

View file

@ -10,6 +10,7 @@ const (
nodeHoldsObject
nodeStatusUnknown
nodeIsUnderMaintenance
nodeIsLocal
)
func (st nodeProcessStatus) Processed() bool {