Fix maintenance node processing in policer #1604
4 changed files with 42 additions and 32 deletions
```diff
@@ -126,12 +126,15 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 		} else {
 			if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
 				if status == nodeHoldsObject {
 					// node already contains replica, no need to replicate
-					nodes = append(nodes[:i], nodes[i+1:]...)
-					i--
 					shortage--
 				}
+				if status == nodeIsUnderMaintenance {
+					shortage--
+					uncheckedCopies++
+				}
 
+				nodes = append(nodes[:i], nodes[i+1:]...)
+				i--
 				continue
 			}
@@ -143,10 +146,10 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 
 			if err == nil {
 				shortage--
-				checkedNodes.submitReplicaHolder(nodes[i])
+				checkedNodes.set(nodes[i], nodeHoldsObject)
 			} else {
 				if client.IsErrObjectNotFound(err) {
-					checkedNodes.submitReplicaCandidate(nodes[i])
+					checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
 					continue
 				} else if client.IsErrNodeUnderMaintenance(err) {
 					shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
@@ -155,6 +158,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 						zap.Stringer("object", addr),
 						zap.Error(err),
 					)
+					checkedNodes.set(nodes[i], nodeStatusUnknown)
 				}
 			}
 		}
@@ -174,7 +178,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 // However, additional copies should not be removed in this case,
 // because we can remove the only copy this way.
 func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
-	checkedNodes.submitReplicaHolder(node)
+	checkedNodes.set(node, nodeIsUnderMaintenance)
 	shortage--
 	uncheckedCopies++
```
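A note on the accounting in the first hunk: a cached `nodeIsUnderMaintenance` status now reduces `shortage` (the copy is assumed to exist) while also bumping `uncheckedCopies` (it cannot be verified right now), mirroring what `handleMaintenance` does on a live MAINTENANCE response. The sketch below models only this per-vector bookkeeping; `planForVector` and its types are invented for illustration and are not part of the patch.

```go
package main

import "fmt"

// Mirrors the nodeProcessStatus constants from the diff.
type status int

const (
	notProcessed status = iota
	doesNotHoldObject
	holdsObject
	statusUnknown
	underMaintenance
)

// planForVector is a hypothetical helper modeling the accounting
// processRepNodes does for one placement vector: confirmed holders and
// maintenance nodes both reduce the replica shortage, but maintenance
// nodes are additionally counted as "unchecked" copies.
func planForVector(required uint32, cached []status) (shortage uint32, unchecked int) {
	shortage = required
	for _, st := range cached {
		switch st {
		case holdsObject:
			shortage-- // confirmed replica
		case underMaintenance:
			shortage-- // assumed replica; cannot be verified right now
			unchecked++
		}
	}
	return shortage, unchecked
}

func main() {
	shortage, unchecked := planForVector(2, []status{holdsObject, underMaintenance})
	fmt.Println(shortage == 0, unchecked == 1) // true true
}
```

Under `REP 2`, one confirmed holder plus one maintenance node gives `shortage == 0` and `uncheckedCopies == 1`: nothing is replicated, and because unchecked copies exist, redundant copies are not removed either, which is exactly the behavior the new test cases pin down.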
```diff
@@ -16,9 +16,9 @@ func TestNodeCache(t *testing.T) {
 	cache.SubmitSuccessfulReplication(node)
 	require.Equal(t, cache.processStatus(node), nodeHoldsObject)
 
-	cache.submitReplicaCandidate(node)
+	cache.set(node, nodeDoesNotHoldObject)
 	require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
 
-	cache.submitReplicaHolder(node)
+	cache.set(node, nodeHoldsObject)
 	require.Equal(t, cache.processStatus(node), nodeHoldsObject)
 }
```
```diff
@@ -8,6 +8,8 @@ const (
 	nodeNotProcessed nodeProcessStatus = iota
 	nodeDoesNotHoldObject
 	nodeHoldsObject
+	nodeStatusUnknown
+	nodeIsUnderMaintenance
 )
 
 func (st nodeProcessStatus) Processed() bool {
@@ -15,37 +17,19 @@ func (st nodeProcessStatus) Processed() bool {
 }
 
 // nodeCache tracks Policer's check progress.
-type nodeCache map[uint64]bool
+type nodeCache map[uint64]nodeProcessStatus
 
 func newNodeCache() nodeCache {
-	return make(map[uint64]bool)
+	return make(map[uint64]nodeProcessStatus)
 }
 
-func (n nodeCache) set(node netmap.NodeInfo, val bool) {
+func (n nodeCache) set(node netmap.NodeInfo, val nodeProcessStatus) {
 	n[node.Hash()] = val
 }
 
-// submits storage node as a candidate to store the object replica in case of
-// shortage.
-func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
-	n.set(node, false)
-}
-
-// submits storage node as a current object replica holder.
-func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
-	n.set(node, true)
-}
-
 // processStatus returns current processing status of the storage node.
 func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
-	switch val, ok := n[node.Hash()]; {
-	case !ok:
-		return nodeNotProcessed
-	case val:
-		return nodeHoldsObject
-	default:
-		return nodeDoesNotHoldObject
-	}
+	return n[node.Hash()]
 }
 
 // SubmitSuccessfulReplication marks given storage node as a current object
@@ -53,5 +37,5 @@ func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
 //
 // SubmitSuccessfulReplication implements replicator.TaskResult.
 func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
-	n.submitReplicaHolder(node)
+	n.set(node, nodeHoldsObject)
 }
```
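This file carries the essence of the fix: `nodeCache` used to collapse every processed node into a `bool` (holds / does not hold), so a node that had answered MAINTENANCE was indistinguishable from a plain non-holder when the same node showed up again in another placement vector. Storing the full `nodeProcessStatus` preserves that information. Below is a self-contained sketch of the new cache, with plain `uint64` keys standing in for `netmap.NodeInfo.Hash()` so it runs on its own:

```go
package main

import "fmt"

type nodeProcessStatus int

const (
	nodeNotProcessed nodeProcessStatus = iota // must stay the zero value
	nodeDoesNotHoldObject
	nodeHoldsObject
	nodeStatusUnknown
	nodeIsUnderMaintenance
)

// nodeCache as in the diff, keyed by a plain uint64 for self-containment.
type nodeCache map[uint64]nodeProcessStatus

func (n nodeCache) set(node uint64, val nodeProcessStatus) { n[node] = val }

// A missing key reads back as the map's zero value, i.e. nodeNotProcessed,
// which is why the old switch collapses to a single lookup.
func (n nodeCache) processStatus(node uint64) nodeProcessStatus { return n[node] }

func main() {
	cache := nodeCache{}
	fmt.Println(cache.processStatus(1) == nodeNotProcessed) // true: never seen

	cache.set(1, nodeIsUnderMaintenance)
	// On the next placement vector the maintenance state is still visible,
	// instead of degrading to a bare "does not hold object" bool.
	fmt.Println(cache.processStatus(1) == nodeIsUnderMaintenance) // true
}
```

Because `nodeNotProcessed` is the `iota` zero value, an absent entry and an explicit "not processed" state coincide, so `processStatus` needs no `ok` check at all.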
```diff
@@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
 		maintenanceNodes    []int
 		wantRemoveRedundant bool
 		wantReplicateTo     []int
+		headResult          map[int]error
 		ecInfo              *objectcore.ECInfo
 	}{
 		{
@@ -127,7 +128,7 @@ func TestProcessObject(t *testing.T) {
 			nodeCount:       2,
 			policy:          `REP 2 REP 2`,
 			placement:       [][]int{{0, 1}, {0, 1}},
-			wantReplicateTo: []int{1, 1}, // is this actually good?
+			wantReplicateTo: []int{1},
 		},
 		{
 			desc: "lock object must be replicated to all nodes",
@@ -145,6 +146,14 @@ func TestProcessObject(t *testing.T) {
 			objHolders:       []int{1},
 			maintenanceNodes: []int{2},
 		},
+		{
+			desc:       "preserve local copy when node response with MAINTENANCE",
+			nodeCount:  3,
+			policy:     `REP 2`,
+			placement:  [][]int{{1, 2}},
+			objHolders: []int{1},
+			headResult: map[int]error{2: new(apistatus.NodeUnderMaintenance)},
+		},
 		{
 			desc:    "lock object must be replicated to all EC nodes",
 			objType: objectSDK.TypeLock,
@@ -161,6 +170,14 @@ func TestProcessObject(t *testing.T) {
 			placement:       [][]int{{0, 1, 2}},
 			wantReplicateTo: []int{1, 2},
 		},
+		{
+			desc:       "do not remove local copy when MAINTENANCE status is cached",
+			objType:    objectSDK.TypeRegular,
+			nodeCount:  3,
+			policy:     `REP 1 REP 1`,
+			placement:  [][]int{{1, 2}, {1, 0}},
+			headResult: map[int]error{1: new(apistatus.NodeUnderMaintenance)},
+		},
 	}
 
 	for i := range tests {
@@ -204,6 +221,11 @@ func TestProcessObject(t *testing.T) {
 			t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
 			return nil, errors.New("unexpected object head")
 		}
+		if ti.headResult != nil {
+			if err, ok := ti.headResult[index]; ok {
+				return nil, err
+			}
+		}
 		for _, i := range ti.objHolders {
 			if index == i {
 				return nil, nil
```
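The new `headResult` field lets a test force a specific HEAD outcome for a given node index before the usual `objHolders` lookup; the two new cases use it to return `apistatus.NodeUnderMaintenance` and then assert that no replication or removal is triggered. A condensed, self-contained model of that stub; the function name and the fallback error are illustrative, not the actual test code:

```go
package main

import (
	"errors"
	"fmt"
)

// mockHead condenses the test's remote-HEAD stub: headResult forces a
// per-node error (the knob added by this PR), objHolders marks nodes that
// answer successfully, anything else is treated as a miss.
func mockHead(headResult map[int]error, objHolders []int, index int) error {
	if err, ok := headResult[index]; ok {
		return err // e.g. apistatus.NodeUnderMaintenance in the real tests
	}
	for _, i := range objHolders {
		if index == i {
			return nil // node holds the object
		}
	}
	return errors.New("object not found") // hypothetical fallback
}

func main() {
	headResult := map[int]error{2: errors.New("node under maintenance")}
	fmt.Println(mockHead(headResult, []int{1}, 1)) // <nil>: holder answers
	fmt.Println(mockHead(headResult, []int{1}, 2)) // forced maintenance error
}
```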
The question from @ale64bit (the old `// is this actually good?` comment on `wantReplicateTo`) was finally answered: no, it is not :)