Fix maintenance node processing in policer #1604

Merged
fyrchik merged 5 commits from fyrchik/frostfs-node:fix-policer into master 2025-01-17 08:50:09 +00:00
4 changed files with 42 additions and 32 deletions

File 1 of 4

@ -126,12 +126,15 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
         } else {
             if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
                 if status == nodeHoldsObject {
                     // node already contains replica, no need to replicate
-                    nodes = append(nodes[:i], nodes[i+1:]...)
-                    i--
                     shortage--
                 }
+                if status == nodeIsUnderMaintenance {
+                    shortage--
+                    uncheckedCopies++
+                }
+                nodes = append(nodes[:i], nodes[i+1:]...)
+                i--
                 continue
             }
@ -143,10 +146,10 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
             if err == nil {
                 shortage--
-                checkedNodes.submitReplicaHolder(nodes[i])
+                checkedNodes.set(nodes[i], nodeHoldsObject)
             } else {
                 if client.IsErrObjectNotFound(err) {
-                    checkedNodes.submitReplicaCandidate(nodes[i])
+                    checkedNodes.set(nodes[i], nodeDoesNotHoldObject)
                     continue
                 } else if client.IsErrNodeUnderMaintenance(err) {
                     shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
@ -155,6 +158,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
                         zap.Stringer("object", addr),
                         zap.Error(err),
                     )
+                    checkedNodes.set(nodes[i], nodeStatusUnknown)
                 }
             }
         }
@ -174,7 +178,7 @@ func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRe
 // However, additional copies should not be removed in this case,
 // because we can remove the only copy this way.
 func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
-    checkedNodes.submitReplicaHolder(node)
+    checkedNodes.set(node, nodeIsUnderMaintenance)
     shortage--
     uncheckedCopies++
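The doc comment above states the invariant this change preserves: a copy on a node in MAINTENANCE mode counts toward the required number of replicas, but it cannot be verified, so the policer must not remove "redundant" copies while any counted copy is unchecked. A minimal stand-alone sketch of that decision rule (shouldRemoveRedundant is a hypothetical helper for illustration, not the actual Policer code):

package main

import "fmt"

// shouldRemoveRedundant illustrates the rule implied by handleMaintenance:
// extra copies may be removed only when the placement requirement is met AND
// every counted copy was actually verified (uncheckedCopies == 0).
func shouldRemoveRedundant(shortage uint32, uncheckedCopies int) bool {
    return shortage == 0 && uncheckedCopies == 0
}

func main() {
    fmt.Println(shouldRemoveRedundant(0, 0)) // true: requirement met, all copies verified
    fmt.Println(shouldRemoveRedundant(0, 1)) // false: a MAINTENANCE node may hold the only copy
    fmt.Println(shouldRemoveRedundant(1, 0)) // false: still short of replicas
}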

File 2 of 4

@ -16,9 +16,9 @@ func TestNodeCache(t *testing.T) {
     cache.SubmitSuccessfulReplication(node)
     require.Equal(t, cache.processStatus(node), nodeHoldsObject)
-    cache.submitReplicaCandidate(node)
+    cache.set(node, nodeDoesNotHoldObject)
     require.Equal(t, cache.processStatus(node), nodeDoesNotHoldObject)
-    cache.submitReplicaHolder(node)
+    cache.set(node, nodeHoldsObject)
     require.Equal(t, cache.processStatus(node), nodeHoldsObject)
 }

File 3 of 4

@ -8,6 +8,8 @@ const (
     nodeNotProcessed nodeProcessStatus = iota
     nodeDoesNotHoldObject
     nodeHoldsObject
+    nodeStatusUnknown
+    nodeIsUnderMaintenance
 )

 func (st nodeProcessStatus) Processed() bool {
@ -15,37 +17,19 @@ func (st nodeProcessStatus) Processed() bool {
 }

 // nodeCache tracks Policer's check progress.
-type nodeCache map[uint64]bool
+type nodeCache map[uint64]nodeProcessStatus

 func newNodeCache() nodeCache {
-    return make(map[uint64]bool)
+    return make(map[uint64]nodeProcessStatus)
 }

-func (n nodeCache) set(node netmap.NodeInfo, val bool) {
+func (n nodeCache) set(node netmap.NodeInfo, val nodeProcessStatus) {
     n[node.Hash()] = val
 }

-// submits storage node as a candidate to store the object replica in case of
-// shortage.
-func (n nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
-    n.set(node, false)
-}
-
-// submits storage node as a current object replica holder.
-func (n nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
-    n.set(node, true)
-}
-
 // processStatus returns current processing status of the storage node.
 func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
-    switch val, ok := n[node.Hash()]; {
-    case !ok:
-        return nodeNotProcessed
-    case val:
-        return nodeHoldsObject
-    default:
-        return nodeDoesNotHoldObject
-    }
+    return n[node.Hash()]
 }

 // SubmitSuccessfulReplication marks given storage node as a current object
@ -53,5 +37,5 @@ func (n nodeCache) processStatus(node netmap.NodeInfo) nodeProcessStatus {
 //
 // SubmitSuccessfulReplication implements replicator.TaskResult.
 func (n nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
-    n.submitReplicaHolder(node)
+    n.set(node, nodeHoldsObject)
 }
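A detail worth calling out in this refactor: processStatus can collapse to a plain map lookup because a missing key yields the zero value of nodeProcessStatus, and nodeNotProcessed is deliberately the first iota constant. A stand-alone sketch of the idea, using raw node hashes in place of netmap.NodeInfo (illustrative only, not the repository code):

package main

import "fmt"

type nodeProcessStatus int

const (
    nodeNotProcessed nodeProcessStatus = iota // zero value: node not seen yet
    nodeDoesNotHoldObject
    nodeHoldsObject
    nodeStatusUnknown
    nodeIsUnderMaintenance
)

// nodeCache mirrors the diff above, simplified to raw hashes for illustration.
type nodeCache map[uint64]nodeProcessStatus

func (n nodeCache) set(hash uint64, st nodeProcessStatus)       { n[hash] = st }
func (n nodeCache) processStatus(hash uint64) nodeProcessStatus { return n[hash] }

func main() {
    c := nodeCache{}
    fmt.Println(c.processStatus(42) == nodeNotProcessed) // true: missing key reads as "not processed"
    c.set(42, nodeIsUnderMaintenance)
    fmt.Println(c.processStatus(42) == nodeIsUnderMaintenance) // true
}

Keeping nodeNotProcessed as the zero value is what lets the old three-way switch in processStatus become a single return.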

File 4 of 4

@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
         maintenanceNodes    []int
         wantRemoveRedundant bool
         wantReplicateTo     []int
+        headResult          map[int]error
         ecInfo              *objectcore.ECInfo
     }{
{
@ -127,7 +128,7 @@ func TestProcessObject(t *testing.T) {
             nodeCount:       2,
             policy:          `REP 2 REP 2`,
             placement:       [][]int{{0, 1}, {0, 1}},
-            wantReplicateTo: []int{1, 1}, // is this actually good?
+            wantReplicateTo: []int{1},
         },
         {
             desc: "lock object must be replicated to all nodes",

Review comment on the removed wantReplicateTo line: The question of @ale64bit was finally answered: no, it is not :)

@ -145,6 +146,14 @@ func TestProcessObject(t *testing.T) {
             objHolders:       []int{1},
             maintenanceNodes: []int{2},
         },
+        {
+            desc:       "preserve local copy when node response with MAINTENANCE",
+            nodeCount:  3,
+            policy:     `REP 2`,
+            placement:  [][]int{{1, 2}},
+            objHolders: []int{1},
+            headResult: map[int]error{2: new(apistatus.NodeUnderMaintenance)},
+        },
         {
             desc:    "lock object must be replicated to all EC nodes",
             objType: objectSDK.TypeLock,
@ -161,6 +170,14 @@ func TestProcessObject(t *testing.T) {
             placement:       [][]int{{0, 1, 2}},
             wantReplicateTo: []int{1, 2},
         },
+        {
+            desc:       "do not remove local copy when MAINTENANCE status is cached",
+            objType:    objectSDK.TypeRegular,
+            nodeCount:  3,
+            policy:     `REP 1 REP 1`,
+            placement:  [][]int{{1, 2}, {1, 0}},
+            headResult: map[int]error{1: new(apistatus.NodeUnderMaintenance)},
+        },
     }

     for i := range tests {
@ -204,6 +221,11 @@ func TestProcessObject(t *testing.T) {
                     t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
                     return nil, errors.New("unexpected object head")
                 }
+                if ti.headResult != nil {
+                    if err, ok := ti.headResult[index]; ok {
+                        return nil, err
+                    }
+                }
                 for _, i := range ti.objHolders {
                     if index == i {
                         return nil, nil
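The headResult hook wired in above lets a test case inject a per-node error into the mocked HEAD call before the objHolders fallback is consulted. A self-contained sketch of that pattern (testCase and fakeHead are invented scaffolding for illustration; only the field names come from the diff):

package main

import (
    "errors"
    "fmt"
)

type testCase struct {
    headResult map[int]error // per-node error injected into the fake HEAD call
    objHolders []int         // nodes that answer HEAD successfully
}

// fakeHead resolves a HEAD request against the table entry, mirroring the
// order used in the test: injected error first, then the holder list.
func fakeHead(tc testCase, node int) error {
    if err, ok := tc.headResult[node]; ok {
        return err // e.g. apistatus.NodeUnderMaintenance in the real test
    }
    for _, h := range tc.objHolders {
        if node == h {
            return nil
        }
    }
    return errors.New("object not found")
}

func main() {
    tc := testCase{
        objHolders: []int{1},
        headResult: map[int]error{2: errors.New("node under maintenance")},
    }
    for node := 1; node <= 2; node++ {
        fmt.Printf("node %d: %v\n", node, fakeHead(tc, node))
    }
}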