policer: Simplify processRepNodes() checks #1605
2 changed files with 55 additions and 60 deletions
|
@ -110,6 +110,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
|||
|
||||
// Number of copies that are stored on maintenance nodes.
|
||||
var uncheckedCopies int
|
||||
var candidates []netmap.NodeInfo
|
||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -117,75 +118,68 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
|||
default:
|
||||
}
|
||||
|
||||
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
|
||||
requirements.needLocalCopy = true
|
||||
|
||||
shortage--
|
||||
} else if nodes[i].Status().IsMaintenance() {
|
||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
||||
} else {
|
||||
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
|
||||
if status == nodeHoldsObject {
|
||||
shortage--
|
||||
}
|
||||
if status == nodeIsUnderMaintenance {
|
||||
shortage--
|
||||
uncheckedCopies++
|
||||
}
|
||||
|
||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
||||
i--
|
||||
var err error
|
||||
st := checkedNodes.processStatus(nodes[i])
|
||||
if !st.Processed() {
|
||||
st, err = p.checkStatus(ctx, addr, nodes[i])
|
||||
checkedNodes.set(nodes[i], st)
|
||||
if st == nodeDoesNotHoldObject {
|
||||
// 1. This is the first time the node is encountered (`!st.Processed()`).
|
||||
// 2. The node does not hold object (`st == nodeDoesNotHoldObject`).
|
||||
// So we need to try to put an object to it.
|
||||
candidates = append(candidates, nodes[i])
|
||||
continue
|
||||
}
|
||||
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
|
||||
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
|
||||
|
||||
cancel()
|
||||
|
||||
if err == nil {
|
||||
shortage--
|
||||
checkedNodes.set(nodes[i], nodeHoldsObject)
|
||||
} else {
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
|
||||
continue
|
||||
} else if client.IsErrNodeUnderMaintenance(err) {
|
||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
||||
} else {
|
||||
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
||||
zap.Stringer("object", addr),
|
||||
zap.Error(err),
|
||||
)
|
||||
checkedNodes.set(nodes[i], nodeStatusUnknown)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
||||
i--
|
||||
switch st {
|
||||
case nodeIsLocal:
|
||||
requirements.needLocalCopy = true
|
||||
|
||||
shortage--
|
||||
case nodeIsUnderMaintenance:
|
||||
shortage--
|
||||
uncheckedCopies++
|
||||
|
||||
|
||||
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
|
||||
zap.String("node", netmap.StringifyPublicKey(nodes[i])))
|
||||
case nodeHoldsObject:
|
||||
shortage--
|
||||
case nodeDoesNotHoldObject:
|
||||
case nodeStatusUnknown:
|
||||
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
||||
zap.Stringer("object", addr),
|
||||
zap.Error(err))
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
|
||||
p.handleProcessNodesResult(ctx, addr, requirements, nodes, checkedNodes, shortage, uncheckedCopies)
|
||||
p.handleProcessNodesResult(ctx, addr, requirements, candidates, checkedNodes, shortage, uncheckedCopies)
|
||||
}
|
||||
|
||||
// handleMaintenance handles node in maintenance mode and returns new shortage and uncheckedCopies values
|
||||
//
|
||||
// consider remote nodes under maintenance as problem OK. Such
|
||||
// nodes MAY not respond with object, however, this is how we
|
||||
// prevent spam with new replicas.
|
||||
// However, additional copies should not be removed in this case,
|
||||
// because we can remove the only copy this way.
|
||||
func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
||||
checkedNodes.set(node, nodeIsUnderMaintenance)
|
||||
shortage--
|
||||
uncheckedCopies++
|
||||
func (p *Policer) checkStatus(ctx context.Context, addr oid.Address, node netmap.NodeInfo) (nodeProcessStatus, error) {
|
||||
if p.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
return nodeIsLocal, nil
|
||||
}
|
||||
if node.Status().IsMaintenance() {
|
||||
return nodeIsUnderMaintenance, nil
|
||||
}
|
||||
|
||||
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
|
||||
zap.String("node", netmap.StringifyPublicKey(node)),
|
||||
)
|
||||
return shortage, uncheckedCopies
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
_, err := p.remoteHeader(callCtx, node, addr, false)
|
||||
cancel()
|
||||
|
||||
if err == nil {
|
||||
return nodeHoldsObject, nil
|
||||
}
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
return nodeDoesNotHoldObject, nil
|
||||
}
|
||||
if client.IsErrNodeUnderMaintenance(err) {
|
||||
return nodeIsUnderMaintenance, nil
|
||||
}
|
||||
return nodeStatusUnknown, err
|
||||
}
|
||||
|
||||
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
||||
|
|
|
@ -10,6 +10,7 @@ const (
|
|||
nodeHoldsObject
|
||||
nodeStatusUnknown
|
||||
nodeIsUnderMaintenance
|
||||
nodeIsLocal
|
||||
)
|
||||
|
||||
func (st nodeProcessStatus) Processed() bool {
|
||||
|
|
Loading…
Add table
Reference in a new issue
While we're here, is it possible to get rid of this nested
if
? For example, I can't understand what this condition means.Why is it that when we receive the error "object not found" from a node (
st == nodeDoesNotHoldObject
), and we have not previously requested information on this node (false == cached
), we do not remove the node from the list of nodes?I don't like it too, have already tried get rid of this
if
.Will try again harder, stay tuned.
@dstepanov-yadro I have updated the code, please take a look.