diff --git a/pkg/netmap/context.go b/pkg/netmap/context.go index cedb729..29b8167 100644 --- a/pkg/netmap/context.go +++ b/pkg/netmap/context.go @@ -29,6 +29,9 @@ type Context struct { // weightFunc is a weighting function for determining node priority. // By default in combines favours low price and high capacity. weightFunc weightFunc + // container backup factor is a factor for selector counters that expand + // amount of chosen nodes. + cbf uint32 } // Various validation errors. @@ -56,6 +59,7 @@ func NewContext(nm *Netmap) *Context { numCache: make(map[*Filter]uint64), aggregator: newMeanIQRAgg, weightFunc: GetDefaultWeightFunc(nm.Nodes), + cbf: defaultCBF, } } @@ -66,6 +70,14 @@ func (c *Context) setPivot(pivot []byte) { } } +func (c *Context) setCBF(cbf uint32) { + if cbf == 0 { + c.cbf = defaultCBF + } else { + c.cbf = cbf + } +} + // GetDefaultWeightFunc returns default weighting function. func GetDefaultWeightFunc(ns Nodes) weightFunc { mean := newMeanAgg() diff --git a/pkg/netmap/netmap.go b/pkg/netmap/netmap.go index 080dd77..369d9ac 100644 --- a/pkg/netmap/netmap.go +++ b/pkg/netmap/netmap.go @@ -50,10 +50,7 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes, func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) { c := NewContext(m) c.setPivot(pivot) - - if p.ContainerBackupFactor() == 0 { - p.SetContainerBackupFactor(defaultCBF) - } + c.setCBF(p.ContainerBackupFactor()) if err := c.processFilters(p); err != nil { return nil, err diff --git a/pkg/netmap/selector.go b/pkg/netmap/selector.go index ec1aa3d..ff83f1b 100644 --- a/pkg/netmap/selector.go +++ b/pkg/netmap/selector.go @@ -69,7 +69,7 @@ func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) } } - maxNodesInBucket := nodesInBucket * int(p.ContainerBackupFactor()) + maxNodesInBucket := nodesInBucket * int(c.cbf) nodes := make([]Nodes, 0, len(buckets)) fallback := make([]Nodes, 0, len(buckets)) diff --git a/pkg/netmap/selector_test.go b/pkg/netmap/selector_test.go index 81fcd58..135a7b3 100644 --- a/pkg/netmap/selector_test.go +++ b/pkg/netmap/selector_test.go @@ -270,6 +270,7 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) { nm, err := NewNetmap(NodesFromInfo(nodes)) require.NoError(t, err) c := NewContext(nm) + c.setCBF(p.ContainerBackupFactor()) require.NoError(t, c.processFilters(p)) require.NoError(t, c.processSelectors(p)) @@ -277,7 +278,7 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) { sel := c.Selections[s.Name()] s := c.Selectors[s.Name()] bucketCount, nodesInBucket := GetNodesCount(p, s) - nodesInBucket *= int(p.ContainerBackupFactor()) + nodesInBucket *= int(c.cbf) targ := fmt.Sprintf("selector '%s'", s.Name()) require.Equal(t, bucketCount, len(sel), targ) for _, res := range sel { @@ -312,6 +313,7 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) { require.NoError(t, err) c := NewContext(nm) c.setPivot([]byte("containerID")) + c.setCBF(p.ContainerBackupFactor()) c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1)) c.aggregator = func() aggregator { return new(maxAgg) @@ -388,6 +390,7 @@ func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) { nm, err := NewNetmap(NodesFromInfo(nodes)) require.NoError(t, err) c := NewContext(nm) + c.setCBF(tc.p.ContainerBackupFactor()) require.NoError(t, c.processFilters(tc.p)) err = c.processSelectors(tc.p)