policer: Simplify processRepNodes() checks #1605
2 changed files with 55 additions and 60 deletions
|
@ -110,6 +110,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
|||
|
||||
// Number of copies that are stored on maintenance nodes.
|
||||
var uncheckedCopies int
|
||||
var candidates []netmap.NodeInfo
|
||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -117,75 +118,68 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
|||
default:
|
||||
}
|
||||
|
||||
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
|
||||
var err error
|
||||
st := checkedNodes.processStatus(nodes[i])
|
||||
if !st.Processed() {
|
||||
st, err = p.checkStatus(ctx, addr, nodes[i])
|
||||
checkedNodes.set(nodes[i], st)
|
||||
if st == nodeDoesNotHoldObject {
|
||||
// 1. This is the first time the node is encountered (`!st.Processed()`).
|
||||
// 2. The node does not hold object (`st == nodeDoesNotHoldObject`).
|
||||
// So we need to try to put an object to it.
|
||||
candidates = append(candidates, nodes[i])
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
switch st {
|
||||
case nodeIsLocal:
|
||||
requirements.needLocalCopy = true
|
||||
|
||||
shortage--
|
||||
} else if nodes[i].Status().IsMaintenance() {
|
||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
||||
} else {
|
||||
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
|
||||
if status == nodeHoldsObject {
|
||||
shortage--
|
||||
}
|
||||
if status == nodeIsUnderMaintenance {
|
||||
shortage--
|
||||
uncheckedCopies++
|
||||
}
|
||||
|
||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
||||
i--
|
||||
continue
|
||||
}
|
||||
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
|
||||
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
|
||||
|
||||
cancel()
|
||||
|
||||
if err == nil {
|
||||
shortage--
|
||||
checkedNodes.set(nodes[i], nodeHoldsObject)
|
||||
} else {
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
|
||||
continue
|
||||
} else if client.IsErrNodeUnderMaintenance(err) {
|
||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
||||
} else {
|
||||
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
||||
zap.Stringer("object", addr),
|
||||
zap.Error(err),
|
||||
)
|
||||
checkedNodes.set(nodes[i], nodeStatusUnknown)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
||||
i--
|
||||
}
|
||||
|
||||
p.handleProcessNodesResult(ctx, addr, requirements, nodes, checkedNodes, shortage, uncheckedCopies)
|
||||
}
|
||||
|
||||
// handleMaintenance handles node in maintenance mode and returns new shortage and uncheckedCopies values
|
||||
//
|
||||
// consider remote nodes under maintenance as problem OK. Such
|
||||
// nodes MAY not respond with object, however, this is how we
|
||||
// prevent spam with new replicas.
|
||||
// However, additional copies should not be removed in this case,
|
||||
// because we can remove the only copy this way.
|
||||
func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
||||
checkedNodes.set(node, nodeIsUnderMaintenance)
|
||||
case nodeIsUnderMaintenance:
|
||||
shortage--
|
||||
uncheckedCopies++
|
||||
|
||||
|
||||
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
|
||||
zap.String("node", netmap.StringifyPublicKey(node)),
|
||||
)
|
||||
return shortage, uncheckedCopies
|
||||
zap.String("node", netmap.StringifyPublicKey(nodes[i])))
|
||||
case nodeHoldsObject:
|
||||
shortage--
|
||||
case nodeDoesNotHoldObject:
|
||||
case nodeStatusUnknown:
|
||||
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
||||
zap.Stringer("object", addr),
|
||||
zap.Error(err))
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
|
||||
p.handleProcessNodesResult(ctx, addr, requirements, candidates, checkedNodes, shortage, uncheckedCopies)
|
||||
}
|
||||
|
||||
func (p *Policer) checkStatus(ctx context.Context, addr oid.Address, node netmap.NodeInfo) (nodeProcessStatus, error) {
|
||||
if p.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
return nodeIsLocal, nil
|
||||
}
|
||||
if node.Status().IsMaintenance() {
|
||||
return nodeIsUnderMaintenance, nil
|
||||
}
|
||||
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
_, err := p.remoteHeader(callCtx, node, addr, false)
|
||||
cancel()
|
||||
|
||||
if err == nil {
|
||||
return nodeHoldsObject, nil
|
||||
}
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
return nodeDoesNotHoldObject, nil
|
||||
}
|
||||
if client.IsErrNodeUnderMaintenance(err) {
|
||||
return nodeIsUnderMaintenance, nil
|
||||
}
|
||||
return nodeStatusUnknown, err
|
||||
}
|
||||
|
||||
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
||||
|
|
|
@ -10,6 +10,7 @@ const (
|
|||
nodeHoldsObject
|
||||
nodeStatusUnknown
|
||||
nodeIsUnderMaintenance
|
||||
nodeIsLocal
|
||||
)
|
||||
|
||||
func (st nodeProcessStatus) Processed() bool {
|
||||
|
|
Loading…
Add table
Reference in a new issue
While we're here, is it possible to get rid of this nested
if
? For example, I can't understand what this condition means.Why is it that when we receive the error "object not found" from a node (
st == nodeDoesNotHoldObject
), and we have not previously requested information on this node (false == cached
), we do not remove the node from the list of nodes?I don't like it too, have already tried get rid of this
if
.Will try again harder, stay tuned.
@dstepanov-yadro I have updated the code, please take a look.