policer: Simplify processRepNodes() checks #1605
2 changed files with 55 additions and 60 deletions
|
@ -110,6 +110,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
||||||
|
|
||||||
// Number of copies that are stored on maintenance nodes.
|
// Number of copies that are stored on maintenance nodes.
|
||||||
var uncheckedCopies int
|
var uncheckedCopies int
|
||||||
|
var candidates []netmap.NodeInfo
|
||||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -117,75 +118,68 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
|
var err error
|
||||||
|
st := checkedNodes.processStatus(nodes[i])
|
||||||
|
if !st.Processed() {
|
||||||
|
st, err = p.checkStatus(ctx, addr, nodes[i])
|
||||||
|
checkedNodes.set(nodes[i], st)
|
||||||
|
if st == nodeDoesNotHoldObject {
|
||||||
|
// 1. This is the first time the node is encountered (`!st.Processed()`).
|
||||||
|
// 2. The node does not hold object (`st == nodeDoesNotHoldObject`).
|
||||||
|
// So we need to try to put an object to it.
|
||||||
|
candidates = append(candidates, nodes[i])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch st {
|
||||||
|
case nodeIsLocal:
|
||||||
requirements.needLocalCopy = true
|
requirements.needLocalCopy = true
|
||||||
|
|
||||||
shortage--
|
shortage--
|
||||||
} else if nodes[i].Status().IsMaintenance() {
|
case nodeIsUnderMaintenance:
|
||||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
|
||||||
} else {
|
|
||||||
if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
|
|
||||||
if status == nodeHoldsObject {
|
|
||||||
shortage--
|
|
||||||
}
|
|
||||||
if status == nodeIsUnderMaintenance {
|
|
||||||
shortage--
|
|
||||||
uncheckedCopies++
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
|
||||||
i--
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
|
||||||
|
|
||||||
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
shortage--
|
|
||||||
checkedNodes.set(nodes[i], nodeHoldsObject)
|
|
||||||
} else {
|
|
||||||
if client.IsErrObjectNotFound(err) {
|
|
||||||
checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
|
|
||||||
continue
|
|
||||||
} else if client.IsErrNodeUnderMaintenance(err) {
|
|
||||||
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
|
|
||||||
} else {
|
|
||||||
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
|
||||||
zap.Stringer("object", addr),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
checkedNodes.set(nodes[i], nodeStatusUnknown)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes = append(nodes[:i], nodes[i+1:]...)
|
|
||||||
i--
|
|
||||||
}
|
|
||||||
|
|
||||||
p.handleProcessNodesResult(ctx, addr, requirements, nodes, checkedNodes, shortage, uncheckedCopies)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleMaintenance handles node in maintenance mode and returns new shortage and uncheckedCopies values
|
|
||||||
//
|
|
||||||
// consider remote nodes under maintenance as problem OK. Such
|
|
||||||
// nodes MAY not respond with object, however, this is how we
|
|
||||||
// prevent spam with new replicas.
|
|
||||||
// However, additional copies should not be removed in this case,
|
|
||||||
// because we can remove the only copy this way.
|
|
||||||
func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
|
|
||||||
checkedNodes.set(node, nodeIsUnderMaintenance)
|
|
||||||
shortage--
|
shortage--
|
||||||
uncheckedCopies++
|
uncheckedCopies++
|
||||||
|
|
||||||
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
|
p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
|
||||||
zap.String("node", netmap.StringifyPublicKey(node)),
|
zap.String("node", netmap.StringifyPublicKey(nodes[i])))
|
||||||
)
|
case nodeHoldsObject:
|
||||||
return shortage, uncheckedCopies
|
shortage--
|
||||||
|
case nodeDoesNotHoldObject:
|
||||||
|
case nodeStatusUnknown:
|
||||||
|
p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
|
||||||
|
zap.Stringer("object", addr),
|
||||||
|
zap.Error(err))
|
||||||
|
default:
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.handleProcessNodesResult(ctx, addr, requirements, candidates, checkedNodes, shortage, uncheckedCopies)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) checkStatus(ctx context.Context, addr oid.Address, node netmap.NodeInfo) (nodeProcessStatus, error) {
|
||||||
|
if p.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
|
return nodeIsLocal, nil
|
||||||
|
}
|
||||||
|
if node.Status().IsMaintenance() {
|
||||||
|
return nodeIsUnderMaintenance, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
|
_, err := p.remoteHeader(callCtx, node, addr, false)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
return nodeHoldsObject, nil
|
||||||
|
}
|
||||||
|
if client.IsErrObjectNotFound(err) {
|
||||||
|
return nodeDoesNotHoldObject, nil
|
||||||
|
}
|
||||||
|
if client.IsErrNodeUnderMaintenance(err) {
|
||||||
|
return nodeIsUnderMaintenance, nil
|
||||||
|
}
|
||||||
|
return nodeStatusUnknown, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
|
||||||
|
|
|
@ -10,6 +10,7 @@ const (
|
||||||
nodeHoldsObject
|
nodeHoldsObject
|
||||||
nodeStatusUnknown
|
nodeStatusUnknown
|
||||||
nodeIsUnderMaintenance
|
nodeIsUnderMaintenance
|
||||||
|
nodeIsLocal
|
||||||
)
|
)
|
||||||
|
|
||||||
func (st nodeProcessStatus) Processed() bool {
|
func (st nodeProcessStatus) Processed() bool {
|
||||||
|
|
Loading…
Add table
Reference in a new issue