[#157] netmap: Fallback to using minimal backup factor

Use CBF=1 when strict policy can't be satisfied.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2020-11-27 11:46:25 +03:00 committed by Leonard Lyubich
parent c01024b553
commit ea6b6adfe0
2 changed files with 45 additions and 9 deletions

View file

@ -36,14 +36,14 @@ func (c *Context) processSelectors(p *PlacementPolicy) error {
return nil
}
// GetNodesCount returns amount of buckets and nodes in every bucket
// for a given placement policy.
func GetNodesCount(p *PlacementPolicy, s *Selector) (int, int) {
// GetNodesCount returns amount of buckets and minimum number of nodes in every bucket
// for the given selector.
func GetNodesCount(_ *PlacementPolicy, s *Selector) (int, int) {
switch s.Clause() {
case ClauseSame:
return 1, int(p.ContainerBackupFactor() * s.Count())
return 1, int(s.Count())
default:
return int(s.Count()), int(p.ContainerBackupFactor())
return int(s.Count()), 1
}
}
@ -69,17 +69,25 @@ func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error)
}
}
maxNodesInBucket := nodesInBucket * int(p.ContainerBackupFactor())
nodes := make([]Nodes, 0, len(buckets))
fallback := make([]Nodes, 0, len(buckets))
for i := range buckets {
ns := buckets[i].nodes
if len(ns) >= nodesInBucket {
nodes = append(nodes, ns[:nodesInBucket])
if len(ns) >= maxNodesInBucket {
nodes = append(nodes, ns[:maxNodesInBucket])
} else if len(ns) >= nodesInBucket {
fallback = append(fallback, ns)
}
}
if len(nodes) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
// Fallback to using minimum allowed backup factor (1).
nodes = append(nodes, fallback...)
if len(nodes) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
}
}
if len(c.pivot) != 0 {

View file

@ -92,6 +92,33 @@ func TestPlacementPolicy_GetPlacementVectors(t *testing.T) {
require.Equal(t, len(v.Replicas()[1]), len(ids), "not all nodes we distinct")
}
func TestPlacementPolicy_LowerBound(t *testing.T) {
p := newPlacementPolicy(
2, // backup factor
[]*Replica{
newReplica(1, "X"),
},
[]*Selector{
newSelector("X", "Country", ClauseSame, 2, "*"),
},
nil, // filters
)
nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "DE"),
nodeInfoFromAttributes("ID", "2", "Country", "DE"),
nodeInfoFromAttributes("ID", "3", "Country", "DE"),
}
nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err)
v, err := nm.GetContainerNodes(p, nil)
require.NoError(t, err)
require.Equal(t, 3, len(v.Flatten()))
}
func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
p := newPlacementPolicy(2, nil,
[]*Selector{
@ -128,6 +155,7 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
sel := c.Selections[s.Name()]
s := c.Selectors[s.Name()]
bucketCount, nodesInBucket := GetNodesCount(p, s)
nodesInBucket *= int(p.ContainerBackupFactor())
targ := fmt.Sprintf("selector '%s'", s.Name())
require.Equal(t, bucketCount, len(sel), targ)
for _, res := range sel {
@ -216,7 +244,7 @@ func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) {
{
"NotEnoughNodes (backup factor)",
newPlacementPolicy(2, nil,
[]*Selector{newSelector("MyStore", "Country", ClauseDistinct, 1, "FromRU")},
[]*Selector{newSelector("MyStore", "Country", ClauseDistinct, 2, "FromRU")},
[]*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}),
ErrNotEnoughNodes,
},