diff --git a/pkg/netmap/aggregator.go b/pkg/netmap/aggregator.go index ab66f7f..d1c6061 100644 --- a/pkg/netmap/aggregator.go +++ b/pkg/netmap/aggregator.go @@ -73,12 +73,6 @@ var ( _ normalizer = (*constNorm)(nil) ) -// capWeightFunc calculates weight which is equal to capacity. -func capWeightFunc(n *Node) float64 { return float64(n.Capacity) } - -// priceWeightFunc calculates weight which is equal to price. -func priceWeightFunc(n *Node) float64 { return float64(n.Price) } - // newWeightFunc returns weightFunc which multiplies normalized // capacity and price. func newWeightFunc(capNorm, priceNorm normalizer) weightFunc { @@ -87,12 +81,6 @@ func newWeightFunc(capNorm, priceNorm normalizer) weightFunc { } } -// newMeanSumAgg returns an aggregator which -// computes mean value by keeping total sum. -func newMeanSumAgg() aggregator { - return new(meanSumAgg) -} - // newMeanAgg returns an aggregator which // computes mean value by recalculating it on // every addition. @@ -106,12 +94,6 @@ func newMinAgg() aggregator { return new(minAgg) } -// newMaxAgg returns an aggregator which -// computes max value. -func newMaxAgg() aggregator { - return new(maxAgg) -} - // newMeanIQRAgg returns an aggregator which // computes mean value of values from IQR interval. func newMeanIQRAgg() aggregator { @@ -124,24 +106,12 @@ func newReverseMinNorm(min float64) normalizer { return &reverseMinNorm{min: min} } -// newMaxNorm returns a normalizer which -// normalize values in range of 0.0 to 1.0 to a maximum value. -func newMaxNorm(max float64) normalizer { - return &maxNorm{max: max} -} - // newSigmoidNorm returns a normalizer which // normalize values in range of 0.0 to 1.0 to a scaled sigmoid. func newSigmoidNorm(scale float64) normalizer { return &sigmoidNorm{scale: scale} } -// newConstNorm returns a normalizer which -// returns a constant values -func newConstNorm(value float64) normalizer { - return &constNorm{value: value} -} - func (a *meanSumAgg) Add(n float64) { a.sum += n a.count++ @@ -151,6 +121,7 @@ func (a *meanSumAgg) Compute() float64 { if a.count == 0 { return 0 } + return a.sum / float64(a.count) } @@ -197,22 +168,27 @@ func (a *meanIQRAgg) Compute() float64 { sort.Slice(a.arr, func(i, j int) bool { return a.arr[i] < a.arr[j] }) var min, max float64 - if l < 4 { + + const minLn = 4 + + if l < minLn { min, max = a.arr[0], a.arr[l-1] } else { - start, end := l/4, l*3/4-1 + start, end := l/minLn, l*3/minLn-1 iqr := a.k * (a.arr[end] - a.arr[start]) min, max = a.arr[start]-iqr, a.arr[end]+iqr } count := 0 sum := float64(0) + for _, e := range a.arr { if e >= min && e <= max { sum += e count++ } } + return sum / float64(count) } @@ -220,6 +196,7 @@ func (r *reverseMinNorm) Normalize(w float64) float64 { if w == 0 { return 0 } + return r.min / w } @@ -227,6 +204,7 @@ func (r *maxNorm) Normalize(w float64) float64 { if r.max == 0 { return 0 } + return w / r.max } @@ -234,7 +212,9 @@ func (r *sigmoidNorm) Normalize(w float64) float64 { if r.scale == 0 { return 0 } + x := w / r.scale + return x / (1 + x) } diff --git a/pkg/netmap/clause.go b/pkg/netmap/clause.go index 2e0278b..37ea3a0 100644 --- a/pkg/netmap/clause.go +++ b/pkg/netmap/clause.go @@ -45,7 +45,7 @@ func (c Clause) ToV2() netmap.Clause { func (c Clause) String() string { switch c { default: - return "UNSPECIFIED" + return "CLAUSE_UNSPECIFIED" case ClauseDistinct: return "DISTINCT" case ClauseSame: diff --git a/pkg/netmap/context.go b/pkg/netmap/context.go index 992d2bd..cedb729 100644 --- a/pkg/netmap/context.go +++ b/pkg/netmap/context.go @@ -70,10 +70,12 @@ func (c *Context) setPivot(pivot []byte) { func GetDefaultWeightFunc(ns Nodes) weightFunc { mean := newMeanAgg() min := newMinAgg() + for i := range ns { mean.Add(float64(ns[i].Capacity)) min.Add(float64(ns[i].Price)) } + return newWeightFunc( newSigmoidNorm(mean.Compute()), newReverseMinNorm(min.Compute())) diff --git a/pkg/netmap/filter.go b/pkg/netmap/filter.go index c9b0515..531645e 100644 --- a/pkg/netmap/filter.go +++ b/pkg/netmap/filter.go @@ -26,6 +26,7 @@ func (c *Context) processFilters(p *PlacementPolicy) error { return err } } + return nil } @@ -33,15 +34,19 @@ func (c *Context) processFilter(f *Filter, top bool) error { if f == nil { return fmt.Errorf("%w: FILTER", ErrMissingField) } + if f.Name() == MainFilterName { return fmt.Errorf("%w: '*' is reserved", ErrInvalidFilterName) } + if top && f.Name() == "" { return ErrUnnamedTopFilter } + if !top && f.Name() != "" && c.Filters[f.Name()] == nil { return fmt.Errorf("%w: '%s'", ErrFilterNotFound, f.Name()) } + switch f.Operation() { case OpAND, OpOR: for _, flt := range f.InnerFilters() { @@ -55,6 +60,7 @@ func (c *Context) processFilter(f *Filter, top bool) error { } else if !top && f.Name() != "" { // named reference return nil } + switch f.Operation() { case OpEQ, OpNE: case OpGT, OpGE, OpLT, OpLE: @@ -62,14 +68,17 @@ func (c *Context) processFilter(f *Filter, top bool) error { if err != nil { return fmt.Errorf("%w: '%s'", ErrInvalidNumber, f.Value()) } + c.numCache[f] = n default: return fmt.Errorf("%w: %s", ErrInvalidFilterOp, f.Operation()) } } + if top { c.Filters[f.Name()] = f } + return nil } @@ -83,11 +92,13 @@ func (c *Context) match(f *Filter, b *Node) bool { if lf.Name() != "" { lf = c.Filters[lf.Name()] } + ok := c.match(lf, b) if ok == (f.Operation() == OpOR) { return ok } } + return f.Operation() == OpAND default: return c.matchKeyValue(f, b) @@ -102,6 +113,7 @@ func (c *Context) matchKeyValue(f *Filter, b *Node) bool { return b.Attribute(f.Key()) != f.Value() default: var attr uint64 + switch f.Key() { case PriceAttr: attr = b.Price @@ -109,6 +121,7 @@ func (c *Context) matchKeyValue(f *Filter, b *Node) bool { attr = b.Capacity default: var err error + attr, err = strconv.ParseUint(b.Attribute(f.Key()), 10, 64) if err != nil { // Note: because filters are somewhat independent from nodes attributes, @@ -116,6 +129,7 @@ func (c *Context) matchKeyValue(f *Filter, b *Node) bool { return false } } + switch f.Operation() { case OpGT: return attr > c.numCache[f] diff --git a/pkg/netmap/netmap.go b/pkg/netmap/netmap.go index 3be6092..95e823b 100644 --- a/pkg/netmap/netmap.go +++ b/pkg/netmap/netmap.go @@ -23,6 +23,7 @@ func flattenNodes(ns []Nodes) Nodes { for i := range ns { result = append(result, ns[i]...) } + return result } @@ -31,11 +32,13 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes, h := hrw.Hash(pivot) wf := GetDefaultWeightFunc(m.Nodes) result := make([]Nodes, len(cnt.Replicas())) + for i, rep := range cnt.Replicas() { result[i] = make(Nodes, len(rep)) copy(result[i], rep) hrw.SortSliceByWeightValue(result[i], result[i].Weights(wf), h) } + return result, nil } @@ -45,28 +48,35 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes, func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) { c := NewContext(m) c.setPivot(pivot) + if err := c.processFilters(p); err != nil { return nil, err } + if err := c.processSelectors(p); err != nil { return nil, err } + result := make([]Nodes, len(p.Replicas())) + for i, r := range p.Replicas() { if r == nil { return nil, fmt.Errorf("%w: REPLICA", ErrMissingField) } + if r.Selector() == "" { for _, s := range p.Selectors() { result[i] = append(result[i], flattenNodes(c.Selections[s.Name()])...) } } + nodes, ok := c.Selections[r.Selector()] if !ok { return nil, fmt.Errorf("%w: REPLICA '%s'", ErrSelectorNotFound, r.Selector()) } - result[i] = append(result[i], flattenNodes(nodes)...) + result[i] = append(result[i], flattenNodes(nodes)...) } + return containerNodes(result), nil } diff --git a/pkg/netmap/node_info.go b/pkg/netmap/node_info.go index 80ec7d1..a4debbc 100644 --- a/pkg/netmap/node_info.go +++ b/pkg/netmap/node_info.go @@ -63,6 +63,7 @@ func NodesFromInfo(infos []NodeInfo) Nodes { for i := range infos { nodes[i] = newNodeV2(i, &infos[i]) } + return nodes } @@ -73,6 +74,7 @@ func newNodeV2(index int, ni *NodeInfo) *Node { AttrMap: make(map[string]string, len(ni.Attributes())), NodeInfo: ni, } + for _, attr := range ni.Attributes() { switch attr.Key() { case CapacityAttr: @@ -80,8 +82,10 @@ func newNodeV2(index int, ni *NodeInfo) *Node { case PriceAttr: n.Price, _ = strconv.ParseUint(attr.Value(), 10, 64) } + n.AttrMap[attr.Key()] = attr.Value() } + return n } @@ -91,6 +95,7 @@ func (n Nodes) Weights(wf weightFunc) []float64 { for i := range n { w = append(w, wf(n[i])) } + return w } @@ -104,6 +109,7 @@ func GetBucketWeight(ns Nodes, a aggregator, wf weightFunc) float64 { for i := range ns { a.Add(wf(ns[i])) } + return a.Compute() } @@ -134,7 +140,7 @@ func (s NodeState) ToV2() netmap.NodeState { func (s NodeState) String() string { switch s { default: - return "UNSPECIFIED" + return "STATE_UNSPECIFIED" case NodeStateOffline: return "OFFLINE" case NodeStateOnline: diff --git a/pkg/netmap/operation.go b/pkg/netmap/operation.go index 2a8281c..dd388ff 100644 --- a/pkg/netmap/operation.go +++ b/pkg/netmap/operation.go @@ -86,7 +86,7 @@ func (op Operation) ToV2() netmap.Operation { func (op Operation) String() string { switch op { default: - return "UNSPECIFIED" + return "OPERATION_UNSPECIFIED" case OpNE: return "NE" case OpEQ: diff --git a/pkg/netmap/selector.go b/pkg/netmap/selector.go index c77e062..f12027c 100644 --- a/pkg/netmap/selector.go +++ b/pkg/netmap/selector.go @@ -22,13 +22,17 @@ func (c *Context) processSelectors(p *PlacementPolicy) error { return fmt.Errorf("%w: SELECT FROM '%s'", ErrFilterNotFound, s.Filter()) } } + c.Selectors[s.Name()] = s + result, err := c.getSelection(p, s) if err != nil { return err } + c.Selections[s.Name()] = result } + return nil } @@ -47,6 +51,7 @@ func GetNodesCount(p *PlacementPolicy, s *Selector) (int, int) { func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) { bucketCount, nodesInBucket := GetNodesCount(p, s) buckets := c.getSelectionBase(s) + if len(buckets) < bucketCount { return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name()) } @@ -65,22 +70,27 @@ func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) } nodes := make([]Nodes, 0, len(buckets)) + for i := range buckets { ns := buckets[i].nodes if len(ns) >= nodesInBucket { nodes = append(nodes, ns[:nodesInBucket]) } } + if len(nodes) < bucketCount { return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name()) } + if len(c.pivot) != 0 { weights := make([]float64, len(nodes)) for i := range nodes { weights[i] = GetBucketWeight(nodes[i], c.aggregator(), c.weightFunc) } + hrw.SortSliceByWeightIndex(nodes, weights, c.pivotHash) } + return nodes[:bucketCount], nil } @@ -97,6 +107,7 @@ func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair { result := []nodeAttrPair{} nodeMap := map[string]Nodes{} attr := s.Attribute() + for i := range c.Netmap.Nodes { if isMain || c.match(f, c.Netmap.Nodes[i]) { if attr == "" { @@ -108,6 +119,7 @@ func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair { } } } + if attr != "" { for k, ns := range nodeMap { result = append(result, nodeAttrPair{attr: k, nodes: ns}) @@ -119,6 +131,7 @@ func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair { hrw.SortSliceByWeightValue(result[i].nodes, result[i].nodes.Weights(c.weightFunc), c.pivotHash) } } + return result } diff --git a/pkg/netmap/selector_test.go b/pkg/netmap/selector_test.go index 2c8abee..7f74f21 100644 --- a/pkg/netmap/selector_test.go +++ b/pkg/netmap/selector_test.go @@ -163,7 +163,10 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) { c := NewContext(nm) c.setPivot([]byte("containerID")) c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1)) - c.aggregator = newMaxAgg + c.aggregator = func() aggregator { + return new(maxAgg) + } + require.NoError(t, c.processFilters(p)) require.NoError(t, c.processSelectors(p)) @@ -186,6 +189,10 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) { require.Equal(t, res, cnt) } +func newMaxNorm(max float64) normalizer { + return &maxNorm{max: max} +} + func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) { testCases := []struct { name string