policer: Simplify processRepNodes() checks #1605
2 changed files with 55 additions and 60 deletions
|
@ -110,6 +110,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
||||||
|
|
||||||
// Number of copies that are stored on maintenance nodes.
|
// Number of copies that are stored on maintenance nodes.
|
||||||
var uncheckedCopies int
|
var uncheckedCopies int
|
||||||
|
var candidates []netmap.NodeInfo
|
||||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -117,75 +118,68 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
|
var err error
|
||||||
requirements.needLocalCopy = true
|
st := checkedNodes.processStatus(nodes[i])
|
||||||
|
if !st.Processed() {
|
||||||
shortage--
|
st, err = p.checkStatus(ctx, addr, nodes[i])
|
||||||
} else if nodes[i].Status().IsMaintenance() {
|
checkedNodes.set(nodes[i], st)
|
||||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
if st == nodeDoesNotHoldObject {
|
||||||
} else {
|
// 1. This is the first time the node is encountered (`!st.Processed()`).
|
||||||
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
|
// 2. The node does not hold object (`st == nodeDoesNotHoldObject`).
|
||||||
if status == nodeHoldsObject {
|
// So we need to try to put an object to it.
|
||||||
shortage--
|
candidates = append(candidates, nodes[i])
|
||||||
}
|
|
||||||
if status == nodeIsUnderMaintenance {
|
|
||||||
shortage--
|
|
||||||
uncheckedCopies++
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
|
||||||
i--
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
|
||||||
|
|
||||||
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
shortage--
|
|
||||||
checkedNodes.set(nodes[i], nodeHoldsObject)
|
|
||||||
} else {
|
|
||||||
if client.IsErrObjectNotFound(err) {
|
|
||||||
checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
|
|
||||||
continue
|
|
||||||
} else if client.IsErrNodeUnderMaintenance(err) {
|
|
||||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
|
||||||
} else {
|
|
||||||
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
|
||||||
zap.Stringer("object", addr),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
checkedNodes.set(nodes[i], nodeStatusUnknown)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
switch st {
|
||||||
i--
|
case nodeIsLocal:
|
||||||
|
requirements.needLocalCopy = true
|
||||||
|
|
||||||
|
shortage--
|
||||||
|
case nodeIsUnderMaintenance:
|
||||||
|
shortage--
|
||||||
|
uncheckedCopies++
|
||||||
|
|||||||
|
|
||||||
|
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
|
||||||
|
zap.String("node", netmap.StringifyPublicKey(nodes[i])))
|
||||||
|
case nodeHoldsObject:
|
||||||
|
shortage--
|
||||||
|
case nodeDoesNotHoldObject:
|
||||||
|
case nodeStatusUnknown:
|
||||||
|
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
||||||
|
zap.Stringer("object", addr),
|
||||||
|
zap.Error(err))
|
||||||
|
default:
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.handleProcessNodesResult(ctx, addr, requirements, nodes, checkedNodes, shortage, uncheckedCopies)
|
p.handleProcessNodesResult(ctx, addr, requirements, candidates, checkedNodes, shortage, uncheckedCopies)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleMaintenance handles node in maintenance mode and returns new shortage and uncheckedCopies values
|
func (p *Policer) checkStatus(ctx context.Context, addr oid.Address, node netmap.NodeInfo) (nodeProcessStatus, error) {
|
||||||
//
|
if p.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
// consider remote nodes under maintenance as problem OK. Such
|
return nodeIsLocal, nil
|
||||||
// nodes MAY not respond with object, however, this is how we
|
}
|
||||||
// prevent spam with new replicas.
|
if node.Status().IsMaintenance() {
|
||||||
// However, additional copies should not be removed in this case,
|
return nodeIsUnderMaintenance, nil
|
||||||
// because we can remove the only copy this way.
|
}
|
||||||
func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
|
||||||
checkedNodes.set(node, nodeIsUnderMaintenance)
|
|
||||||
shortage--
|
|
||||||
uncheckedCopies++
|
|
||||||
|
|
||||||
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
zap.String("node", netmap.StringifyPublicKey(node)),
|
_, err := p.remoteHeader(callCtx, node, addr, false)
|
||||||
)
|
cancel()
|
||||||
return shortage, uncheckedCopies
|
|
||||||
|
if err == nil {
|
||||||
|
return nodeHoldsObject, nil
|
||||||
|
}
|
||||||
|
if client.IsErrObjectNotFound(err) {
|
||||||
|
return nodeDoesNotHoldObject, nil
|
||||||
|
}
|
||||||
|
if client.IsErrNodeUnderMaintenance(err) {
|
||||||
|
return nodeIsUnderMaintenance, nil
|
||||||
|
}
|
||||||
|
return nodeStatusUnknown, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
||||||
|
|
|
@ -10,6 +10,7 @@ const (
|
||||||
nodeHoldsObject
|
nodeHoldsObject
|
||||||
nodeStatusUnknown
|
nodeStatusUnknown
|
||||||
nodeIsUnderMaintenance
|
nodeIsUnderMaintenance
|
||||||
|
nodeIsLocal
|
||||||
)
|
)
|
||||||
|
|
||||||
func (st nodeProcessStatus) Processed() bool {
|
func (st nodeProcessStatus) Processed() bool {
|
||||||
|
|
Loading…
Add table
Reference in a new issue
While we're here, is it possible to get rid of this nested
if
? For example, I can't understand what this condition means.Why is it that when we receive the error "object not found" from a node (
st == nodeDoesNotHoldObject
), and we have not previously requested information on this node (false == cached
), we do not remove the node from the list of nodes?I don't like it too, have already tried get rid of this
if
.Will try again harder, stay tuned.
@dstepanov-yadro I have updated the code, please take a look.