[#1680] policer: Refactor tracking the processed nodes

Add clear methods with docs. Use the methods instead of direct map
and bool instructions.

Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
This commit is contained in:
Leonard Lyubich 2022-10-07 13:19:31 +04:00 committed by fyrchik
parent feaa9eace7
commit e6f8904040
2 changed files with 67 additions and 9 deletions

View file

@ -13,10 +13,45 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// tracks Policer's check progress
type nodeCache map[uint64]bool type nodeCache map[uint64]bool
func (n nodeCache) SubmitSuccessfulReplication(id uint64) { func newNodeCache() *nodeCache {
n[id] = true m := make(map[uint64]bool)
return (*nodeCache)(&m)
}
// submits storage node as a candidate to store the object replica in case of
// shortage.
func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
(*n)[node.Hash()] = false
}
// submits storage node as a current object replica holder.
func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
(*n)[node.Hash()] = true
}
// processStatus returns current processing status of the storage node
//
// >0 if node does not currently hold the object
// 0 if node already holds the object
// <0 if node has not been processed yet
func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 {
val, ok := (*n)[node.Hash()]
if !ok {
return -1
}
if val {
return 0
}
return 1
}
func (n *nodeCache) SubmitSuccessfulReplication(id uint64) {
(*n)[id] = true
} }
func (p *Policer) processObject(ctx context.Context, addr oid.Address) { func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
@ -68,7 +103,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
} }
// cached info about already checked nodes // cached info about already checked nodes
var checkedNodes nodeCache = make(map[uint64]bool, numOfContainerNodes) checkedNodes := newNodeCache()
for i := range nn { for i := range nn {
select { select {
@ -96,7 +131,7 @@ type processPlacementContext struct {
} }
func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache) { nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
for i := 0; shortage > 0 && i < len(nodes); i++ { for i := 0; shortage > 0 && i < len(nodes); i++ {
@ -111,9 +146,8 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
shortage-- shortage--
} else { } else {
nodeID := nodes[i].Hash() if status := checkedNodes.processStatus(nodes[i]); status >= 0 {
if hasReplica, checked := checkedNodes[nodeID]; checked { if status == 0 {
if hasReplica {
// node already contains replica, no need to replicate // node already contains replica, no need to replicate
nodes = append(nodes[:i], nodes[i+1:]...) nodes = append(nodes[:i], nodes[i+1:]...)
i-- i--
@ -130,7 +164,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
cancel() cancel()
if client.IsErrObjectNotFound(err) { if client.IsErrObjectNotFound(err) {
checkedNodes[nodeID] = false checkedNodes.submitReplicaCandidate(nodes[i])
continue continue
} }
@ -141,7 +175,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
) )
} else { } else {
shortage-- shortage--
checkedNodes[nodeID] = true checkedNodes.submitReplicaHolder(nodes[i])
} }
} }

View file

@ -0,0 +1,24 @@
package policer
import (
"testing"
netmaptest "github.com/nspcc-dev/neofs-sdk-go/netmap/test"
"github.com/stretchr/testify/require"
)
func TestNodeCache(t *testing.T) {
cache := newNodeCache()
node := netmaptest.NodeInfo()
require.Negative(t, cache.processStatus(node))
cache.SubmitSuccessfulReplication(node.Hash())
require.Zero(t, cache.processStatus(node))
cache.submitReplicaCandidate(node)
require.Positive(t, cache.processStatus(node))
cache.submitReplicaHolder(node)
require.Zero(t, cache.processStatus(node))
}