diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index b6f48faa4..569d49c56 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -13,10 +13,45 @@ import ( "go.uber.org/zap" ) +// tracks Policer's check progress type nodeCache map[uint64]bool -func (n nodeCache) SubmitSuccessfulReplication(id uint64) { - n[id] = true +func newNodeCache() *nodeCache { + m := make(map[uint64]bool) + return (*nodeCache)(&m) +} + +// submits storage node as a candidate to store the object replica in case of +// shortage. +func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) { + (*n)[node.Hash()] = false +} + +// submits storage node as a current object replica holder. +func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) { + (*n)[node.Hash()] = true +} + +// processStatus returns current processing status of the storage node +// +// >0 if node does not currently hold the object +// 0 if node already holds the object +// <0 if node has not been processed yet +func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 { + val, ok := (*n)[node.Hash()] + if !ok { + return -1 + } + + if val { + return 0 + } + + return 1 +} + +func (n *nodeCache) SubmitSuccessfulReplication(id uint64) { + (*n)[id] = true } func (p *Policer) processObject(ctx context.Context, addr oid.Address) { @@ -68,7 +103,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) { } // cached info about already checked nodes - var checkedNodes nodeCache = make(map[uint64]bool, numOfContainerNodes) + checkedNodes := newNodeCache() for i := range nn { select { @@ -96,7 +131,7 @@ type processPlacementContext struct { } func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, - nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache) { + nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) { prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) for i := 0; shortage > 0 && i < len(nodes); i++ { @@ -111,9 +146,8 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, shortage-- } else { - nodeID := nodes[i].Hash() - if hasReplica, checked := checkedNodes[nodeID]; checked { - if hasReplica { + if status := checkedNodes.processStatus(nodes[i]); status >= 0 { + if status == 0 { // node already contains replica, no need to replicate nodes = append(nodes[:i], nodes[i+1:]...) i-- @@ -130,7 +164,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, cancel() if client.IsErrObjectNotFound(err) { - checkedNodes[nodeID] = false + checkedNodes.submitReplicaCandidate(nodes[i]) continue } @@ -141,7 +175,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, ) } else { shortage-- - checkedNodes[nodeID] = true + checkedNodes.submitReplicaHolder(nodes[i]) } } diff --git a/pkg/services/policer/check_test.go b/pkg/services/policer/check_test.go new file mode 100644 index 000000000..d04c62502 --- /dev/null +++ b/pkg/services/policer/check_test.go @@ -0,0 +1,24 @@ +package policer + +import ( + "testing" + + netmaptest "github.com/nspcc-dev/neofs-sdk-go/netmap/test" + "github.com/stretchr/testify/require" +) + +func TestNodeCache(t *testing.T) { + cache := newNodeCache() + node := netmaptest.NodeInfo() + + require.Negative(t, cache.processStatus(node)) + + cache.SubmitSuccessfulReplication(node.Hash()) + require.Zero(t, cache.processStatus(node)) + + cache.submitReplicaCandidate(node) + require.Positive(t, cache.processStatus(node)) + + cache.submitReplicaHolder(node) + require.Zero(t, cache.processStatus(node)) +}