From ea6b6adfe029815d6f5cf82ba2c68128e5d8a1ed Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 27 Nov 2020 11:46:25 +0300 Subject: [PATCH] [#157] netmap: Fallback to using minimal backup factor Use CBF=1 when strict policy can't be satisfied. Signed-off-by: Evgenii Stratonikov --- pkg/netmap/selector.go | 24 ++++++++++++++++-------- pkg/netmap/selector_test.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/pkg/netmap/selector.go b/pkg/netmap/selector.go index f12027c..ec1aa3d 100644 --- a/pkg/netmap/selector.go +++ b/pkg/netmap/selector.go @@ -36,14 +36,14 @@ func (c *Context) processSelectors(p *PlacementPolicy) error { return nil } -// GetNodesCount returns amount of buckets and nodes in every bucket -// for a given placement policy. -func GetNodesCount(p *PlacementPolicy, s *Selector) (int, int) { +// GetNodesCount returns amount of buckets and minimum number of nodes in every bucket +// for the given selector. +func GetNodesCount(_ *PlacementPolicy, s *Selector) (int, int) { switch s.Clause() { case ClauseSame: - return 1, int(p.ContainerBackupFactor() * s.Count()) + return 1, int(s.Count()) default: - return int(s.Count()), int(p.ContainerBackupFactor()) + return int(s.Count()), 1 } } @@ -69,17 +69,25 @@ func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) } } + maxNodesInBucket := nodesInBucket * int(p.ContainerBackupFactor()) nodes := make([]Nodes, 0, len(buckets)) + fallback := make([]Nodes, 0, len(buckets)) for i := range buckets { ns := buckets[i].nodes - if len(ns) >= nodesInBucket { - nodes = append(nodes, ns[:nodesInBucket]) + if len(ns) >= maxNodesInBucket { + nodes = append(nodes, ns[:maxNodesInBucket]) + } else if len(ns) >= nodesInBucket { + fallback = append(fallback, ns) } } if len(nodes) < bucketCount { - return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name()) + // Fallback to using minimum allowed backup factor (1). + nodes = append(nodes, fallback...) + if len(nodes) < bucketCount { + return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name()) + } } if len(c.pivot) != 0 { diff --git a/pkg/netmap/selector_test.go b/pkg/netmap/selector_test.go index 7f74f21..b1cf378 100644 --- a/pkg/netmap/selector_test.go +++ b/pkg/netmap/selector_test.go @@ -92,6 +92,33 @@ func TestPlacementPolicy_GetPlacementVectors(t *testing.T) { require.Equal(t, len(v.Replicas()[1]), len(ids), "not all nodes we distinct") } +func TestPlacementPolicy_LowerBound(t *testing.T) { + p := newPlacementPolicy( + 2, // backup factor + []*Replica{ + newReplica(1, "X"), + }, + []*Selector{ + newSelector("X", "Country", ClauseSame, 2, "*"), + }, + nil, // filters + ) + + nodes := []NodeInfo{ + nodeInfoFromAttributes("ID", "1", "Country", "DE"), + nodeInfoFromAttributes("ID", "2", "Country", "DE"), + nodeInfoFromAttributes("ID", "3", "Country", "DE"), + } + + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + + require.Equal(t, 3, len(v.Flatten())) +} + func TestPlacementPolicy_ProcessSelectors(t *testing.T) { p := newPlacementPolicy(2, nil, []*Selector{ @@ -128,6 +155,7 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) { sel := c.Selections[s.Name()] s := c.Selectors[s.Name()] bucketCount, nodesInBucket := GetNodesCount(p, s) + nodesInBucket *= int(p.ContainerBackupFactor()) targ := fmt.Sprintf("selector '%s'", s.Name()) require.Equal(t, bucketCount, len(sel), targ) for _, res := range sel { @@ -216,7 +244,7 @@ func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) { { "NotEnoughNodes (backup factor)", newPlacementPolicy(2, nil, - []*Selector{newSelector("MyStore", "Country", ClauseDistinct, 1, "FromRU")}, + []*Selector{newSelector("MyStore", "Country", ClauseDistinct, 2, "FromRU")}, []*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}), ErrNotEnoughNodes, },