Fix maintenance node processing in policer #1604

Merged
fyrchik merged 5 commits from fyrchik/frostfs-node:fix-policer into master 2025-01-17 08:50:09 +00:00
4 changed files with 42 additions and 32 deletions

View file

@ -126,12 +126,15 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
} else { } else {
if status := checkedNodes.processStatus(nodes[i]); status.Processed() { if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
if status == nodeHoldsObject { if status == nodeHoldsObject {
// node already contains replica, no need to replicate
nodes = append(nodes[:i], nodes[i+1:]...)
i--
shortage-- shortage--
} }
if status == nodeIsUnderMaintenance {
shortage--
uncheckedCopies++
}
nodes = append(nodes[:i], nodes[i+1:]...)
i--
continue continue
} }
@ -143,10 +146,10 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
if err == nil { if err == nil {
shortage-- shortage--
checkedNodes.submitReplicaHolder(nodes[i]) checkedNodes.set(nodes[i], nodeHoldsObject)
} else { } else {
if client.IsErrObjectNotFound(err) { if client.IsErrObjectNotFound(err) {
checkedNodes.submitReplicaCandidate(nodes[i]) checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
continue continue
} else if client.IsErrNodeUnderMaintenance(err) { } else if client.IsErrNodeUnderMaintenance(err) {
shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies) shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
@ -155,6 +158,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
zap.Stringer("object", addr), zap.Stringer("object", addr),
zap.Error(err), zap.Error(err),
) )
checkedNodes.set(nodes[i], nodeStatusUnknown)
} }
} }
} }
@ -174,7 +178,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
// However, additional copies should not be removed in this case, // However, additional copies should not be removed in this case,
// because we can remove the only copy this way. // because we can remove the only copy this way.
func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) { func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
checkedNodes.submitReplicaHolder(node) checkedNodes.set(node, nodeIsUnderMaintenance)
shortage-- shortage--
uncheckedCopies++ uncheckedCopies++

View file

@ -16,9 +16,9 @@ func TestNodeCache(t *testing.T) {
cache.SubmitSuccessfulReplication(node) cache.SubmitSuccessfulReplication(node)
require.Equal(t, cache.processStatus(node), nodeHoldsObject) require.Equal(t, cache.processStatus(node), nodeHoldsObject)
cache.submitReplicaCandidate(node) cache.set(node, nodeDoesNotHoldObject)
require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject) require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
cache.submitReplicaHolder(node) cache.set(node, nodeHoldsObject)
require.Equal(t, cache.processStatus(node), nodeHoldsObject) require.Equal(t, cache.processStatus(node), nodeHoldsObject)
} }

View file

@ -8,6 +8,8 @@ const (
nodeNotProcessed nodeProcessStatus = iota nodeNotProcessed nodeProcessStatus = iota
nodeDoesNotHoldObject nodeDoesNotHoldObject
nodeHoldsObject nodeHoldsObject
nodeStatusUnknown
nodeIsUnderMaintenance
) )
func (st nodeProcessStatus) Processed() bool { func (st nodeProcessStatus) Processed() bool {
@ -15,37 +17,19 @@ func (st nodeProcessStatus) Processed() bool {
} }
// nodeCache tracks Policer's check progress. // nodeCache tracks Policer's check progress.
type nodeCache map[uint64]bool type nodeCache map[uint64]nodeProcessStatus
func newNodeCache() nodeCache { func newNodeCache() nodeCache {
return make(map[uint64]bool) return make(map[uint64]nodeProcessStatus)
} }
func (n nodeCache) set(node netmap.NodeInfo, val bool) { func (n nodeCache) set(node netmap.NodeInfo, val nodeProcessStatus) {
n[node.Hash()] = val n[node.Hash()] = val
} }
// submits storage node as a candidate to store the object replica in case of
// shortage.
func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
n.set(node, false)
}
// submits storage node as a current object replica holder.
func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
n.set(node, true)
}
// processStatus returns current processing status of the storage node. // processStatus returns current processing status of the storage node.
func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus { func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
switch val, ok := n[node.Hash()]; { return n[node.Hash()]
case !ok:
return nodeNotProcessed
case val:
return nodeHoldsObject
default:
return nodeDoesNotHoldObject
}
} }
// SubmitSuccessfulReplication marks given storage node as a current object // SubmitSuccessfulReplication marks given storage node as a current object
@ -53,5 +37,5 @@ func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
// //
// SubmitSuccessfulReplication implements replicator.TaskResult. // SubmitSuccessfulReplication implements replicator.TaskResult.
func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) { func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
n.submitReplicaHolder(node) n.set(node, nodeHoldsObject)
} }

View file

@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
maintenanceNodes []int maintenanceNodes []int
wantRemoveRedundant bool wantRemoveRedundant bool
wantReplicateTo []int wantReplicateTo []int
headResult map[int]error
ecInfo *objectcore.ECInfo ecInfo *objectcore.ECInfo
}{ }{
{ {
@ -127,7 +128,7 @@ func TestProcessObject(t *testing.T) {
nodeCount: 2, nodeCount: 2,
policy: `REP 2 REP 2`, policy: `REP 2 REP 2`,
placement: [][]int{{0, 1}, {0, 1}}, placement: [][]int{{0, 1}, {0, 1}},
wantReplicateTo: []int{1, 1}, // is this actually good? wantReplicateTo: []int{1},
Review

The question of @ale64bit was finally answered: no, it is not :)
}, },
{ {
desc: "lock object must be replicated to all nodes", desc: "lock object must be replicated to all nodes",
@ -145,6 +146,14 @@ func TestProcessObject(t *testing.T) {
objHolders: []int{1}, objHolders: []int{1},
maintenanceNodes: []int{2}, maintenanceNodes: []int{2},
}, },
{
desc: "preserve local copy when node response with MAINTENANCE",
nodeCount: 3,
policy: `REP 2`,
placement: [][]int{{1, 2}},
objHolders: []int{1},
headResult: map[int]error{2: new(apistatus.NodeUnderMaintenance)},
},
{ {
desc: "lock object must be replicated to all EC nodes", desc: "lock object must be replicated to all EC nodes",
objType: objectSDK.TypeLock, objType: objectSDK.TypeLock,
@ -161,6 +170,14 @@ func TestProcessObject(t *testing.T) {
placement: [][]int{{0, 1, 2}}, placement: [][]int{{0, 1, 2}},
wantReplicateTo: []int{1, 2}, wantReplicateTo: []int{1, 2},
}, },
{
desc: "do not remove local copy when MAINTENANCE status is cached",
objType: objectSDK.TypeRegular,
nodeCount: 3,
policy: `REP 1 REP 1`,
placement: [][]int{{1, 2}, {1, 0}},
headResult: map[int]error{1: new(apistatus.NodeUnderMaintenance)},
},
} }
for i := range tests { for i := range tests {
@ -204,6 +221,11 @@ func TestProcessObject(t *testing.T) {
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a) t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
return nil, errors.New("unexpected object head") return nil, errors.New("unexpected object head")
} }
if ti.headResult != nil {
if err, ok := ti.headResult[index]; ok {
return nil, err
}
}
for _, i := range ti.objHolders { for _, i := range ti.objHolders {
if index == i { if index == i {
return nil, nil return nil, nil