Fix maintenance node processing in policer #1604
4 changed files with 42 additions and 32 deletions
@@ -126,12 +126,15 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 		} else {
 			if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
 				if status == nodeHoldsObject {
-					// node already contains replica, no need to replicate
-					nodes = append(nodes[:i], nodes[i+1:]...)
-					i--
 					shortage--
 				}
+				if status == nodeIsUnderMaintenance {
+					shortage--
+					uncheckedCopies++
+				}
+
+				nodes = append(nodes[:i], nodes[i+1:]...)
+				i--
 				continue
 			}
 
@@ -143,10 +146,10 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 
 			if err == nil {
 				shortage--
-				checkedNodes.submitReplicaHolder(nodes[i])
+				checkedNodes.set(nodes[i], nodeHoldsObject)
 			} else {
 				if client.IsErrObjectNotFound(err) {
-					checkedNodes.submitReplicaCandidate(nodes[i])
+					checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
 					continue
 				} else if client.IsErrNodeUnderMaintenance(err) {
 					shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
@@ -155,6 +158,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 						zap.Stringer("object", addr),
 						zap.Error(err),
 					)
+					checkedNodes.set(nodes[i], nodeStatusUnknown)
 				}
 			}
 		}
@@ -174,7 +178,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 // However, additional copies should not be removed in this case,
 // because we can remove the only copy this way.
 func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
-	checkedNodes.submitReplicaHolder(node)
+	checkedNodes.set(node, nodeIsUnderMaintenance)
 	shortage--
 	uncheckedCopies++
 
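Taken together, the changes in this file do two things: every HEAD outcome now records an explicit status in the shared cache (including the new catch-all nodeStatusUnknown for unexpected errors), and a cached status of any kind removes the node from the candidate list, with MAINTENANCE counting against shortage without making the node a replication target. Below is a self-contained model of that logic; it is a sketch, not the PR's verbatim code (the real loop also issues the HEAD call, handles the local node, and logs), and the status constants are the ones introduced in the nodecache diff further down.

package main

import (
	"errors"
	"fmt"
)

// Status set as introduced in the nodecache diff below.
type nodeProcessStatus int

const (
	nodeNotProcessed nodeProcessStatus = iota
	nodeDoesNotHoldObject
	nodeHoldsObject
	nodeStatusUnknown
	nodeIsUnderMaintenance
)

// Stand-ins for the errors recognized by client.IsErrObjectNotFound
// and client.IsErrNodeUnderMaintenance in the real code.
var (
	errNotFound    = errors.New("object not found")
	errMaintenance = errors.New("node under maintenance")
)

// classify mirrors the error triage in processRepNodes: every outcome,
// including unexpected errors, now leaves a status in the cache.
func classify(err error) nodeProcessStatus {
	switch {
	case err == nil:
		return nodeHoldsObject
	case errors.Is(err, errNotFound):
		return nodeDoesNotHoldObject
	case errors.Is(err, errMaintenance):
		return nodeIsUnderMaintenance
	default:
		return nodeStatusUnknown
	}
}

// prune models the fixed candidate loop: any node with a cached status
// is dropped from the list; holders and maintenance nodes both reduce
// shortage, but only maintenance nodes bump uncheckedCopies.
func prune(nodes []uint64, cache map[uint64]nodeProcessStatus, shortage uint32) ([]uint64, uint32, int) {
	uncheckedCopies := 0
	for i := 0; i < len(nodes); i++ {
		status := cache[nodes[i]]
		if status == nodeNotProcessed {
			continue // not seen yet: the real code HEADs the node here
		}
		switch status {
		case nodeHoldsObject:
			shortage--
		case nodeIsUnderMaintenance:
			shortage--
			uncheckedCopies++
		}
		nodes = append(nodes[:i], nodes[i+1:]...)
		i--
	}
	return nodes, shortage, uncheckedCopies
}

func main() {
	cache := map[uint64]nodeProcessStatus{
		1: classify(nil),            // holder
		2: classify(errMaintenance), // under maintenance
	}
	nodes, shortage, unchecked := prune([]uint64{1, 2, 3}, cache, 2)
	fmt.Println(nodes, shortage, unchecked) // [3] 0 1
}

Because uncheckedCopies ends up greater than zero whenever a maintenance node is counted, the policer later refuses to remove redundant copies, exactly as the doc comment on handleMaintenance warns: one of the unchecked nodes may hold the only replica.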
@@ -16,9 +16,9 @@ func TestNodeCache(t *testing.T) {
 	cache.SubmitSuccessfulReplication(node)
 	require.Equal(t, cache.processStatus(node), nodeHoldsObject)
 
-	cache.submitReplicaCandidate(node)
+	cache.set(node, nodeDoesNotHoldObject)
 	require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
 
-	cache.submitReplicaHolder(node)
+	cache.set(node, nodeHoldsObject)
 	require.Equal(t, cache.processStatus(node), nodeHoldsObject)
 }
@@ -8,6 +8,8 @@ const (
 	nodeNotProcessed nodeProcessStatus = iota
 	nodeDoesNotHoldObject
 	nodeHoldsObject
+	nodeStatusUnknown
+	nodeIsUnderMaintenance
 )
 
 func (st nodeProcessStatus) Processed() bool {
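The body of Processed() sits outside the hunk, so the diff does not show it; presumably it is just a comparison against the zero value, which would make both new statuses count as processed automatically. A sketch under that assumption:

// Assumed implementation; the method body is outside the diff hunk.
func (st nodeProcessStatus) Processed() bool {
	return st != nodeNotProcessed
}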
@@ -15,37 +17,19 @@ func (st nodeProcessStatus) Processed() bool {
 }
 
 // nodeCache tracks Policer's check progress.
-type nodeCache map[uint64]bool
+type nodeCache map[uint64]nodeProcessStatus
 
 func newNodeCache() nodeCache {
-	return make(map[uint64]bool)
+	return make(map[uint64]nodeProcessStatus)
 }
 
-func (n nodeCache) set(node netmap.NodeInfo, val bool) {
+func (n nodeCache) set(node netmap.NodeInfo, val nodeProcessStatus) {
 	n[node.Hash()] = val
 }
 
-// submits storage node as a candidate to store the object replica in case of
-// shortage.
-func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
-	n.set(node, false)
-}
-
-// submits storage node as a current object replica holder.
-func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
-	n.set(node, true)
-}
-
 // processStatus returns current processing status of the storage node.
 func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
-	switch val, ok := n[node.Hash()]; {
-	case !ok:
-		return nodeNotProcessed
-	case val:
-		return nodeHoldsObject
-	default:
-		return nodeDoesNotHoldObject
-	}
+	return n[node.Hash()]
 }
 
 // SubmitSuccessfulReplication marks given storage node as a current object
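The switch in processStatus collapses to a bare map read because two facts line up: a Go map lookup for a missing key returns the value type's zero value, and nodeNotProcessed is deliberately the first iota constant, i.e. zero. A minimal standalone demonstration (hypothetical main, not repository code):

package main

import "fmt"

type nodeProcessStatus int

const nodeNotProcessed nodeProcessStatus = iota // zero value by construction

func main() {
	cache := map[uint64]nodeProcessStatus{}
	// A missing key yields the zero value, nodeNotProcessed, which is
	// exactly what the removed `case !ok:` branch used to return.
	fmt.Println(cache[12345] == nodeNotProcessed) // true
}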
@@ -53,5 +37,5 @@ func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
 //
 // SubmitSuccessfulReplication implements replicator.TaskResult.
 func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
-	n.submitReplicaHolder(node)
+	n.set(node, nodeHoldsObject)
 }
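The doc comment says this method implements replicator.TaskResult; the interface definition is not part of the diff, but given the signature shown here it presumably contains exactly this method (an assumption, stated in the comment below):

// Assumed shape of the interface, inferred from the method signature
// in the diff; the actual definition lives in the replicator package.
type TaskResult interface {
	SubmitSuccessfulReplication(netmap.NodeInfo)
}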
@@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
 		maintenanceNodes []int
 		wantRemoveRedundant bool
 		wantReplicateTo []int
+		headResult map[int]error
 		ecInfo *objectcore.ECInfo
 	}{
 		{
@@ -127,7 +128,7 @@ func TestProcessObject(t *testing.T) {
 			nodeCount: 2,
 			policy: `REP 2 REP 2`,
 			placement: [][]int{{0, 1}, {0, 1}},
-			wantReplicateTo: []int{1, 1}, // is this actually good?
+			wantReplicateTo: []int{1},
 		},
 		{
 			desc: "lock object must be replicated to all nodes",
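The expectation change is a direct consequence of the cache rework: with policy REP 2 REP 2 and both placement vectors equal to {0, 1}, the old map[uint64]bool cache still let the second vector schedule another replication to node 1, hence the doubted []int{1, 1}; with statuses cached, the second pass sees node 1 as already processed and drops it from the candidates, leaving a single replication. A toy model of that deduplication (a sketch, not the test's code):

package main

import "fmt"

type nodeProcessStatus int

const (
	nodeNotProcessed nodeProcessStatus = iota
	nodeDoesNotHoldObject
)

// Two placement vectors over the same nodes share one status cache, so
// a node inspected by the first vector is skipped by the second, and
// node 1 is scheduled for replication only once.
func main() {
	cache := map[uint64]nodeProcessStatus{}
	var replicateTo []uint64
	for _, vector := range [][]uint64{{0, 1}, {0, 1}} {
		for _, node := range vector {
			if cache[node] != nodeNotProcessed {
				continue // already checked by an earlier vector
			}
			cache[node] = nodeDoesNotHoldObject
			if node != 0 { // node 0 plays the local node, as in the test
				replicateTo = append(replicateTo, node)
			}
		}
	}
	fmt.Println(replicateTo) // [1]
}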
@@ -145,6 +146,14 @@ func TestProcessObject(t *testing.T) {
 			objHolders: []int{1},
 			maintenanceNodes: []int{2},
 		},
+		{
+			desc: "preserve local copy when node response with MAINTENANCE",
+			nodeCount: 3,
+			policy: `REP 2`,
+			placement: [][]int{{1, 2}},
+			objHolders: []int{1},
+			headResult: map[int]error{2: new(apistatus.NodeUnderMaintenance)},
+		},
 		{
 			desc: "lock object must be replicated to all EC nodes",
 			objType: objectSDK.TypeLock,
@@ -161,6 +170,14 @@ func TestProcessObject(t *testing.T) {
 			placement: [][]int{{0, 1, 2}},
 			wantReplicateTo: []int{1, 2},
 		},
+		{
+			desc: "do not remove local copy when MAINTENANCE status is cached",
+			objType: objectSDK.TypeRegular,
+			nodeCount: 3,
+			policy: `REP 1 REP 1`,
+			placement: [][]int{{1, 2}, {1, 0}},
+			headResult: map[int]error{1: new(apistatus.NodeUnderMaintenance)},
+		},
 	}
 
 	for i := range tests {
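Both new cases hinge on the headResult field added above: it forces the stubbed HEAD call for a given node index to fail with NodeUnderMaintenance, while wantReplicateTo and wantRemoveRedundant stay at their zero values, so the policer is expected to neither replicate anywhere nor drop the local copy. A dependency-free sketch of the injection pattern used by the test harness (names are illustrative stand-ins, not the repository's):

package main

import (
	"errors"
	"fmt"
)

// Stand-in for apistatus.NodeUnderMaintenance.
var errMaintenance = errors.New("node under maintenance")

// head models the test's stubbed HEAD handler: injected errors are
// consulted first, then known holders answer successfully, and every
// other node reports "object not found".
func head(index int, headResult map[int]error, holders []int) error {
	if err, ok := headResult[index]; ok {
		return err
	}
	for _, h := range holders {
		if index == h {
			return nil
		}
	}
	return errors.New("object not found")
}

func main() {
	headResult := map[int]error{2: errMaintenance}
	for index := 0; index < 3; index++ {
		fmt.Printf("node %d: %v\n", index, head(index, headResult, []int{1}))
	}
}

In the real test the injected error is new(apistatus.NodeUnderMaintenance), which client.IsErrNodeUnderMaintenance recognizes in the policer's triage, routing the node through handleMaintenance.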
@@ -204,6 +221,11 @@ func TestProcessObject(t *testing.T) {
 				t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
 				return nil, errors.New("unexpected object head")
 			}
+			if ti.headResult != nil {
+				if err, ok := ti.headResult[index]; ok {
+					return nil, err
+				}
+			}
 			for _, i := range ti.objHolders {
 				if index == i {
 					return nil, nil
The question from @ale64bit in the removed test comment ("is this actually good?") was finally answered: no, it is not :)