From 20264737336cc25c5177215fa8cb61d1e7602d08 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Sat, 5 Sep 2020 13:37:47 +0300 Subject: [PATCH] [#137] sdk: Implement netmap filtering and selection Signed-off-by: Evgenii Stratonikov --- go.mod | 1 + go.sum | 3 + pkg/netmap/aggregator.go | 243 ++++++++++++++++++++++++++++++++++++ pkg/netmap/container.go | 19 +++ pkg/netmap/context.go | 81 ++++++++++++ pkg/netmap/doc.go | 11 ++ pkg/netmap/filter.go | 129 +++++++++++++++++++ pkg/netmap/filter_test.go | 203 ++++++++++++++++++++++++++++++ pkg/netmap/helper_test.go | 61 +++++++++ pkg/netmap/netmap.go | 73 +++++++++++ pkg/netmap/node_info.go | 89 +++++++++++++ pkg/netmap/selector.go | 98 +++++++++++++++ pkg/netmap/selector_test.go | 218 ++++++++++++++++++++++++++++++++ 13 files changed, 1229 insertions(+) create mode 100644 pkg/netmap/aggregator.go create mode 100644 pkg/netmap/container.go create mode 100644 pkg/netmap/context.go create mode 100644 pkg/netmap/doc.go create mode 100644 pkg/netmap/filter.go create mode 100644 pkg/netmap/filter_test.go create mode 100644 pkg/netmap/helper_test.go create mode 100644 pkg/netmap/netmap.go create mode 100644 pkg/netmap/node_info.go create mode 100644 pkg/netmap/selector.go create mode 100644 pkg/netmap/selector_test.go diff --git a/go.mod b/go.mod index 372a59c..d9b42dd 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/golang/protobuf v1.4.2 github.com/google/uuid v1.1.1 github.com/mr-tron/base58 v1.1.2 + github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.91.0 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 8f9905b..565fd48 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,8 @@ github.com/nspcc-dev/dbft v0.0.0-20191209120240-0d6b7568d9ae/go.mod h1:3FjXOoHmA github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk= github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ= github.com/nspcc-dev/dbft v0.0.0-20200711144034-c526ccc6f570/go.mod h1:1FYQXSbb6/9HQIkoF8XO7W/S8N7AZRkBsgwbcXRvk0E= +github.com/nspcc-dev/hrw v1.0.9 h1:17VcAuTtrstmFppBjfRiia4K2wA/ukXZhLFS8Y8rz5Y= +github.com/nspcc-dev/hrw v1.0.9/go.mod h1:l/W2vx83vMQo6aStyx2AuZrJ+07lGv2JQGlVkPG06MU= github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1:pPYwPZ2ks+uMnlRLUyXOpLieaDQSEaf4NM3zHVbRjmg= github.com/nspcc-dev/neo-go v0.91.0 h1:KKOPMKs0fm8JIau1SuwxiLdrZ+1kDPBiVRlWwzfebWE= github.com/nspcc-dev/neo-go v0.91.0/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc= @@ -175,6 +177,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= diff --git a/pkg/netmap/aggregator.go b/pkg/netmap/aggregator.go new file mode 100644 index 0000000..ab66f7f --- /dev/null +++ b/pkg/netmap/aggregator.go @@ -0,0 +1,243 @@ 
+package netmap
+
+import (
+	"sort"
+)
+
+type (
+	// aggregator can calculate some value across the whole netmap,
+	// such as a median, minimum or maximum.
+	aggregator interface {
+		Add(float64)
+		Compute() float64
+	}
+
+	// normalizer normalizes weight.
+	normalizer interface {
+		Normalize(w float64) float64
+	}
+
+	meanSumAgg struct {
+		sum   float64
+		count int
+	}
+
+	meanAgg struct {
+		mean  float64
+		count int
+	}
+
+	minAgg struct {
+		min float64
+	}
+
+	maxAgg struct {
+		max float64
+	}
+
+	meanIQRAgg struct {
+		k   float64
+		arr []float64
+	}
+
+	reverseMinNorm struct {
+		min float64
+	}
+
+	maxNorm struct {
+		max float64
+	}
+
+	sigmoidNorm struct {
+		scale float64
+	}
+
+	constNorm struct {
+		value float64
+	}
+
+	// weightFunc calculates n's weight.
+	weightFunc = func(n *Node) float64
+)
+
+var (
+	_ aggregator = (*meanSumAgg)(nil)
+	_ aggregator = (*meanAgg)(nil)
+	_ aggregator = (*minAgg)(nil)
+	_ aggregator = (*maxAgg)(nil)
+	_ aggregator = (*meanIQRAgg)(nil)
+
+	_ normalizer = (*reverseMinNorm)(nil)
+	_ normalizer = (*maxNorm)(nil)
+	_ normalizer = (*sigmoidNorm)(nil)
+	_ normalizer = (*constNorm)(nil)
+)
+
+// capWeightFunc calculates the weight of a node as its capacity.
+func capWeightFunc(n *Node) float64 { return float64(n.Capacity) }
+
+// priceWeightFunc calculates the weight of a node as its price.
+func priceWeightFunc(n *Node) float64 { return float64(n.Price) }
+
+// newWeightFunc returns a weightFunc which multiplies normalized
+// capacity and price.
+func newWeightFunc(capNorm, priceNorm normalizer) weightFunc {
+	return func(n *Node) float64 {
+		return capNorm.Normalize(float64(n.Capacity)) * priceNorm.Normalize(float64(n.Price))
+	}
+}
+
+// newMeanSumAgg returns an aggregator which
+// computes the mean value by keeping the total sum.
+func newMeanSumAgg() aggregator {
+	return new(meanSumAgg)
+}
+
+// newMeanAgg returns an aggregator which
+// computes the mean value by recalculating it on
+// every addition.
+func newMeanAgg() aggregator {
+	return new(meanAgg)
+}
+
+// newMinAgg returns an aggregator which
+// computes the min value.
+func newMinAgg() aggregator {
+	return new(minAgg)
+}
+
+// newMaxAgg returns an aggregator which
+// computes the max value.
+func newMaxAgg() aggregator {
+	return new(maxAgg)
+}
+
+// newMeanIQRAgg returns an aggregator which
+// computes the mean of values within the IQR interval.
+func newMeanIQRAgg() aggregator {
+	return new(meanIQRAgg)
+}
+
+// newReverseMinNorm returns a normalizer which divides the
+// minimum value by the argument, so the minimum maps to 1.0.
+func newReverseMinNorm(min float64) normalizer {
+	return &reverseMinNorm{min: min}
+}
+
+// newMaxNorm returns a normalizer which divides the argument
+// by the maximum value, mapping values into the [0.0, 1.0] range.
+func newMaxNorm(max float64) normalizer {
+	return &maxNorm{max: max}
+}
+
+// newSigmoidNorm returns a normalizer which maps values into the
+// (0.0, 1.0) range with the scaled sigmoid x/(1+x), where x = value/scale.
+func newSigmoidNorm(scale float64) normalizer {
+	return &sigmoidNorm{scale: scale}
+}
+
+// newConstNorm returns a normalizer which
+// always returns a constant value.
+func newConstNorm(value float64) normalizer {
+	return &constNorm{value: value}
+}
+
+func (a *meanSumAgg) Add(n float64) {
+	a.sum += n
+	a.count++
+}
+
+func (a *meanSumAgg) Compute() float64 {
+	if a.count == 0 {
+		return 0
+	}
+	return a.sum / float64(a.count)
+}
+
+func (a *meanAgg) Add(n float64) {
+	c := a.count + 1
+	a.mean = a.mean*(float64(a.count)/float64(c)) + n/float64(c)
+	a.count++
+}
+
+func (a *meanAgg) Compute() float64 {
+	return a.mean
+}
+
+func (a *minAgg) Add(n float64) {
+	if a.min == 0 || n < a.min {
+		a.min = n
+	}
+}
+
+func (a *minAgg) Compute() float64 {
+	return a.min
+}
+
+func (a *maxAgg) Add(n float64) {
+	if n > a.max {
+		a.max = n
+	}
+}
+
+func (a *maxAgg) Compute() float64 {
+	return a.max
+}
+
+func (a *meanIQRAgg) Add(n float64) {
+	a.arr = append(a.arr, n)
+}
+
+func (a *meanIQRAgg) Compute() float64 {
+	l := len(a.arr)
+	if l == 0 {
+		return 0
+	}
+
+	sort.Slice(a.arr, func(i, j int) bool { return a.arr[i] < a.arr[j] })
+
+	var min, max float64
+	if l < 4 {
+		min, max = a.arr[0], a.arr[l-1]
+	} else {
+		start, end := l/4, l*3/4-1
+		iqr := a.k * (a.arr[end] - a.arr[start])
+		min, max = a.arr[start]-iqr, a.arr[end]+iqr
+	}
+
+	count := 0
+	sum := float64(0)
+	for _, e := range a.arr {
+		if e >= min && e <= max {
+			sum += e
+			count++
+		}
+	}
+	return sum / float64(count)
+}
+
+func (r *reverseMinNorm) Normalize(w float64) float64 {
+	if w == 0 {
+		return 0
+	}
+	return r.min / w
+}
+
+func (r *maxNorm) Normalize(w float64) float64 {
+	if r.max == 0 {
+		return 0
+	}
+	return w / r.max
+}
+
+func (r *sigmoidNorm) Normalize(w float64) float64 {
+	if r.scale == 0 {
+		return 0
+	}
+	x := w / r.scale
+	return x / (1 + x)
+}
+
+func (r *constNorm) Normalize(_ float64) float64 {
+	return r.value
+}
diff --git a/pkg/netmap/container.go b/pkg/netmap/container.go
new file mode 100644
index 0000000..00875a9
--- /dev/null
+++ b/pkg/netmap/container.go
@@ -0,0 +1,19 @@
+package netmap
+
+// ContainerNodes represents nodes in the container.
+type ContainerNodes interface {
+	Replicas() []Nodes
+	Flatten() Nodes
+}
+
+type containerNodes []Nodes
+
+// Flatten returns the list of all nodes from the container.
+func (c containerNodes) Flatten() Nodes {
+	return flattenNodes(c)
+}
+
+// Replicas returns the list of container replicas.
+func (c containerNodes) Replicas() []Nodes {
+	return c
+}
diff --git a/pkg/netmap/context.go b/pkg/netmap/context.go
new file mode 100644
index 0000000..9de49a9
--- /dev/null
+++ b/pkg/netmap/context.go
@@ -0,0 +1,81 @@
+package netmap
+
+import (
+	"errors"
+
+	"github.com/nspcc-dev/hrw"
+	"github.com/nspcc-dev/neofs-api-go/v2/netmap"
+)
+
+// Context contains references to named filters and cached numeric values.
+type Context struct {
+	// Netmap is a netmap structure to operate on.
+	Netmap *Netmap
+	// Filters stores processed filters.
+	Filters map[string]*netmap.Filter
+	// Selectors stores processed selectors.
+	Selectors map[string]*netmap.Selector
+	// Selections stores the results of selector processing.
+	Selections map[string][]Nodes
+
+	// numCache stores parsed numeric values.
+	numCache map[*netmap.Filter]uint64
+	// pivot is a seed for HRW.
+	pivot []byte
+	// pivotHash is a saved HRW hash of pivot.
+	pivotHash uint64
+	// aggregator returns an aggregator determining bucket weight.
+	// By default it returns the mean of values within the IQR interval.
+	aggregator func() aggregator
+	// weightFunc is a weighting function for determining node priority.
+	// By default it favours low price and high capacity.
+	weightFunc weightFunc
+}
+
+// Various validation errors.
+var (
+	ErrMissingField      = errors.New("netmap: nil field")
+	ErrInvalidFilterName = errors.New("netmap: filter name is invalid")
+	ErrInvalidNumber     = errors.New("netmap: number value expected")
+	ErrInvalidFilterOp   = errors.New("netmap: invalid filter operation")
+	ErrFilterNotFound    = errors.New("netmap: filter not found")
+	ErrNonEmptyFilters   = errors.New("netmap: simple filter must not contain sub-filters")
+	ErrNotEnoughNodes    = errors.New("netmap: not enough nodes to SELECT from")
+	ErrSelectorNotFound  = errors.New("netmap: selector not found")
+	ErrUnnamedTopFilter  = errors.New("netmap: all filters on top level must be named")
+)
+
+// NewContext creates a new context. It contains various caches.
+// In the future it may create a hierarchical netmap structure to work with.
+func NewContext(nm *Netmap) *Context {
+	return &Context{
+		Netmap:     nm,
+		Filters:    make(map[string]*netmap.Filter),
+		Selectors:  make(map[string]*netmap.Selector),
+		Selections: make(map[string][]Nodes),
+
+		numCache:   make(map[*netmap.Filter]uint64),
+		aggregator: newMeanIQRAgg,
+		weightFunc: GetDefaultWeightFunc(nm.Nodes),
+	}
+}
+
+func (c *Context) setPivot(pivot []byte) {
+	if len(pivot) != 0 {
+		c.pivot = pivot
+		c.pivotHash = hrw.Hash(pivot)
+	}
+}
+
+// GetDefaultWeightFunc returns the default weighting function.
+func GetDefaultWeightFunc(ns Nodes) weightFunc {
+	mean := newMeanAgg()
+	min := newMinAgg()
+	for i := range ns {
+		mean.Add(float64(ns[i].Capacity))
+		min.Add(float64(ns[i].Price))
+	}
+	return newWeightFunc(
+		newSigmoidNorm(mean.Compute()),
+		newReverseMinNorm(min.Compute()))
+}
diff --git a/pkg/netmap/doc.go b/pkg/netmap/doc.go
new file mode 100644
index 0000000..a38e985
--- /dev/null
+++ b/pkg/netmap/doc.go
@@ -0,0 +1,11 @@
+/*
+Package netmap provides routines for working with netmap and placement policy.
+Work is done in 4 steps:
+1. Create a context containing results shared between steps.
+2. Process filters.
+3. Process selectors.
+4. Process replicas.
+
+Each step depends only on the previous ones.
+*/
+package netmap
diff --git a/pkg/netmap/filter.go b/pkg/netmap/filter.go
new file mode 100644
index 0000000..e4df979
--- /dev/null
+++ b/pkg/netmap/filter.go
@@ -0,0 +1,129 @@
+package netmap
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/nspcc-dev/neofs-api-go/v2/netmap"
+)
+
+// MainFilterName is the name of the filter
+// which points to the whole netmap.
+const MainFilterName = "*"
+
+// applyFilter applies the named filter to b.
+func (c *Context) applyFilter(name string, b *Node) bool {
+	return name == MainFilterName || c.match(c.Filters[name], b)
+}
+
+// processFilters processes filters and returns an error if any of them is invalid.
+func (c *Context) processFilters(p *netmap.PlacementPolicy) error {
+	for _, f := range p.GetFilters() {
+		if err := c.processFilter(f, true); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c *Context) processFilter(f *netmap.Filter, top bool) error {
+	if f == nil {
+		return fmt.Errorf("%w: FILTER", ErrMissingField)
+	}
+	if f.GetName() == MainFilterName {
+		return fmt.Errorf("%w: '*' is reserved", ErrInvalidFilterName)
+	}
+	if top && f.GetName() == "" {
+		return ErrUnnamedTopFilter
+	}
+	if !top && f.GetName() != "" && c.Filters[f.GetName()] == nil {
+		return fmt.Errorf("%w: '%s'", ErrFilterNotFound, f.GetName())
+	}
+	switch f.GetOp() {
+	case netmap.AND, netmap.OR:
+		for _, flt := range f.GetFilters() {
+			if err := c.processFilter(flt, false); err != nil {
+				return err
+			}
+		}
+	default:
+		if len(f.GetFilters()) != 0 {
+			return ErrNonEmptyFilters
+		} else if !top && f.GetName() != "" { // named reference
+			return nil
+		}
+		switch f.GetOp() {
+		case netmap.EQ, netmap.NE:
+		case netmap.GT, netmap.GE, netmap.LT, netmap.LE:
+			n, err := strconv.ParseUint(f.GetValue(), 10, 64)
+			if err != nil {
+				return fmt.Errorf("%w: '%s'", ErrInvalidNumber, f.GetValue())
+			}
+			c.numCache[f] = n
+		default:
+			return fmt.Errorf("%w: %d", ErrInvalidFilterOp, f.GetOp())
+		}
+	}
+	if top {
+		c.Filters[f.GetName()] = f
+	}
+	return nil
+}
+
+// match matches f against b. It returns no errors because
+// the filter has already been parsed during context creation,
+// and missing node properties are considered a regular mismatch.
+func (c *Context) match(f *netmap.Filter, b *Node) bool {
+	switch f.GetOp() {
+	case netmap.AND, netmap.OR:
+		for _, lf := range f.GetFilters() {
+			if lf.GetName() != "" {
+				lf = c.Filters[lf.GetName()]
+			}
+			ok := c.match(lf, b)
+			if ok == (f.GetOp() == netmap.OR) {
+				return ok
+			}
+		}
+		return f.GetOp() == netmap.AND
+	default:
+		return c.matchKeyValue(f, b)
+	}
+}
+
+func (c *Context) matchKeyValue(f *netmap.Filter, b *Node) bool {
+	switch f.GetOp() {
+	case netmap.EQ:
+		return b.Attribute(f.GetKey()) == f.GetValue()
+	case netmap.NE:
+		return b.Attribute(f.GetKey()) != f.GetValue()
+	default:
+		var attr uint64
+		switch f.GetKey() {
+		case PriceAttr:
+			attr = b.Price
+		case CapacityAttr:
+			attr = b.Capacity
+		default:
+			var err error
+			attr, err = strconv.ParseUint(b.Attribute(f.GetKey()), 10, 64)
+			if err != nil {
+				// Note: because filters are somewhat independent from node attributes,
+				// we don't report an error here and fail the filter instead.
+				return false
+			}
+		}
+		switch f.GetOp() {
+		case netmap.GT:
+			return attr > c.numCache[f]
+		case netmap.GE:
+			return attr >= c.numCache[f]
+		case netmap.LT:
+			return attr < c.numCache[f]
+		case netmap.LE:
+			return attr <= c.numCache[f]
+		}
+	}
+	// will not happen if context was created from f (maybe panic?)
+ return false +} diff --git a/pkg/netmap/filter_test.go b/pkg/netmap/filter_test.go new file mode 100644 index 0000000..82a5445 --- /dev/null +++ b/pkg/netmap/filter_test.go @@ -0,0 +1,203 @@ +package netmap + +import ( + "errors" + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + "github.com/stretchr/testify/require" +) + +func TestContext_ProcessFilters(t *testing.T) { + fs := []*netmap.Filter{ + newFilter("StorageSSD", "Storage", "SSD", netmap.EQ), + newFilter("GoodRating", "Rating", "4", netmap.GE), + newFilter("Main", "", "", netmap.AND, + newFilter("StorageSSD", "", "", 0), + newFilter("", "IntField", "123", netmap.LT), + newFilter("GoodRating", "", "", 0)), + } + nm, err := NewNetmap(nil) + require.NoError(t, err) + c := NewContext(nm) + p := newPlacementPolicy(1, nil, nil, fs) + require.NoError(t, c.processFilters(p)) + require.Equal(t, 3, len(c.Filters)) + for _, f := range fs { + require.Equal(t, f, c.Filters[f.GetName()]) + } + + require.Equal(t, uint64(4), c.numCache[fs[1]]) + require.Equal(t, uint64(123), c.numCache[fs[2].GetFilters()[1]]) +} + +func TestContext_ProcessFiltersInvalid(t *testing.T) { + errTestCases := []struct { + name string + filter *netmap.Filter + err error + }{ + { + "UnnamedTop", + newFilter("", "Storage", "SSD", netmap.EQ), + ErrUnnamedTopFilter, + }, + { + "InvalidReference", + newFilter("Main", "", "", netmap.AND, + newFilter("StorageSSD", "", "", 0)), + ErrFilterNotFound, + }, + { + "NonEmptyKeyed", + newFilter("Main", "Storage", "SSD", netmap.EQ, + newFilter("StorageSSD", "", "", 0)), + ErrNonEmptyFilters, + }, + { + "InvalidNumber", + newFilter("Main", "Rating", "three", netmap.GE), + ErrInvalidNumber, + }, + { + "InvalidOp", + newFilter("Main", "Rating", "3", netmap.UnspecifiedOperation), + ErrInvalidFilterOp, + }, + { + "InvalidName", + newFilter("*", "Rating", "3", netmap.GE), + ErrInvalidFilterName, + }, + { + "MissingFilter", + nil, + ErrMissingField, + }, + } + for _, tc := range errTestCases { + t.Run(tc.name, func(t *testing.T) { + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{tc.filter}) + err := c.processFilters(p) + require.True(t, errors.Is(err, tc.err), "got: %v", err) + }) + } +} + +func TestFilter_MatchSimple(t *testing.T) { + b := &Node{AttrMap: map[string]string{ + "Rating": "4", + "Country": "Germany", + }} + testCases := []struct { + name string + ok bool + f *netmap.Filter + }{ + { + "GE_true", true, + newFilter("Main", "Rating", "4", netmap.GE), + }, + { + "GE_false", false, + newFilter("Main", "Rating", "5", netmap.GE), + }, + { + "GT_true", true, + newFilter("Main", "Rating", "3", netmap.GT), + }, + { + "GT_false", false, + newFilter("Main", "Rating", "4", netmap.GT), + }, + { + "LE_true", true, + newFilter("Main", "Rating", "4", netmap.LE), + }, + { + "LE_false", false, + newFilter("Main", "Rating", "3", netmap.LE), + }, + { + "LT_true", true, + newFilter("Main", "Rating", "5", netmap.LT), + }, + { + "LT_false", false, + newFilter("Main", "Rating", "4", netmap.LT), + }, + { + "EQ_true", true, + newFilter("Main", "Country", "Germany", netmap.EQ), + }, + { + "EQ_false", false, + newFilter("Main", "Country", "China", netmap.EQ), + }, + { + "NE_true", true, + newFilter("Main", "Country", "France", netmap.NE), + }, + { + "NE_false", false, + newFilter("Main", "Country", "Germany", netmap.NE), + }, + } + for _, tc := range testCases { + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{tc.f}) + require.NoError(t, c.processFilters(p)) + 
require.Equal(t, tc.ok, c.match(tc.f, b)) + } + + t.Run("InvalidOp", func(t *testing.T) { + f := newFilter("Main", "Rating", "5", netmap.EQ) + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{f}) + require.NoError(t, c.processFilters(p)) + + // just for the coverage + f.SetOp(netmap.UnspecifiedOperation) + require.False(t, c.match(f, b)) + }) +} + +func TestFilter_Match(t *testing.T) { + fs := []*netmap.Filter{ + newFilter("StorageSSD", "Storage", "SSD", netmap.EQ), + newFilter("GoodRating", "Rating", "4", netmap.GE), + newFilter("Main", "", "", netmap.AND, + newFilter("StorageSSD", "", "", 0), + newFilter("", "IntField", "123", netmap.LT), + newFilter("GoodRating", "", "", 0), + newFilter("", "", "", netmap.OR, + newFilter("", "Param", "Value1", netmap.EQ), + newFilter("", "Param", "Value2", netmap.EQ), + )), + } + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, fs) + require.NoError(t, c.processFilters(p)) + + t.Run("Good", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "10", "IntField", "100", "Param", "Value1") + require.True(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidStorage", func(t *testing.T) { + n := getTestNode("Storage", "HDD", "Rating", "10", "IntField", "100", "Param", "Value1") + require.False(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidRating", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "100", "Param", "Value1") + require.False(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidIntField", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "str", "Param", "Value1") + require.False(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidParam", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "100", "Param", "NotValue") + require.False(t, c.applyFilter("Main", n)) + }) +} diff --git a/pkg/netmap/helper_test.go b/pkg/netmap/helper_test.go new file mode 100644 index 0000000..bcf9da5 --- /dev/null +++ b/pkg/netmap/helper_test.go @@ -0,0 +1,61 @@ +package netmap + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +func newFilter(name string, k, v string, op netmap.Operation, fs ...*netmap.Filter) *netmap.Filter { + f := new(netmap.Filter) + f.SetName(name) + f.SetKey(k) + f.SetOp(op) + f.SetValue(v) + f.SetFilters(fs) + return f +} + +func newSelector(name string, attr string, c netmap.Clause, count uint32, filter string) *netmap.Selector { + s := new(netmap.Selector) + s.SetName(name) + s.SetAttribute(attr) + s.SetCount(count) + s.SetClause(c) + s.SetFilter(filter) + return s +} + +func newPlacementPolicy(bf uint32, rs []*netmap.Replica, ss []*netmap.Selector, fs []*netmap.Filter) *netmap.PlacementPolicy { + p := new(netmap.PlacementPolicy) + p.SetContainerBackupFactor(bf) + p.SetReplicas(rs) + p.SetSelectors(ss) + p.SetFilters(fs) + return p +} + +func newReplica(c uint32, s string) *netmap.Replica { + r := new(netmap.Replica) + r.SetCount(c) + r.SetSelector(s) + return r +} + +func nodeInfoFromAttributes(props ...string) netmap.NodeInfo { + attrs := make([]*netmap.Attribute, len(props)/2) + for i := range attrs { + attrs[i] = new(netmap.Attribute) + attrs[i].SetKey(props[i*2]) + attrs[i].SetValue(props[i*2+1]) + } + var n netmap.NodeInfo + n.SetAttributes(attrs) + return n +} + +func getTestNode(props ...string) *Node { + m := make(map[string]string, len(props)/2) + for i := 0; i < len(props); i += 2 { + m[props[i]] = props[i+1] + } + return &Node{AttrMap: m} +} diff --git 
a/pkg/netmap/netmap.go b/pkg/netmap/netmap.go
new file mode 100644
index 0000000..b886999
--- /dev/null
+++ b/pkg/netmap/netmap.go
@@ -0,0 +1,73 @@
+package netmap
+
+import (
+	"fmt"
+
+	"github.com/nspcc-dev/hrw"
+	"github.com/nspcc-dev/neofs-api-go/v2/netmap"
+)
+
+// Netmap represents a netmap which contains preprocessed nodes.
+type Netmap struct {
+	Nodes Nodes
+}
+
+// NewNetmap constructs a netmap from the list of raw nodes.
+func NewNetmap(nodes Nodes) (*Netmap, error) {
+	return &Netmap{
+		Nodes: nodes,
+	}, nil
+}
+
+func flattenNodes(ns []Nodes) Nodes {
+	result := make(Nodes, 0, len(ns))
+	for i := range ns {
+		result = append(result, ns[i]...)
+	}
+	return result
+}
+
+// GetPlacementVectors returns placement vectors for an object given ContainerNodes cnt.
+func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes, error) {
+	h := hrw.Hash(pivot)
+	wf := GetDefaultWeightFunc(m.Nodes)
+	result := make([]Nodes, len(cnt.Replicas()))
+	for i, rep := range cnt.Replicas() {
+		result[i] = make(Nodes, len(rep))
+		copy(result[i], rep)
+		hrw.SortSliceByWeightValue(result[i], result[i].Weights(wf), h)
+	}
+	return result, nil
+}
+
+// GetContainerNodes returns nodes corresponding to each replica.
+// The order of returned nodes corresponds to the order of replicas in p.
+// pivot is a seed for HRW sorting.
+func (m *Netmap) GetContainerNodes(p *netmap.PlacementPolicy, pivot []byte) (ContainerNodes, error) {
+	c := NewContext(m)
+	c.setPivot(pivot)
+	if err := c.processFilters(p); err != nil {
+		return nil, err
+	}
+	if err := c.processSelectors(p); err != nil {
+		return nil, err
+	}
+	result := make([]Nodes, len(p.GetReplicas()))
+	for i, r := range p.GetReplicas() {
+		if r == nil {
+			return nil, fmt.Errorf("%w: REPLICA", ErrMissingField)
+		}
+		if r.GetSelector() == "" {
+			for _, s := range p.GetSelectors() {
+				result[i] = append(result[i], flattenNodes(c.Selections[s.GetName()])...)
+			}
+			continue
+		}
+		nodes, ok := c.Selections[r.GetSelector()]
+		if !ok {
+			return nil, fmt.Errorf("%w: REPLICA '%s'", ErrSelectorNotFound, r.GetSelector())
+		}
+		result[i] = append(result[i], flattenNodes(nodes)...)
+	}
+	return containerNodes(result), nil
+}
diff --git a/pkg/netmap/node_info.go b/pkg/netmap/node_info.go
new file mode 100644
index 0000000..4634c8e
--- /dev/null
+++ b/pkg/netmap/node_info.go
@@ -0,0 +1,89 @@
+package netmap
+
+import (
+	"strconv"
+
+	"github.com/nspcc-dev/hrw"
+	"github.com/nspcc-dev/neofs-api-go/v2/netmap"
+)
+
+type (
+	// Node is a wrapper over NodeInfo.
+	Node struct {
+		ID       uint64
+		Index    int
+		Capacity uint64
+		Price    uint64
+		AttrMap  map[string]string
+
+		InfoV2 *netmap.NodeInfo
+	}
+
+	// Nodes represents a slice of graph leaves.
+	Nodes []*Node
+)
+
+// Enumeration of well-known attributes.
+const (
+	CapacityAttr = "Capacity"
+	PriceAttr    = "Price"
+)
+
+var _ hrw.Hasher = (*Node)(nil)
+
+// Hash is a function from the hrw.Hasher interface. It is implemented
+// to support weighted HRW, so the sort functions order nodes
+// based on their ID hash.
+func (n Node) Hash() uint64 {
+	return n.ID
+}
+
+// NodesFromV2 converts a slice of v2 netmap.NodeInfo to a generic node slice.
+func NodesFromV2(infos []netmap.NodeInfo) Nodes {
+	nodes := make(Nodes, len(infos))
+	for i := range infos {
+		nodes[i] = newNodeV2(i, &infos[i])
+	}
+	return nodes
+}
+
+func newNodeV2(index int, ni *netmap.NodeInfo) *Node {
+	n := &Node{
+		ID:      hrw.Hash(ni.GetPublicKey()),
+		Index:   index,
+		AttrMap: make(map[string]string, len(ni.GetAttributes())),
+		InfoV2:  ni,
+	}
+	for _, attr := range ni.GetAttributes() {
+		switch attr.GetKey() {
+		case CapacityAttr:
+			n.Capacity, _ = strconv.ParseUint(attr.GetValue(), 10, 64)
+		case PriceAttr:
+			n.Price, _ = strconv.ParseUint(attr.GetValue(), 10, 64)
+		}
+		n.AttrMap[attr.GetKey()] = attr.GetValue()
+	}
+	return n
+}
+
+// Weights returns the slice of node weights computed by wf.
+func (n Nodes) Weights(wf weightFunc) []float64 {
+	w := make([]float64, 0, len(n))
+	for i := range n {
+		w = append(w, wf(n[i]))
+	}
+	return w
+}
+
+// Attribute returns the value of attribute k.
+func (n *Node) Attribute(k string) string {
+	return n.AttrMap[k]
+}
+
+// GetBucketWeight computes the weight of a bucket of nodes.
+func GetBucketWeight(ns Nodes, a aggregator, wf weightFunc) float64 {
+	for i := range ns {
+		a.Add(wf(ns[i]))
+	}
+	return a.Compute()
+}
diff --git a/pkg/netmap/selector.go b/pkg/netmap/selector.go
new file mode 100644
index 0000000..73a0669
--- /dev/null
+++ b/pkg/netmap/selector.go
@@ -0,0 +1,98 @@
+package netmap
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/nspcc-dev/hrw"
+	"github.com/nspcc-dev/neofs-api-go/v2/netmap"
+)
+
+// processSelectors processes selectors and returns an error if any of them is invalid.
+func (c *Context) processSelectors(p *netmap.PlacementPolicy) error {
+	for _, s := range p.GetSelectors() {
+		if s == nil {
+			return fmt.Errorf("%w: SELECT", ErrMissingField)
+		} else if s.GetFilter() != MainFilterName {
+			_, ok := c.Filters[s.GetFilter()]
+			if !ok {
+				return fmt.Errorf("%w: SELECT FROM '%s'", ErrFilterNotFound, s.GetFilter())
+			}
+		}
+		c.Selectors[s.GetName()] = s
+		result, err := c.getSelection(p, s)
+		if err != nil {
+			return err
+		}
+		c.Selections[s.GetName()] = result
+	}
+	return nil
+}
+
+// GetNodesCount returns the number of buckets and the number of nodes
+// in each bucket for the given placement policy and selector.
+func GetNodesCount(p *netmap.PlacementPolicy, s *netmap.Selector) (int, int) {
+	switch s.GetClause() {
+	case netmap.Same:
+		return 1, int(p.GetContainerBackupFactor() * s.GetCount())
+	default:
+		return int(s.GetCount()), int(p.GetContainerBackupFactor())
+	}
+}
+
+// getSelection returns nodes grouped by s.attribute.
+func (c *Context) getSelection(p *netmap.PlacementPolicy, s *netmap.Selector) ([]Nodes, error) { + bucketCount, nodesInBucket := GetNodesCount(p, s) + m := c.getSelectionBase(s) + if len(m) < bucketCount { + return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.GetName()) + } + + keys := make(sort.StringSlice, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + if len(c.pivot) == 0 { + // deterministic order in case of zero seed + keys.Sort() + } + + nodes := make([]Nodes, 0, len(m)) + for i := range keys { + ns := m[keys[i]] + if len(ns) >= nodesInBucket { + nodes = append(nodes, ns[:nodesInBucket]) + } + } + if len(nodes) < bucketCount { + return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.GetName()) + } + if len(c.pivot) != 0 { + weights := make([]float64, len(nodes)) + for i := range nodes { + weights[i] = GetBucketWeight(nodes[i], c.aggregator(), c.weightFunc) + } + hrw.SortSliceByWeightIndex(nodes, weights, c.pivotHash) + } + return nodes[:bucketCount], nil +} + +// getSelectionBase returns nodes grouped by selector attribute. +func (c *Context) getSelectionBase(s *netmap.Selector) map[string]Nodes { + f := c.Filters[s.GetFilter()] + isMain := s.GetFilter() == MainFilterName + result := map[string]Nodes{} + for i := range c.Netmap.Nodes { + if isMain || c.match(f, c.Netmap.Nodes[i]) { + v := c.Netmap.Nodes[i].Attribute(s.GetAttribute()) + result[v] = append(result[v], c.Netmap.Nodes[i]) + } + } + + if len(c.pivot) != 0 { + for _, ns := range result { + hrw.SortSliceByWeightValue(ns, ns.Weights(c.weightFunc), c.pivotHash) + } + } + return result +} diff --git a/pkg/netmap/selector_test.go b/pkg/netmap/selector_test.go new file mode 100644 index 0000000..79f363e --- /dev/null +++ b/pkg/netmap/selector_test.go @@ -0,0 +1,218 @@ +package netmap + +import ( + "errors" + "fmt" + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + "github.com/stretchr/testify/require" +) + +func TestPlacementPolicy_GetPlacementVectors(t *testing.T) { + p := newPlacementPolicy(2, + []*netmap.Replica{ + newReplica(1, "SPB"), + newReplica(2, "Americas"), + }, + []*netmap.Selector{ + newSelector("SPB", "City", netmap.Same, 1, "SPBSSD"), + newSelector("Americas", "City", netmap.Distinct, 2, "Americas"), + }, + []*netmap.Filter{ + newFilter("SPBSSD", "", "", netmap.AND, + newFilter("", "Country", "RU", netmap.EQ), + newFilter("", "City", "St.Petersburg", netmap.EQ), + newFilter("", "SSD", "1", netmap.EQ)), + newFilter("Americas", "", "", netmap.OR, + newFilter("", "Continent", "NA", netmap.EQ), + newFilter("", "Continent", "SA", netmap.EQ)), + }) + nodes := []netmap.NodeInfo{ + nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"), + nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"), + nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"), + nodeInfoFromAttributes("ID", "4", "Country", "RU", "City", "Moscow", "SSD", "1"), + nodeInfoFromAttributes("ID", "5", "Country", "RU", "City", "St.Petersburg", "SSD", "1"), + nodeInfoFromAttributes("ID", "6", "Continent", "NA", "City", "NewYork"), + nodeInfoFromAttributes("ID", "7", "Continent", "AF", "City", "Cairo"), + nodeInfoFromAttributes("ID", "8", "Continent", "AF", "City", "Cairo"), + nodeInfoFromAttributes("ID", "9", "Continent", "SA", "City", "Lima"), + nodeInfoFromAttributes("ID", "10", "Continent", "AF", "City", "Cairo"), + nodeInfoFromAttributes("ID", "11", "Continent", "NA", "City", "NewYork"), + nodeInfoFromAttributes("ID", "12", 
"Continent", "NA", "City", "LosAngeles"), + nodeInfoFromAttributes("ID", "13", "Continent", "SA", "City", "Lima"), + } + + nm, err := NewNetmap(NodesFromV2(nodes)) + require.NoError(t, err) + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + require.Equal(t, 2, len(v.Replicas())) + require.Equal(t, 6, len(v.Flatten())) + + require.Equal(t, 2, len(v.Replicas()[0])) + ids := map[string]struct{}{} + for _, ni := range v.Replicas()[0] { + require.Equal(t, "RU", ni.Attribute("Country")) + require.Equal(t, "St.Petersburg", ni.Attribute("City")) + require.Equal(t, "1", ni.Attribute("SSD")) + ids[ni.Attribute("ID")] = struct{}{} + } + require.Equal(t, len(v.Replicas()[0]), len(ids), "not all nodes we distinct") + + require.Equal(t, 4, len(v.Replicas()[1])) // 2 cities * 2 HRWB + ids = map[string]struct{}{} + for _, ni := range v.Replicas()[1] { + require.Contains(t, []string{"NA", "SA"}, ni.Attribute("Continent")) + ids[ni.Attribute("ID")] = struct{}{} + } + require.Equal(t, len(v.Replicas()[1]), len(ids), "not all nodes we distinct") +} + +func TestPlacementPolicy_ProcessSelectors(t *testing.T) { + p := newPlacementPolicy(2, nil, + []*netmap.Selector{ + newSelector("SameRU", "City", netmap.Same, 2, "FromRU"), + newSelector("DistinctRU", "City", netmap.Distinct, 2, "FromRU"), + newSelector("Good", "Country", netmap.Distinct, 2, "Good"), + newSelector("Main", "Country", netmap.Distinct, 3, "*"), + }, + []*netmap.Filter{ + newFilter("FromRU", "Country", "Russia", netmap.EQ), + newFilter("Good", "Rating", "4", netmap.GE), + }) + nodes := []netmap.NodeInfo{ + nodeInfoFromAttributes("Country", "Russia", "Rating", "1", "City", "SPB"), + nodeInfoFromAttributes("Country", "Germany", "Rating", "5", "City", "Berlin"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "6", "City", "Moscow"), + nodeInfoFromAttributes("Country", "France", "Rating", "4", "City", "Paris"), + nodeInfoFromAttributes("Country", "France", "Rating", "1", "City", "Lyon"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "5", "City", "SPB"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "7", "City", "Moscow"), + nodeInfoFromAttributes("Country", "Germany", "Rating", "3", "City", "Darmstadt"), + nodeInfoFromAttributes("Country", "Germany", "Rating", "7", "City", "Frankfurt"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"), + } + + nm, err := NewNetmap(NodesFromV2(nodes)) + require.NoError(t, err) + c := NewContext(nm) + require.NoError(t, c.processFilters(p)) + require.NoError(t, c.processSelectors(p)) + + for _, s := range p.GetSelectors() { + sel := c.Selections[s.GetName()] + s := c.Selectors[s.GetName()] + bucketCount, nodesInBucket := GetNodesCount(p, s) + targ := fmt.Sprintf("selector '%s'", s.GetName()) + require.Equal(t, bucketCount, len(sel), targ) + for _, res := range sel { + require.Equal(t, nodesInBucket, len(res), targ) + for j := range res { + require.True(t, c.applyFilter(s.GetFilter(), res[j]), targ) + } + } + } + +} + +func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) { + p := newPlacementPolicy(1, nil, + []*netmap.Selector{ + newSelector("Main", "Country", netmap.Distinct, 3, "*"), + }, nil) + + // bucket weight order: RU > DE > FR + nodes := []netmap.NodeInfo{ + nodeInfoFromAttributes("Country", "Germany", PriceAttr, "2", CapacityAttr, "10000"), + nodeInfoFromAttributes("Country", "Germany", PriceAttr, "4", CapacityAttr, "1"), + nodeInfoFromAttributes("Country", 
"France", PriceAttr, "3", CapacityAttr, "10"), + nodeInfoFromAttributes("Country", "Russia", PriceAttr, "2", CapacityAttr, "10000"), + nodeInfoFromAttributes("Country", "Russia", PriceAttr, "1", CapacityAttr, "10000"), + nodeInfoFromAttributes("Country", "Russia", CapacityAttr, "10000"), + nodeInfoFromAttributes("Country", "France", PriceAttr, "100", CapacityAttr, "1"), + nodeInfoFromAttributes("Country", "France", PriceAttr, "7", CapacityAttr, "10000"), + nodeInfoFromAttributes("Country", "Russia", PriceAttr, "2", CapacityAttr, "1"), + } + nm, err := NewNetmap(NodesFromV2(nodes)) + require.NoError(t, err) + c := NewContext(nm) + c.setPivot([]byte("containerID")) + c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1)) + c.aggregator = newMaxAgg + require.NoError(t, c.processFilters(p)) + require.NoError(t, c.processSelectors(p)) + + cnt := c.Selections["Main"] + expected := []Nodes{ + {{Index: 4, Capacity: 10000, Price: 1}}, // best RU + {{Index: 0, Capacity: 10000, Price: 2}}, // best DE + {{Index: 7, Capacity: 10000, Price: 7}}, // best FR + } + require.Equal(t, len(expected), len(cnt)) + for i := range expected { + require.Equal(t, len(expected[i]), len(cnt[i])) + require.Equal(t, expected[i][0].Index, cnt[i][0].Index) + require.Equal(t, expected[i][0].Capacity, cnt[i][0].Capacity) + require.Equal(t, expected[i][0].Price, cnt[i][0].Price) + } + + res, err := nm.GetPlacementVectors(containerNodes(cnt), []byte("objectID")) + require.NoError(t, err) + require.Equal(t, res, cnt) +} + +func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) { + testCases := []struct { + name string + p *netmap.PlacementPolicy + err error + }{ + { + "MissingSelector", + newPlacementPolicy(2, nil, + []*netmap.Selector{nil}, + []*netmap.Filter{}), + ErrMissingField, + }, + { + "InvalidFilterReference", + newPlacementPolicy(1, nil, + []*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 1, "FromNL")}, + []*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}), + ErrFilterNotFound, + }, + { + "NotEnoughNodes (backup factor)", + newPlacementPolicy(2, nil, + []*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 1, "FromRU")}, + []*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}), + ErrNotEnoughNodes, + }, + { + "NotEnoughNodes (buckets)", + newPlacementPolicy(1, nil, + []*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 2, "FromRU")}, + []*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}), + ErrNotEnoughNodes, + }, + } + nodes := []netmap.NodeInfo{ + nodeInfoFromAttributes("Country", "Russia"), + nodeInfoFromAttributes("Country", "Germany"), + nodeInfoFromAttributes(), + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nm, err := NewNetmap(NodesFromV2(nodes)) + require.NoError(t, err) + c := NewContext(nm) + require.NoError(t, c.processFilters(tc.p)) + + err = c.processSelectors(tc.p) + require.True(t, errors.Is(err, tc.err), "got: %v", err) + }) + } +}