[#249] pkg/netmap: Add CBF field in placement context

If CBF value is not set, then netmap package uses default CBF
value. However it modifies placement policy structure in
`GetContainerNodes()` because policy passed as a pointer.

Instead package can store CBF value in internal context and use
it without policy modification.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-01-26 18:01:02 +03:00 committed by Alex Vanin
parent 98284f0bfa
commit c35e15a758
4 changed files with 18 additions and 6 deletions

View file

@ -29,6 +29,9 @@ type Context struct {
// weightFunc is a weighting function for determining node priority. // weightFunc is a weighting function for determining node priority.
// By default in combines favours low price and high capacity. // By default in combines favours low price and high capacity.
weightFunc weightFunc weightFunc weightFunc
// container backup factor is a factor for selector counters that expand
// amount of chosen nodes.
cbf uint32
} }
// Various validation errors. // Various validation errors.
@ -56,6 +59,7 @@ func NewContext(nm *Netmap) *Context {
numCache: make(map[*Filter]uint64), numCache: make(map[*Filter]uint64),
aggregator: newMeanIQRAgg, aggregator: newMeanIQRAgg,
weightFunc: GetDefaultWeightFunc(nm.Nodes), weightFunc: GetDefaultWeightFunc(nm.Nodes),
cbf: defaultCBF,
} }
} }
@ -66,6 +70,14 @@ func (c *Context) setPivot(pivot []byte) {
} }
} }
func (c *Context) setCBF(cbf uint32) {
if cbf == 0 {
c.cbf = defaultCBF
} else {
c.cbf = cbf
}
}
// GetDefaultWeightFunc returns default weighting function. // GetDefaultWeightFunc returns default weighting function.
func GetDefaultWeightFunc(ns Nodes) weightFunc { func GetDefaultWeightFunc(ns Nodes) weightFunc {
mean := newMeanAgg() mean := newMeanAgg()

View file

@ -50,10 +50,7 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes,
func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) { func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) {
c := NewContext(m) c := NewContext(m)
c.setPivot(pivot) c.setPivot(pivot)
c.setCBF(p.ContainerBackupFactor())
if p.ContainerBackupFactor() == 0 {
p.SetContainerBackupFactor(defaultCBF)
}
if err := c.processFilters(p); err != nil { if err := c.processFilters(p); err != nil {
return nil, err return nil, err

View file

@ -69,7 +69,7 @@ func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error)
} }
} }
maxNodesInBucket := nodesInBucket * int(p.ContainerBackupFactor()) maxNodesInBucket := nodesInBucket * int(c.cbf)
nodes := make([]Nodes, 0, len(buckets)) nodes := make([]Nodes, 0, len(buckets))
fallback := make([]Nodes, 0, len(buckets)) fallback := make([]Nodes, 0, len(buckets))

View file

@ -270,6 +270,7 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
nm, err := NewNetmap(NodesFromInfo(nodes)) nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err) require.NoError(t, err)
c := NewContext(nm) c := NewContext(nm)
c.setCBF(p.ContainerBackupFactor())
require.NoError(t, c.processFilters(p)) require.NoError(t, c.processFilters(p))
require.NoError(t, c.processSelectors(p)) require.NoError(t, c.processSelectors(p))
@ -277,7 +278,7 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
sel := c.Selections[s.Name()] sel := c.Selections[s.Name()]
s := c.Selectors[s.Name()] s := c.Selectors[s.Name()]
bucketCount, nodesInBucket := GetNodesCount(p, s) bucketCount, nodesInBucket := GetNodesCount(p, s)
nodesInBucket *= int(p.ContainerBackupFactor()) nodesInBucket *= int(c.cbf)
targ := fmt.Sprintf("selector '%s'", s.Name()) targ := fmt.Sprintf("selector '%s'", s.Name())
require.Equal(t, bucketCount, len(sel), targ) require.Equal(t, bucketCount, len(sel), targ)
for _, res := range sel { for _, res := range sel {
@ -312,6 +313,7 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
c := NewContext(nm) c := NewContext(nm)
c.setPivot([]byte("containerID")) c.setPivot([]byte("containerID"))
c.setCBF(p.ContainerBackupFactor())
c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1)) c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1))
c.aggregator = func() aggregator { c.aggregator = func() aggregator {
return new(maxAgg) return new(maxAgg)
@ -388,6 +390,7 @@ func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) {
nm, err := NewNetmap(NodesFromInfo(nodes)) nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err) require.NoError(t, err)
c := NewContext(nm) c := NewContext(nm)
c.setCBF(tc.p.ContainerBackupFactor())
require.NoError(t, c.processFilters(tc.p)) require.NoError(t, c.processFilters(tc.p))
err = c.processSelectors(tc.p) err = c.processSelectors(tc.p)