[#199] sdk/netmap: Correct linter's remarks

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-11-16 17:52:15 +03:00 committed by Alex Vanin
parent 459d295788
commit 3a966ee5df
9 changed files with 69 additions and 37 deletions

View file

@ -73,12 +73,6 @@ var (
_ normalizer = (*constNorm)(nil)
)
// capWeightFunc returns the node's capacity, converted to float64,
// as its weight.
func capWeightFunc(n *Node) float64 {
	return float64(n.Capacity)
}
// priceWeightFunc returns the node's price, converted to float64,
// as its weight.
func priceWeightFunc(n *Node) float64 {
	return float64(n.Price)
}
// newWeightFunc returns weightFunc which multiplies normalized
// capacity and price.
func newWeightFunc(capNorm, priceNorm normalizer) weightFunc {
@ -87,12 +81,6 @@ func newWeightFunc(capNorm, priceNorm normalizer) weightFunc {
}
}
// newMeanSumAgg returns an aggregator that computes
// the mean value by maintaining a running total sum.
func newMeanSumAgg() aggregator {
	return &meanSumAgg{}
}
// newMeanAgg returns an aggregator which
// computes mean value by recalculating it on
// every addition.
@ -106,12 +94,6 @@ func newMinAgg() aggregator {
return new(minAgg)
}
// newMaxAgg returns an aggregator that tracks
// the maximum of the added values.
func newMaxAgg() aggregator {
	return &maxAgg{}
}
// newMeanIQRAgg returns an aggregator which
// computes mean value of values from IQR interval.
func newMeanIQRAgg() aggregator {
@ -124,24 +106,12 @@ func newReverseMinNorm(min float64) normalizer {
return &reverseMinNorm{min: min}
}
// newMaxNorm returns a normalizer that maps values into
// the range from 0.0 to 1.0 relative to the given maximum.
func newMaxNorm(max float64) normalizer {
	norm := maxNorm{max: max}
	return &norm
}
// newSigmoidNorm returns a normalizer that maps values into
// the range from 0.0 to 1.0 using a sigmoid with the given scale.
func newSigmoidNorm(scale float64) normalizer {
	norm := sigmoidNorm{scale: scale}
	return &norm
}
// newConstNorm returns a normalizer that always
// yields the given constant value.
func newConstNorm(value float64) normalizer {
	norm := constNorm{value: value}
	return &norm
}
func (a *meanSumAgg) Add(n float64) {
a.sum += n
a.count++
@ -151,6 +121,7 @@ func (a *meanSumAgg) Compute() float64 {
if a.count == 0 {
return 0
}
return a.sum / float64(a.count)
}
@ -197,22 +168,27 @@ func (a *meanIQRAgg) Compute() float64 {
sort.Slice(a.arr, func(i, j int) bool { return a.arr[i] < a.arr[j] })
var min, max float64
if l < 4 {
const minLn = 4
if l < minLn {
min, max = a.arr[0], a.arr[l-1]
} else {
start, end := l/4, l*3/4-1
start, end := l/minLn, l*3/minLn-1
iqr := a.k * (a.arr[end] - a.arr[start])
min, max = a.arr[start]-iqr, a.arr[end]+iqr
}
count := 0
sum := float64(0)
for _, e := range a.arr {
if e >= min && e <= max {
sum += e
count++
}
}
return sum / float64(count)
}
@ -220,6 +196,7 @@ func (r *reverseMinNorm) Normalize(w float64) float64 {
if w == 0 {
return 0
}
return r.min / w
}
@ -227,6 +204,7 @@ func (r *maxNorm) Normalize(w float64) float64 {
if r.max == 0 {
return 0
}
return w / r.max
}
@ -234,7 +212,9 @@ func (r *sigmoidNorm) Normalize(w float64) float64 {
if r.scale == 0 {
return 0
}
x := w / r.scale
return x / (1 + x)
}

View file

@ -45,7 +45,7 @@ func (c Clause) ToV2() netmap.Clause {
func (c Clause) String() string {
switch c {
default:
return "UNSPECIFIED"
return "CLAUSE_UNSPECIFIED"
case ClauseDistinct:
return "DISTINCT"
case ClauseSame:

View file

@ -70,10 +70,12 @@ func (c *Context) setPivot(pivot []byte) {
func GetDefaultWeightFunc(ns Nodes) weightFunc {
mean := newMeanAgg()
min := newMinAgg()
for i := range ns {
mean.Add(float64(ns[i].Capacity))
min.Add(float64(ns[i].Price))
}
return newWeightFunc(
newSigmoidNorm(mean.Compute()),
newReverseMinNorm(min.Compute()))

View file

@ -26,6 +26,7 @@ func (c *Context) processFilters(p *PlacementPolicy) error {
return err
}
}
return nil
}
@ -33,15 +34,19 @@ func (c *Context) processFilter(f *Filter, top bool) error {
if f == nil {
return fmt.Errorf("%w: FILTER", ErrMissingField)
}
if f.Name() == MainFilterName {
return fmt.Errorf("%w: '*' is reserved", ErrInvalidFilterName)
}
if top && f.Name() == "" {
return ErrUnnamedTopFilter
}
if !top && f.Name() != "" && c.Filters[f.Name()] == nil {
return fmt.Errorf("%w: '%s'", ErrFilterNotFound, f.Name())
}
switch f.Operation() {
case OpAND, OpOR:
for _, flt := range f.InnerFilters() {
@ -55,6 +60,7 @@ func (c *Context) processFilter(f *Filter, top bool) error {
} else if !top && f.Name() != "" { // named reference
return nil
}
switch f.Operation() {
case OpEQ, OpNE:
case OpGT, OpGE, OpLT, OpLE:
@ -62,14 +68,17 @@ func (c *Context) processFilter(f *Filter, top bool) error {
if err != nil {
return fmt.Errorf("%w: '%s'", ErrInvalidNumber, f.Value())
}
c.numCache[f] = n
default:
return fmt.Errorf("%w: %s", ErrInvalidFilterOp, f.Operation())
}
}
if top {
c.Filters[f.Name()] = f
}
return nil
}
@ -83,11 +92,13 @@ func (c *Context) match(f *Filter, b *Node) bool {
if lf.Name() != "" {
lf = c.Filters[lf.Name()]
}
ok := c.match(lf, b)
if ok == (f.Operation() == OpOR) {
return ok
}
}
return f.Operation() == OpAND
default:
return c.matchKeyValue(f, b)
@ -102,6 +113,7 @@ func (c *Context) matchKeyValue(f *Filter, b *Node) bool {
return b.Attribute(f.Key()) != f.Value()
default:
var attr uint64
switch f.Key() {
case PriceAttr:
attr = b.Price
@ -109,6 +121,7 @@ func (c *Context) matchKeyValue(f *Filter, b *Node) bool {
attr = b.Capacity
default:
var err error
attr, err = strconv.ParseUint(b.Attribute(f.Key()), 10, 64)
if err != nil {
// Note: because filters are somewhat independent from nodes attributes,
@ -116,6 +129,7 @@ func (c *Context) matchKeyValue(f *Filter, b *Node) bool {
return false
}
}
switch f.Operation() {
case OpGT:
return attr > c.numCache[f]

View file

@ -23,6 +23,7 @@ func flattenNodes(ns []Nodes) Nodes {
for i := range ns {
result = append(result, ns[i]...)
}
return result
}
@ -31,11 +32,13 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes,
h := hrw.Hash(pivot)
wf := GetDefaultWeightFunc(m.Nodes)
result := make([]Nodes, len(cnt.Replicas()))
for i, rep := range cnt.Replicas() {
result[i] = make(Nodes, len(rep))
copy(result[i], rep)
hrw.SortSliceByWeightValue(result[i], result[i].Weights(wf), h)
}
return result, nil
}
@ -45,28 +48,35 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes,
func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) {
c := NewContext(m)
c.setPivot(pivot)
if err := c.processFilters(p); err != nil {
return nil, err
}
if err := c.processSelectors(p); err != nil {
return nil, err
}
result := make([]Nodes, len(p.Replicas()))
for i, r := range p.Replicas() {
if r == nil {
return nil, fmt.Errorf("%w: REPLICA", ErrMissingField)
}
if r.Selector() == "" {
for _, s := range p.Selectors() {
result[i] = append(result[i], flattenNodes(c.Selections[s.Name()])...)
}
}
nodes, ok := c.Selections[r.Selector()]
if !ok {
return nil, fmt.Errorf("%w: REPLICA '%s'", ErrSelectorNotFound, r.Selector())
}
result[i] = append(result[i], flattenNodes(nodes)...)
result[i] = append(result[i], flattenNodes(nodes)...)
}
return containerNodes(result), nil
}

View file

@ -63,6 +63,7 @@ func NodesFromInfo(infos []NodeInfo) Nodes {
for i := range infos {
nodes[i] = newNodeV2(i, &infos[i])
}
return nodes
}
@ -73,6 +74,7 @@ func newNodeV2(index int, ni *NodeInfo) *Node {
AttrMap: make(map[string]string, len(ni.Attributes())),
NodeInfo: ni,
}
for _, attr := range ni.Attributes() {
switch attr.Key() {
case CapacityAttr:
@ -80,8 +82,10 @@ func newNodeV2(index int, ni *NodeInfo) *Node {
case PriceAttr:
n.Price, _ = strconv.ParseUint(attr.Value(), 10, 64)
}
n.AttrMap[attr.Key()] = attr.Value()
}
return n
}
@ -91,6 +95,7 @@ func (n Nodes) Weights(wf weightFunc) []float64 {
for i := range n {
w = append(w, wf(n[i]))
}
return w
}
@ -104,6 +109,7 @@ func GetBucketWeight(ns Nodes, a aggregator, wf weightFunc) float64 {
for i := range ns {
a.Add(wf(ns[i]))
}
return a.Compute()
}
@ -134,7 +140,7 @@ func (s NodeState) ToV2() netmap.NodeState {
func (s NodeState) String() string {
switch s {
default:
return "UNSPECIFIED"
return "STATE_UNSPECIFIED"
case NodeStateOffline:
return "OFFLINE"
case NodeStateOnline:

View file

@ -86,7 +86,7 @@ func (op Operation) ToV2() netmap.Operation {
func (op Operation) String() string {
switch op {
default:
return "UNSPECIFIED"
return "OPERATION_UNSPECIFIED"
case OpNE:
return "NE"
case OpEQ:

View file

@ -22,13 +22,17 @@ func (c *Context) processSelectors(p *PlacementPolicy) error {
return fmt.Errorf("%w: SELECT FROM '%s'", ErrFilterNotFound, s.Filter())
}
}
c.Selectors[s.Name()] = s
result, err := c.getSelection(p, s)
if err != nil {
return err
}
c.Selections[s.Name()] = result
}
return nil
}
@ -47,6 +51,7 @@ func GetNodesCount(p *PlacementPolicy, s *Selector) (int, int) {
func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) {
bucketCount, nodesInBucket := GetNodesCount(p, s)
buckets := c.getSelectionBase(s)
if len(buckets) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
}
@ -65,22 +70,27 @@ func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error)
}
nodes := make([]Nodes, 0, len(buckets))
for i := range buckets {
ns := buckets[i].nodes
if len(ns) >= nodesInBucket {
nodes = append(nodes, ns[:nodesInBucket])
}
}
if len(nodes) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
}
if len(c.pivot) != 0 {
weights := make([]float64, len(nodes))
for i := range nodes {
weights[i] = GetBucketWeight(nodes[i], c.aggregator(), c.weightFunc)
}
hrw.SortSliceByWeightIndex(nodes, weights, c.pivotHash)
}
return nodes[:bucketCount], nil
}
@ -97,6 +107,7 @@ func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair {
result := []nodeAttrPair{}
nodeMap := map[string]Nodes{}
attr := s.Attribute()
for i := range c.Netmap.Nodes {
if isMain || c.match(f, c.Netmap.Nodes[i]) {
if attr == "" {
@ -108,6 +119,7 @@ func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair {
}
}
}
if attr != "" {
for k, ns := range nodeMap {
result = append(result, nodeAttrPair{attr: k, nodes: ns})
@ -119,6 +131,7 @@ func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair {
hrw.SortSliceByWeightValue(result[i].nodes, result[i].nodes.Weights(c.weightFunc), c.pivotHash)
}
}
return result
}

View file

@ -163,7 +163,10 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) {
c := NewContext(nm)
c.setPivot([]byte("containerID"))
c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1))
c.aggregator = newMaxAgg
c.aggregator = func() aggregator {
return new(maxAgg)
}
require.NoError(t, c.processFilters(p))
require.NoError(t, c.processSelectors(p))
@ -186,6 +189,10 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) {
require.Equal(t, res, cnt)
}
// newMaxNorm returns a normalizer scaling values
// relative to the given maximum (test helper).
func newMaxNorm(max float64) normalizer {
	norm := maxNorm{max: max}
	return &norm
}
func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) {
testCases := []struct {
name string