forked from TrueCloudLab/frostfs-api-go
[#137] sdk: Implement netmap filtering and selection
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
efbf73b775
commit
2026473733
13 changed files with 1229 additions and 0 deletions
1
go.mod
1
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
github.com/golang/protobuf v1.4.2
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/mr-tron/base58 v1.1.2
|
||||
github.com/nspcc-dev/hrw v1.0.9
|
||||
github.com/nspcc-dev/neo-go v0.91.0
|
||||
github.com/nspcc-dev/neofs-crypto v0.3.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
|
|
3
go.sum
3
go.sum
|
@ -135,6 +135,8 @@ github.com/nspcc-dev/dbft v0.0.0-20191209120240-0d6b7568d9ae/go.mod h1:3FjXOoHmA
|
|||
github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk=
|
||||
github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ=
|
||||
github.com/nspcc-dev/dbft v0.0.0-20200711144034-c526ccc6f570/go.mod h1:1FYQXSbb6/9HQIkoF8XO7W/S8N7AZRkBsgwbcXRvk0E=
|
||||
github.com/nspcc-dev/hrw v1.0.9 h1:17VcAuTtrstmFppBjfRiia4K2wA/ukXZhLFS8Y8rz5Y=
|
||||
github.com/nspcc-dev/hrw v1.0.9/go.mod h1:l/W2vx83vMQo6aStyx2AuZrJ+07lGv2JQGlVkPG06MU=
|
||||
github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1:pPYwPZ2ks+uMnlRLUyXOpLieaDQSEaf4NM3zHVbRjmg=
|
||||
github.com/nspcc-dev/neo-go v0.91.0 h1:KKOPMKs0fm8JIau1SuwxiLdrZ+1kDPBiVRlWwzfebWE=
|
||||
github.com/nspcc-dev/neo-go v0.91.0/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc=
|
||||
|
@ -175,6 +177,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
|
|||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
|
|
243
pkg/netmap/aggregator.go
Normal file
243
pkg/netmap/aggregator.go
Normal file
|
@ -0,0 +1,243 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
type (
|
||||
// aggregator can calculate some value across all netmap
|
||||
// such as median, minimum or maximum.
|
||||
aggregator interface {
|
||||
Add(float64)
|
||||
Compute() float64
|
||||
}
|
||||
|
||||
// normalizer normalizes weight.
|
||||
normalizer interface {
|
||||
Normalize(w float64) float64
|
||||
}
|
||||
|
||||
meanSumAgg struct {
|
||||
sum float64
|
||||
count int
|
||||
}
|
||||
|
||||
meanAgg struct {
|
||||
mean float64
|
||||
count int
|
||||
}
|
||||
|
||||
minAgg struct {
|
||||
min float64
|
||||
}
|
||||
|
||||
maxAgg struct {
|
||||
max float64
|
||||
}
|
||||
|
||||
meanIQRAgg struct {
|
||||
k float64
|
||||
arr []float64
|
||||
}
|
||||
|
||||
reverseMinNorm struct {
|
||||
min float64
|
||||
}
|
||||
|
||||
maxNorm struct {
|
||||
max float64
|
||||
}
|
||||
|
||||
sigmoidNorm struct {
|
||||
scale float64
|
||||
}
|
||||
|
||||
constNorm struct {
|
||||
value float64
|
||||
}
|
||||
|
||||
// weightFunc calculates n's weight.
|
||||
weightFunc = func(n *Node) float64
|
||||
)
|
||||
|
||||
var (
|
||||
_ aggregator = (*meanSumAgg)(nil)
|
||||
_ aggregator = (*meanAgg)(nil)
|
||||
_ aggregator = (*minAgg)(nil)
|
||||
_ aggregator = (*maxAgg)(nil)
|
||||
_ aggregator = (*meanIQRAgg)(nil)
|
||||
|
||||
_ normalizer = (*reverseMinNorm)(nil)
|
||||
_ normalizer = (*maxNorm)(nil)
|
||||
_ normalizer = (*sigmoidNorm)(nil)
|
||||
_ normalizer = (*constNorm)(nil)
|
||||
)
|
||||
|
||||
// capWeightFunc calculates weight which is equal to capacity.
|
||||
func capWeightFunc(n *Node) float64 { return float64(n.Capacity) }
|
||||
|
||||
// priceWeightFunc calculates weight which is equal to price.
|
||||
func priceWeightFunc(n *Node) float64 { return float64(n.Price) }
|
||||
|
||||
// newWeightFunc returns weightFunc which multiplies normalized
|
||||
// capacity and price.
|
||||
func newWeightFunc(capNorm, priceNorm normalizer) weightFunc {
|
||||
return func(n *Node) float64 {
|
||||
return capNorm.Normalize(float64(n.Capacity)) * priceNorm.Normalize(float64(n.Price))
|
||||
}
|
||||
}
|
||||
|
||||
// newMeanSumAgg returns an aggregator which
|
||||
// computes mean value by keeping total sum.
|
||||
func newMeanSumAgg() aggregator {
|
||||
return new(meanSumAgg)
|
||||
}
|
||||
|
||||
// newMeanAgg returns an aggregator which
|
||||
// computes mean value by recalculating it on
|
||||
// every addition.
|
||||
func newMeanAgg() aggregator {
|
||||
return new(meanAgg)
|
||||
}
|
||||
|
||||
// newMinAgg returns an aggregator which
|
||||
// computes min value.
|
||||
func newMinAgg() aggregator {
|
||||
return new(minAgg)
|
||||
}
|
||||
|
||||
// newMaxAgg returns an aggregator which
|
||||
// computes max value.
|
||||
func newMaxAgg() aggregator {
|
||||
return new(maxAgg)
|
||||
}
|
||||
|
||||
// newMeanIQRAgg returns an aggregator which
|
||||
// computes mean value of values from IQR interval.
|
||||
func newMeanIQRAgg() aggregator {
|
||||
return new(meanIQRAgg)
|
||||
}
|
||||
|
||||
// newReverseMinNorm returns a normalizer which
|
||||
// normalize values in range of 0.0 to 1.0 to a minimum value.
|
||||
func newReverseMinNorm(min float64) normalizer {
|
||||
return &reverseMinNorm{min: min}
|
||||
}
|
||||
|
||||
// newMaxNorm returns a normalizer which
|
||||
// normalize values in range of 0.0 to 1.0 to a maximum value.
|
||||
func newMaxNorm(max float64) normalizer {
|
||||
return &maxNorm{max: max}
|
||||
}
|
||||
|
||||
// newSigmoidNorm returns a normalizer which
|
||||
// normalize values in range of 0.0 to 1.0 to a scaled sigmoid.
|
||||
func newSigmoidNorm(scale float64) normalizer {
|
||||
return &sigmoidNorm{scale: scale}
|
||||
}
|
||||
|
||||
// newConstNorm returns a normalizer which
|
||||
// returns a constant values
|
||||
func newConstNorm(value float64) normalizer {
|
||||
return &constNorm{value: value}
|
||||
}
|
||||
|
||||
func (a *meanSumAgg) Add(n float64) {
|
||||
a.sum += n
|
||||
a.count++
|
||||
}
|
||||
|
||||
func (a *meanSumAgg) Compute() float64 {
|
||||
if a.count == 0 {
|
||||
return 0
|
||||
}
|
||||
return a.sum / float64(a.count)
|
||||
}
|
||||
|
||||
func (a *meanAgg) Add(n float64) {
|
||||
c := a.count + 1
|
||||
a.mean = a.mean*(float64(a.count)/float64(c)) + n/float64(c)
|
||||
a.count++
|
||||
}
|
||||
|
||||
func (a *meanAgg) Compute() float64 {
|
||||
return a.mean
|
||||
}
|
||||
|
||||
func (a *minAgg) Add(n float64) {
|
||||
if a.min == 0 || n < a.min {
|
||||
a.min = n
|
||||
}
|
||||
}
|
||||
|
||||
func (a *minAgg) Compute() float64 {
|
||||
return a.min
|
||||
}
|
||||
|
||||
func (a *maxAgg) Add(n float64) {
|
||||
if n > a.max {
|
||||
a.max = n
|
||||
}
|
||||
}
|
||||
|
||||
func (a *maxAgg) Compute() float64 {
|
||||
return a.max
|
||||
}
|
||||
|
||||
func (a *meanIQRAgg) Add(n float64) {
|
||||
a.arr = append(a.arr, n)
|
||||
}
|
||||
|
||||
func (a *meanIQRAgg) Compute() float64 {
|
||||
l := len(a.arr)
|
||||
if l == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
sort.Slice(a.arr, func(i, j int) bool { return a.arr[i] < a.arr[j] })
|
||||
|
||||
var min, max float64
|
||||
if l < 4 {
|
||||
min, max = a.arr[0], a.arr[l-1]
|
||||
} else {
|
||||
start, end := l/4, l*3/4-1
|
||||
iqr := a.k * (a.arr[end] - a.arr[start])
|
||||
min, max = a.arr[start]-iqr, a.arr[end]+iqr
|
||||
}
|
||||
|
||||
count := 0
|
||||
sum := float64(0)
|
||||
for _, e := range a.arr {
|
||||
if e >= min && e <= max {
|
||||
sum += e
|
||||
count++
|
||||
}
|
||||
}
|
||||
return sum / float64(count)
|
||||
}
|
||||
|
||||
func (r *reverseMinNorm) Normalize(w float64) float64 {
|
||||
if w == 0 {
|
||||
return 0
|
||||
}
|
||||
return r.min / w
|
||||
}
|
||||
|
||||
func (r *maxNorm) Normalize(w float64) float64 {
|
||||
if r.max == 0 {
|
||||
return 0
|
||||
}
|
||||
return w / r.max
|
||||
}
|
||||
|
||||
func (r *sigmoidNorm) Normalize(w float64) float64 {
|
||||
if r.scale == 0 {
|
||||
return 0
|
||||
}
|
||||
x := w / r.scale
|
||||
return x / (1 + x)
|
||||
}
|
||||
|
||||
func (r *constNorm) Normalize(_ float64) float64 {
|
||||
return r.value
|
||||
}
|
19
pkg/netmap/container.go
Normal file
19
pkg/netmap/container.go
Normal file
|
@ -0,0 +1,19 @@
|
|||
package netmap
|
||||
|
||||
// ContainerNodes represents nodes in the container.
|
||||
type ContainerNodes interface {
|
||||
Replicas() []Nodes
|
||||
Flatten() Nodes
|
||||
}
|
||||
|
||||
type containerNodes []Nodes
|
||||
|
||||
// Flatten returns list of all nodes from the container.
|
||||
func (c containerNodes) Flatten() Nodes {
|
||||
return flattenNodes(c)
|
||||
}
|
||||
|
||||
// Replicas return list of container replicas.
|
||||
func (c containerNodes) Replicas() []Nodes {
|
||||
return c
|
||||
}
|
81
pkg/netmap/context.go
Normal file
81
pkg/netmap/context.go
Normal file
|
@ -0,0 +1,81 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/hrw"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
)
|
||||
|
||||
// Context contains references to named filters and cached numeric values.
|
||||
type Context struct {
|
||||
// Netmap is a netmap structure to operate on.
|
||||
Netmap *Netmap
|
||||
// Filters stores processed filters.
|
||||
Filters map[string]*netmap.Filter
|
||||
// Selectors stores processed selectors.
|
||||
Selectors map[string]*netmap.Selector
|
||||
// Selections stores result of selector processing.
|
||||
Selections map[string][]Nodes
|
||||
|
||||
// numCache stores parsed numeric values.
|
||||
numCache map[*netmap.Filter]uint64
|
||||
// pivot is a seed for HRW.
|
||||
pivot []byte
|
||||
// pivotHash is a saved HRW hash of pivot
|
||||
pivotHash uint64
|
||||
// aggregator is returns aggregator determining bucket weight.
|
||||
// By default it returns mean value from IQR interval.
|
||||
aggregator func() aggregator
|
||||
// weightFunc is a weighting function for determining node priority.
|
||||
// By default in combines favours low price and high capacity.
|
||||
weightFunc weightFunc
|
||||
}
|
||||
|
||||
// Various validation errors.
|
||||
var (
|
||||
ErrMissingField = errors.New("netmap: nil field")
|
||||
ErrInvalidFilterName = errors.New("netmap: filter name is invalid")
|
||||
ErrInvalidNumber = errors.New("netmap: number value expected")
|
||||
ErrInvalidFilterOp = errors.New("netmap: invalid filter operation")
|
||||
ErrFilterNotFound = errors.New("netmap: filter not found")
|
||||
ErrNonEmptyFilters = errors.New("netmap: simple filter must no contain sub-filters")
|
||||
ErrNotEnoughNodes = errors.New("netmap: not enough nodes to SELECT from")
|
||||
ErrSelectorNotFound = errors.New("netmap: selector not found")
|
||||
ErrUnnamedTopFilter = errors.New("netmap: all filters on top level must be named")
|
||||
)
|
||||
|
||||
// NewContext creates new context. It contains various caches.
|
||||
// In future it may create hierarchical netmap structure to work with.
|
||||
func NewContext(nm *Netmap) *Context {
|
||||
return &Context{
|
||||
Netmap: nm,
|
||||
Filters: make(map[string]*netmap.Filter),
|
||||
Selectors: make(map[string]*netmap.Selector),
|
||||
Selections: make(map[string][]Nodes),
|
||||
|
||||
numCache: make(map[*netmap.Filter]uint64),
|
||||
aggregator: newMeanIQRAgg,
|
||||
weightFunc: GetDefaultWeightFunc(nm.Nodes),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Context) setPivot(pivot []byte) {
|
||||
if len(pivot) != 0 {
|
||||
c.pivot = pivot
|
||||
c.pivotHash = hrw.Hash(pivot)
|
||||
}
|
||||
}
|
||||
|
||||
// GetDefaultWeightFunc returns default weighting function.
|
||||
func GetDefaultWeightFunc(ns Nodes) weightFunc {
|
||||
mean := newMeanAgg()
|
||||
min := newMinAgg()
|
||||
for i := range ns {
|
||||
mean.Add(float64(ns[i].Capacity))
|
||||
min.Add(float64(ns[i].Price))
|
||||
}
|
||||
return newWeightFunc(
|
||||
newSigmoidNorm(mean.Compute()),
|
||||
newReverseMinNorm(min.Compute()))
|
||||
}
|
11
pkg/netmap/doc.go
Normal file
11
pkg/netmap/doc.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
/*
|
||||
Package netmap provides routines for working with netmap and placement policy.
|
||||
Work is done in 4 steps:
|
||||
1. Create context containing results shared between steps.
|
||||
2. Processing filters.
|
||||
3. Processing selectors.
|
||||
4. Processing replicas.
|
||||
|
||||
Each step depends only on previous ones.
|
||||
*/
|
||||
package netmap
|
129
pkg/netmap/filter.go
Normal file
129
pkg/netmap/filter.go
Normal file
|
@ -0,0 +1,129 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
)
|
||||
|
||||
// MainFilterName is a name of the filter
|
||||
// which points to the whole netmap.
|
||||
const MainFilterName = "*"
|
||||
|
||||
// applyFilter applies named filter to b.
|
||||
func (c *Context) applyFilter(name string, b *Node) bool {
|
||||
return name == MainFilterName || c.match(c.Filters[name], b)
|
||||
}
|
||||
|
||||
// processFilters processes filters and returns error is any of them is invalid.
|
||||
func (c *Context) processFilters(p *netmap.PlacementPolicy) error {
|
||||
for _, f := range p.GetFilters() {
|
||||
if err := c.processFilter(f, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Context) processFilter(f *netmap.Filter, top bool) error {
|
||||
if f == nil {
|
||||
return fmt.Errorf("%w: FILTER", ErrMissingField)
|
||||
}
|
||||
if f.GetName() == MainFilterName {
|
||||
return fmt.Errorf("%w: '*' is reserved", ErrInvalidFilterName)
|
||||
}
|
||||
if top && f.GetName() == "" {
|
||||
return ErrUnnamedTopFilter
|
||||
}
|
||||
if !top && f.GetName() != "" && c.Filters[f.GetName()] == nil {
|
||||
return fmt.Errorf("%w: '%s'", ErrFilterNotFound, f.GetName())
|
||||
}
|
||||
switch f.GetOp() {
|
||||
case netmap.AND, netmap.OR:
|
||||
for _, flt := range f.GetFilters() {
|
||||
if err := c.processFilter(flt, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
default:
|
||||
if len(f.GetFilters()) != 0 {
|
||||
return ErrNonEmptyFilters
|
||||
} else if !top && f.GetName() != "" { // named reference
|
||||
return nil
|
||||
}
|
||||
switch f.GetOp() {
|
||||
case netmap.EQ, netmap.NE:
|
||||
case netmap.GT, netmap.GE, netmap.LT, netmap.LE:
|
||||
n, err := strconv.ParseUint(f.GetValue(), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: '%s'", ErrInvalidNumber, f.GetValue())
|
||||
}
|
||||
c.numCache[f] = n
|
||||
default:
|
||||
return fmt.Errorf("%w: %d", ErrInvalidFilterOp, f.GetOp())
|
||||
}
|
||||
}
|
||||
if top {
|
||||
c.Filters[f.GetName()] = f
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// match matches f against b. It returns no errors because
|
||||
// filter should have been parsed during context creation
|
||||
// and missing node properties are considered as a regular fail.
|
||||
func (c *Context) match(f *netmap.Filter, b *Node) bool {
|
||||
switch f.GetOp() {
|
||||
case netmap.AND, netmap.OR:
|
||||
for _, lf := range f.GetFilters() {
|
||||
if lf.GetName() != "" {
|
||||
lf = c.Filters[lf.GetName()]
|
||||
}
|
||||
ok := c.match(lf, b)
|
||||
if ok == (f.GetOp() == netmap.OR) {
|
||||
return ok
|
||||
}
|
||||
}
|
||||
return f.GetOp() == netmap.AND
|
||||
default:
|
||||
return c.matchKeyValue(f, b)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Context) matchKeyValue(f *netmap.Filter, b *Node) bool {
|
||||
switch f.GetOp() {
|
||||
case netmap.EQ:
|
||||
return b.Attribute(f.GetKey()) == f.GetValue()
|
||||
case netmap.NE:
|
||||
return b.Attribute(f.GetKey()) != f.GetValue()
|
||||
default:
|
||||
var attr uint64
|
||||
switch f.GetKey() {
|
||||
case PriceAttr:
|
||||
attr = b.Price
|
||||
case CapacityAttr:
|
||||
attr = b.Capacity
|
||||
default:
|
||||
var err error
|
||||
attr, err = strconv.ParseUint(b.Attribute(f.GetKey()), 10, 64)
|
||||
if err != nil {
|
||||
// Note: because filters are somewhat independent from nodes attributes,
|
||||
// We don't report an error here, and fail filter instead.
|
||||
return false
|
||||
}
|
||||
}
|
||||
switch f.GetOp() {
|
||||
case netmap.GT:
|
||||
return attr > c.numCache[f]
|
||||
case netmap.GE:
|
||||
return attr >= c.numCache[f]
|
||||
case netmap.LT:
|
||||
return attr < c.numCache[f]
|
||||
case netmap.LE:
|
||||
return attr <= c.numCache[f]
|
||||
}
|
||||
}
|
||||
// will not happen if context was created from f (maybe panic?)
|
||||
return false
|
||||
}
|
203
pkg/netmap/filter_test.go
Normal file
203
pkg/netmap/filter_test.go
Normal file
|
@ -0,0 +1,203 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestContext_ProcessFilters(t *testing.T) {
|
||||
fs := []*netmap.Filter{
|
||||
newFilter("StorageSSD", "Storage", "SSD", netmap.EQ),
|
||||
newFilter("GoodRating", "Rating", "4", netmap.GE),
|
||||
newFilter("Main", "", "", netmap.AND,
|
||||
newFilter("StorageSSD", "", "", 0),
|
||||
newFilter("", "IntField", "123", netmap.LT),
|
||||
newFilter("GoodRating", "", "", 0)),
|
||||
}
|
||||
nm, err := NewNetmap(nil)
|
||||
require.NoError(t, err)
|
||||
c := NewContext(nm)
|
||||
p := newPlacementPolicy(1, nil, nil, fs)
|
||||
require.NoError(t, c.processFilters(p))
|
||||
require.Equal(t, 3, len(c.Filters))
|
||||
for _, f := range fs {
|
||||
require.Equal(t, f, c.Filters[f.GetName()])
|
||||
}
|
||||
|
||||
require.Equal(t, uint64(4), c.numCache[fs[1]])
|
||||
require.Equal(t, uint64(123), c.numCache[fs[2].GetFilters()[1]])
|
||||
}
|
||||
|
||||
func TestContext_ProcessFiltersInvalid(t *testing.T) {
|
||||
errTestCases := []struct {
|
||||
name string
|
||||
filter *netmap.Filter
|
||||
err error
|
||||
}{
|
||||
{
|
||||
"UnnamedTop",
|
||||
newFilter("", "Storage", "SSD", netmap.EQ),
|
||||
ErrUnnamedTopFilter,
|
||||
},
|
||||
{
|
||||
"InvalidReference",
|
||||
newFilter("Main", "", "", netmap.AND,
|
||||
newFilter("StorageSSD", "", "", 0)),
|
||||
ErrFilterNotFound,
|
||||
},
|
||||
{
|
||||
"NonEmptyKeyed",
|
||||
newFilter("Main", "Storage", "SSD", netmap.EQ,
|
||||
newFilter("StorageSSD", "", "", 0)),
|
||||
ErrNonEmptyFilters,
|
||||
},
|
||||
{
|
||||
"InvalidNumber",
|
||||
newFilter("Main", "Rating", "three", netmap.GE),
|
||||
ErrInvalidNumber,
|
||||
},
|
||||
{
|
||||
"InvalidOp",
|
||||
newFilter("Main", "Rating", "3", netmap.UnspecifiedOperation),
|
||||
ErrInvalidFilterOp,
|
||||
},
|
||||
{
|
||||
"InvalidName",
|
||||
newFilter("*", "Rating", "3", netmap.GE),
|
||||
ErrInvalidFilterName,
|
||||
},
|
||||
{
|
||||
"MissingFilter",
|
||||
nil,
|
||||
ErrMissingField,
|
||||
},
|
||||
}
|
||||
for _, tc := range errTestCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
c := NewContext(new(Netmap))
|
||||
p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{tc.filter})
|
||||
err := c.processFilters(p)
|
||||
require.True(t, errors.Is(err, tc.err), "got: %v", err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilter_MatchSimple(t *testing.T) {
|
||||
b := &Node{AttrMap: map[string]string{
|
||||
"Rating": "4",
|
||||
"Country": "Germany",
|
||||
}}
|
||||
testCases := []struct {
|
||||
name string
|
||||
ok bool
|
||||
f *netmap.Filter
|
||||
}{
|
||||
{
|
||||
"GE_true", true,
|
||||
newFilter("Main", "Rating", "4", netmap.GE),
|
||||
},
|
||||
{
|
||||
"GE_false", false,
|
||||
newFilter("Main", "Rating", "5", netmap.GE),
|
||||
},
|
||||
{
|
||||
"GT_true", true,
|
||||
newFilter("Main", "Rating", "3", netmap.GT),
|
||||
},
|
||||
{
|
||||
"GT_false", false,
|
||||
newFilter("Main", "Rating", "4", netmap.GT),
|
||||
},
|
||||
{
|
||||
"LE_true", true,
|
||||
newFilter("Main", "Rating", "4", netmap.LE),
|
||||
},
|
||||
{
|
||||
"LE_false", false,
|
||||
newFilter("Main", "Rating", "3", netmap.LE),
|
||||
},
|
||||
{
|
||||
"LT_true", true,
|
||||
newFilter("Main", "Rating", "5", netmap.LT),
|
||||
},
|
||||
{
|
||||
"LT_false", false,
|
||||
newFilter("Main", "Rating", "4", netmap.LT),
|
||||
},
|
||||
{
|
||||
"EQ_true", true,
|
||||
newFilter("Main", "Country", "Germany", netmap.EQ),
|
||||
},
|
||||
{
|
||||
"EQ_false", false,
|
||||
newFilter("Main", "Country", "China", netmap.EQ),
|
||||
},
|
||||
{
|
||||
"NE_true", true,
|
||||
newFilter("Main", "Country", "France", netmap.NE),
|
||||
},
|
||||
{
|
||||
"NE_false", false,
|
||||
newFilter("Main", "Country", "Germany", netmap.NE),
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
c := NewContext(new(Netmap))
|
||||
p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{tc.f})
|
||||
require.NoError(t, c.processFilters(p))
|
||||
require.Equal(t, tc.ok, c.match(tc.f, b))
|
||||
}
|
||||
|
||||
t.Run("InvalidOp", func(t *testing.T) {
|
||||
f := newFilter("Main", "Rating", "5", netmap.EQ)
|
||||
c := NewContext(new(Netmap))
|
||||
p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{f})
|
||||
require.NoError(t, c.processFilters(p))
|
||||
|
||||
// just for the coverage
|
||||
f.SetOp(netmap.UnspecifiedOperation)
|
||||
require.False(t, c.match(f, b))
|
||||
})
|
||||
}
|
||||
|
||||
func TestFilter_Match(t *testing.T) {
|
||||
fs := []*netmap.Filter{
|
||||
newFilter("StorageSSD", "Storage", "SSD", netmap.EQ),
|
||||
newFilter("GoodRating", "Rating", "4", netmap.GE),
|
||||
newFilter("Main", "", "", netmap.AND,
|
||||
newFilter("StorageSSD", "", "", 0),
|
||||
newFilter("", "IntField", "123", netmap.LT),
|
||||
newFilter("GoodRating", "", "", 0),
|
||||
newFilter("", "", "", netmap.OR,
|
||||
newFilter("", "Param", "Value1", netmap.EQ),
|
||||
newFilter("", "Param", "Value2", netmap.EQ),
|
||||
)),
|
||||
}
|
||||
c := NewContext(new(Netmap))
|
||||
p := newPlacementPolicy(1, nil, nil, fs)
|
||||
require.NoError(t, c.processFilters(p))
|
||||
|
||||
t.Run("Good", func(t *testing.T) {
|
||||
n := getTestNode("Storage", "SSD", "Rating", "10", "IntField", "100", "Param", "Value1")
|
||||
require.True(t, c.applyFilter("Main", n))
|
||||
})
|
||||
t.Run("InvalidStorage", func(t *testing.T) {
|
||||
n := getTestNode("Storage", "HDD", "Rating", "10", "IntField", "100", "Param", "Value1")
|
||||
require.False(t, c.applyFilter("Main", n))
|
||||
})
|
||||
t.Run("InvalidRating", func(t *testing.T) {
|
||||
n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "100", "Param", "Value1")
|
||||
require.False(t, c.applyFilter("Main", n))
|
||||
})
|
||||
t.Run("InvalidIntField", func(t *testing.T) {
|
||||
n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "str", "Param", "Value1")
|
||||
require.False(t, c.applyFilter("Main", n))
|
||||
})
|
||||
t.Run("InvalidParam", func(t *testing.T) {
|
||||
n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "100", "Param", "NotValue")
|
||||
require.False(t, c.applyFilter("Main", n))
|
||||
})
|
||||
}
|
61
pkg/netmap/helper_test.go
Normal file
61
pkg/netmap/helper_test.go
Normal file
|
@ -0,0 +1,61 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
)
|
||||
|
||||
func newFilter(name string, k, v string, op netmap.Operation, fs ...*netmap.Filter) *netmap.Filter {
|
||||
f := new(netmap.Filter)
|
||||
f.SetName(name)
|
||||
f.SetKey(k)
|
||||
f.SetOp(op)
|
||||
f.SetValue(v)
|
||||
f.SetFilters(fs)
|
||||
return f
|
||||
}
|
||||
|
||||
func newSelector(name string, attr string, c netmap.Clause, count uint32, filter string) *netmap.Selector {
|
||||
s := new(netmap.Selector)
|
||||
s.SetName(name)
|
||||
s.SetAttribute(attr)
|
||||
s.SetCount(count)
|
||||
s.SetClause(c)
|
||||
s.SetFilter(filter)
|
||||
return s
|
||||
}
|
||||
|
||||
func newPlacementPolicy(bf uint32, rs []*netmap.Replica, ss []*netmap.Selector, fs []*netmap.Filter) *netmap.PlacementPolicy {
|
||||
p := new(netmap.PlacementPolicy)
|
||||
p.SetContainerBackupFactor(bf)
|
||||
p.SetReplicas(rs)
|
||||
p.SetSelectors(ss)
|
||||
p.SetFilters(fs)
|
||||
return p
|
||||
}
|
||||
|
||||
func newReplica(c uint32, s string) *netmap.Replica {
|
||||
r := new(netmap.Replica)
|
||||
r.SetCount(c)
|
||||
r.SetSelector(s)
|
||||
return r
|
||||
}
|
||||
|
||||
func nodeInfoFromAttributes(props ...string) netmap.NodeInfo {
|
||||
attrs := make([]*netmap.Attribute, len(props)/2)
|
||||
for i := range attrs {
|
||||
attrs[i] = new(netmap.Attribute)
|
||||
attrs[i].SetKey(props[i*2])
|
||||
attrs[i].SetValue(props[i*2+1])
|
||||
}
|
||||
var n netmap.NodeInfo
|
||||
n.SetAttributes(attrs)
|
||||
return n
|
||||
}
|
||||
|
||||
func getTestNode(props ...string) *Node {
|
||||
m := make(map[string]string, len(props)/2)
|
||||
for i := 0; i < len(props); i += 2 {
|
||||
m[props[i]] = props[i+1]
|
||||
}
|
||||
return &Node{AttrMap: m}
|
||||
}
|
73
pkg/netmap/netmap.go
Normal file
73
pkg/netmap/netmap.go
Normal file
|
@ -0,0 +1,73 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/hrw"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
)
|
||||
|
||||
// Netmap represents netmap which contains preprocessed nodes.
|
||||
type Netmap struct {
|
||||
Nodes Nodes
|
||||
}
|
||||
|
||||
// NewNetmap constructs netmap from the list of raw nodes.
|
||||
func NewNetmap(nodes Nodes) (*Netmap, error) {
|
||||
return &Netmap{
|
||||
Nodes: nodes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func flattenNodes(ns []Nodes) Nodes {
|
||||
result := make(Nodes, 0, len(ns))
|
||||
for i := range ns {
|
||||
result = append(result, ns[i]...)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// GetPlacementVectors returns placement vectors for an object given containerNodes cnt.
|
||||
func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes, error) {
|
||||
h := hrw.Hash(pivot)
|
||||
wf := GetDefaultWeightFunc(m.Nodes)
|
||||
result := make([]Nodes, len(cnt.Replicas()))
|
||||
for i, rep := range cnt.Replicas() {
|
||||
result[i] = make(Nodes, len(rep))
|
||||
copy(result[i], rep)
|
||||
hrw.SortSliceByWeightValue(result[i], result[i].Weights(wf), h)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetContainerNodes returns nodes corresponding to each replica.
|
||||
// Order of returned nodes corresponds to order of replicas in p.
|
||||
// pivot is a seed for HRW sorting.
|
||||
func (m *Netmap) GetContainerNodes(p *netmap.PlacementPolicy, pivot []byte) (ContainerNodes, error) {
|
||||
c := NewContext(m)
|
||||
c.setPivot(pivot)
|
||||
if err := c.processFilters(p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.processSelectors(p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make([]Nodes, len(p.GetReplicas()))
|
||||
for i, r := range p.GetReplicas() {
|
||||
if r == nil {
|
||||
return nil, fmt.Errorf("%w: REPLICA", ErrMissingField)
|
||||
}
|
||||
if r.GetSelector() == "" {
|
||||
for _, s := range p.GetSelectors() {
|
||||
result[i] = append(result[i], flattenNodes(c.Selections[s.GetName()])...)
|
||||
}
|
||||
}
|
||||
nodes, ok := c.Selections[r.GetSelector()]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: REPLICA '%s'", ErrSelectorNotFound, r.GetSelector())
|
||||
}
|
||||
result[i] = append(result[i], flattenNodes(nodes)...)
|
||||
|
||||
}
|
||||
return containerNodes(result), nil
|
||||
}
|
89
pkg/netmap/node_info.go
Normal file
89
pkg/netmap/node_info.go
Normal file
|
@ -0,0 +1,89 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/nspcc-dev/hrw"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
)
|
||||
|
||||
type (
|
||||
// Node is a wrapper over NodeInfo.
|
||||
Node struct {
|
||||
ID uint64
|
||||
Index int
|
||||
Capacity uint64
|
||||
Price uint64
|
||||
AttrMap map[string]string
|
||||
|
||||
InfoV2 *netmap.NodeInfo
|
||||
}
|
||||
|
||||
// Nodes represents slice of graph leafs.
|
||||
Nodes []*Node
|
||||
)
|
||||
|
||||
// Enumeration of well-known attributes.
|
||||
const (
|
||||
CapacityAttr = "Capacity"
|
||||
PriceAttr = "Price"
|
||||
)
|
||||
|
||||
var _ hrw.Hasher = (*Node)(nil)
|
||||
|
||||
// Hash is a function from hrw.Hasher interface. It is implemented
|
||||
// to support weighted hrw therefore sort function sorts nodes
|
||||
// based on their `N` value.
|
||||
func (n Node) Hash() uint64 {
|
||||
return n.ID
|
||||
}
|
||||
|
||||
// NodesFromV2 converts slice of v2 netmap.NodeInfo to a generic node slice.
|
||||
func NodesFromV2(infos []netmap.NodeInfo) Nodes {
|
||||
nodes := make(Nodes, len(infos))
|
||||
for i := range infos {
|
||||
nodes[i] = newNodeV2(i, &infos[i])
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func newNodeV2(index int, ni *netmap.NodeInfo) *Node {
|
||||
n := &Node{
|
||||
ID: hrw.Hash(ni.GetPublicKey()),
|
||||
Index: index,
|
||||
AttrMap: make(map[string]string, len(ni.GetAttributes())),
|
||||
InfoV2: ni,
|
||||
}
|
||||
for _, attr := range ni.GetAttributes() {
|
||||
switch attr.GetKey() {
|
||||
case CapacityAttr:
|
||||
n.Capacity, _ = strconv.ParseUint(attr.GetValue(), 10, 64)
|
||||
case PriceAttr:
|
||||
n.Price, _ = strconv.ParseUint(attr.GetValue(), 10, 64)
|
||||
}
|
||||
n.AttrMap[attr.GetKey()] = attr.GetValue()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Weights returns slice of nodes weights W.
|
||||
func (n Nodes) Weights(wf weightFunc) []float64 {
|
||||
w := make([]float64, 0, len(n))
|
||||
for i := range n {
|
||||
w = append(w, wf(n[i]))
|
||||
}
|
||||
return w
|
||||
}
|
||||
|
||||
// Attribute returns value of attribute k.
|
||||
func (n *Node) Attribute(k string) string {
|
||||
return n.AttrMap[k]
|
||||
}
|
||||
|
||||
// GetBucketWeight computes weight for a Bucket.
|
||||
func GetBucketWeight(ns Nodes, a aggregator, wf weightFunc) float64 {
|
||||
for i := range ns {
|
||||
a.Add(wf(ns[i]))
|
||||
}
|
||||
return a.Compute()
|
||||
}
|
98
pkg/netmap/selector.go
Normal file
98
pkg/netmap/selector.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/nspcc-dev/hrw"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
)
|
||||
|
||||
// processSelectors processes selectors and returns error is any of them is invalid.
|
||||
func (c *Context) processSelectors(p *netmap.PlacementPolicy) error {
|
||||
for _, s := range p.GetSelectors() {
|
||||
if s == nil {
|
||||
return fmt.Errorf("%w: SELECT", ErrMissingField)
|
||||
} else if s.GetFilter() != MainFilterName {
|
||||
_, ok := c.Filters[s.GetFilter()]
|
||||
if !ok {
|
||||
return fmt.Errorf("%w: SELECT FROM '%s'", ErrFilterNotFound, s.GetFilter())
|
||||
}
|
||||
}
|
||||
c.Selectors[s.GetName()] = s
|
||||
result, err := c.getSelection(p, s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Selections[s.GetName()] = result
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetNodesCount returns amount of buckets and nodes in every bucket
|
||||
// for a given placement policy.
|
||||
func GetNodesCount(p *netmap.PlacementPolicy, s *netmap.Selector) (int, int) {
|
||||
switch s.GetClause() {
|
||||
case netmap.Same:
|
||||
return 1, int(p.GetContainerBackupFactor() * s.GetCount())
|
||||
default:
|
||||
return int(s.GetCount()), int(p.GetContainerBackupFactor())
|
||||
}
|
||||
}
|
||||
|
||||
// getSelection returns nodes grouped by s.attribute.
|
||||
func (c *Context) getSelection(p *netmap.PlacementPolicy, s *netmap.Selector) ([]Nodes, error) {
|
||||
bucketCount, nodesInBucket := GetNodesCount(p, s)
|
||||
m := c.getSelectionBase(s)
|
||||
if len(m) < bucketCount {
|
||||
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.GetName())
|
||||
}
|
||||
|
||||
keys := make(sort.StringSlice, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
if len(c.pivot) == 0 {
|
||||
// deterministic order in case of zero seed
|
||||
keys.Sort()
|
||||
}
|
||||
|
||||
nodes := make([]Nodes, 0, len(m))
|
||||
for i := range keys {
|
||||
ns := m[keys[i]]
|
||||
if len(ns) >= nodesInBucket {
|
||||
nodes = append(nodes, ns[:nodesInBucket])
|
||||
}
|
||||
}
|
||||
if len(nodes) < bucketCount {
|
||||
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.GetName())
|
||||
}
|
||||
if len(c.pivot) != 0 {
|
||||
weights := make([]float64, len(nodes))
|
||||
for i := range nodes {
|
||||
weights[i] = GetBucketWeight(nodes[i], c.aggregator(), c.weightFunc)
|
||||
}
|
||||
hrw.SortSliceByWeightIndex(nodes, weights, c.pivotHash)
|
||||
}
|
||||
return nodes[:bucketCount], nil
|
||||
}
|
||||
|
||||
// getSelectionBase returns nodes grouped by selector attribute.
|
||||
func (c *Context) getSelectionBase(s *netmap.Selector) map[string]Nodes {
|
||||
f := c.Filters[s.GetFilter()]
|
||||
isMain := s.GetFilter() == MainFilterName
|
||||
result := map[string]Nodes{}
|
||||
for i := range c.Netmap.Nodes {
|
||||
if isMain || c.match(f, c.Netmap.Nodes[i]) {
|
||||
v := c.Netmap.Nodes[i].Attribute(s.GetAttribute())
|
||||
result[v] = append(result[v], c.Netmap.Nodes[i])
|
||||
}
|
||||
}
|
||||
|
||||
if len(c.pivot) != 0 {
|
||||
for _, ns := range result {
|
||||
hrw.SortSliceByWeightValue(ns, ns.Weights(c.weightFunc), c.pivotHash)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
218
pkg/netmap/selector_test.go
Normal file
218
pkg/netmap/selector_test.go
Normal file
|
@ -0,0 +1,218 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPlacementPolicy_GetPlacementVectors(t *testing.T) {
|
||||
p := newPlacementPolicy(2,
|
||||
[]*netmap.Replica{
|
||||
newReplica(1, "SPB"),
|
||||
newReplica(2, "Americas"),
|
||||
},
|
||||
[]*netmap.Selector{
|
||||
newSelector("SPB", "City", netmap.Same, 1, "SPBSSD"),
|
||||
newSelector("Americas", "City", netmap.Distinct, 2, "Americas"),
|
||||
},
|
||||
[]*netmap.Filter{
|
||||
newFilter("SPBSSD", "", "", netmap.AND,
|
||||
newFilter("", "Country", "RU", netmap.EQ),
|
||||
newFilter("", "City", "St.Petersburg", netmap.EQ),
|
||||
newFilter("", "SSD", "1", netmap.EQ)),
|
||||
newFilter("Americas", "", "", netmap.OR,
|
||||
newFilter("", "Continent", "NA", netmap.EQ),
|
||||
newFilter("", "Continent", "SA", netmap.EQ)),
|
||||
})
|
||||
nodes := []netmap.NodeInfo{
|
||||
nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"),
|
||||
nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"),
|
||||
nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"),
|
||||
nodeInfoFromAttributes("ID", "4", "Country", "RU", "City", "Moscow", "SSD", "1"),
|
||||
nodeInfoFromAttributes("ID", "5", "Country", "RU", "City", "St.Petersburg", "SSD", "1"),
|
||||
nodeInfoFromAttributes("ID", "6", "Continent", "NA", "City", "NewYork"),
|
||||
nodeInfoFromAttributes("ID", "7", "Continent", "AF", "City", "Cairo"),
|
||||
nodeInfoFromAttributes("ID", "8", "Continent", "AF", "City", "Cairo"),
|
||||
nodeInfoFromAttributes("ID", "9", "Continent", "SA", "City", "Lima"),
|
||||
nodeInfoFromAttributes("ID", "10", "Continent", "AF", "City", "Cairo"),
|
||||
nodeInfoFromAttributes("ID", "11", "Continent", "NA", "City", "NewYork"),
|
||||
nodeInfoFromAttributes("ID", "12", "Continent", "NA", "City", "LosAngeles"),
|
||||
nodeInfoFromAttributes("ID", "13", "Continent", "SA", "City", "Lima"),
|
||||
}
|
||||
|
||||
nm, err := NewNetmap(NodesFromV2(nodes))
|
||||
require.NoError(t, err)
|
||||
v, err := nm.GetContainerNodes(p, nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(v.Replicas()))
|
||||
require.Equal(t, 6, len(v.Flatten()))
|
||||
|
||||
require.Equal(t, 2, len(v.Replicas()[0]))
|
||||
ids := map[string]struct{}{}
|
||||
for _, ni := range v.Replicas()[0] {
|
||||
require.Equal(t, "RU", ni.Attribute("Country"))
|
||||
require.Equal(t, "St.Petersburg", ni.Attribute("City"))
|
||||
require.Equal(t, "1", ni.Attribute("SSD"))
|
||||
ids[ni.Attribute("ID")] = struct{}{}
|
||||
}
|
||||
require.Equal(t, len(v.Replicas()[0]), len(ids), "not all nodes we distinct")
|
||||
|
||||
require.Equal(t, 4, len(v.Replicas()[1])) // 2 cities * 2 HRWB
|
||||
ids = map[string]struct{}{}
|
||||
for _, ni := range v.Replicas()[1] {
|
||||
require.Contains(t, []string{"NA", "SA"}, ni.Attribute("Continent"))
|
||||
ids[ni.Attribute("ID")] = struct{}{}
|
||||
}
|
||||
require.Equal(t, len(v.Replicas()[1]), len(ids), "not all nodes we distinct")
|
||||
}
|
||||
|
||||
func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
|
||||
p := newPlacementPolicy(2, nil,
|
||||
[]*netmap.Selector{
|
||||
newSelector("SameRU", "City", netmap.Same, 2, "FromRU"),
|
||||
newSelector("DistinctRU", "City", netmap.Distinct, 2, "FromRU"),
|
||||
newSelector("Good", "Country", netmap.Distinct, 2, "Good"),
|
||||
newSelector("Main", "Country", netmap.Distinct, 3, "*"),
|
||||
},
|
||||
[]*netmap.Filter{
|
||||
newFilter("FromRU", "Country", "Russia", netmap.EQ),
|
||||
newFilter("Good", "Rating", "4", netmap.GE),
|
||||
})
|
||||
nodes := []netmap.NodeInfo{
|
||||
nodeInfoFromAttributes("Country", "Russia", "Rating", "1", "City", "SPB"),
|
||||
nodeInfoFromAttributes("Country", "Germany", "Rating", "5", "City", "Berlin"),
|
||||
nodeInfoFromAttributes("Country", "Russia", "Rating", "6", "City", "Moscow"),
|
||||
nodeInfoFromAttributes("Country", "France", "Rating", "4", "City", "Paris"),
|
||||
nodeInfoFromAttributes("Country", "France", "Rating", "1", "City", "Lyon"),
|
||||
nodeInfoFromAttributes("Country", "Russia", "Rating", "5", "City", "SPB"),
|
||||
nodeInfoFromAttributes("Country", "Russia", "Rating", "7", "City", "Moscow"),
|
||||
nodeInfoFromAttributes("Country", "Germany", "Rating", "3", "City", "Darmstadt"),
|
||||
nodeInfoFromAttributes("Country", "Germany", "Rating", "7", "City", "Frankfurt"),
|
||||
nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"),
|
||||
nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"),
|
||||
}
|
||||
|
||||
nm, err := NewNetmap(NodesFromV2(nodes))
|
||||
require.NoError(t, err)
|
||||
c := NewContext(nm)
|
||||
require.NoError(t, c.processFilters(p))
|
||||
require.NoError(t, c.processSelectors(p))
|
||||
|
||||
for _, s := range p.GetSelectors() {
|
||||
sel := c.Selections[s.GetName()]
|
||||
s := c.Selectors[s.GetName()]
|
||||
bucketCount, nodesInBucket := GetNodesCount(p, s)
|
||||
targ := fmt.Sprintf("selector '%s'", s.GetName())
|
||||
require.Equal(t, bucketCount, len(sel), targ)
|
||||
for _, res := range sel {
|
||||
require.Equal(t, nodesInBucket, len(res), targ)
|
||||
for j := range res {
|
||||
require.True(t, c.applyFilter(s.GetFilter(), res[j]), targ)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) {
|
||||
p := newPlacementPolicy(1, nil,
|
||||
[]*netmap.Selector{
|
||||
newSelector("Main", "Country", netmap.Distinct, 3, "*"),
|
||||
}, nil)
|
||||
|
||||
// bucket weight order: RU > DE > FR
|
||||
nodes := []netmap.NodeInfo{
|
||||
nodeInfoFromAttributes("Country", "Germany", PriceAttr, "2", CapacityAttr, "10000"),
|
||||
nodeInfoFromAttributes("Country", "Germany", PriceAttr, "4", CapacityAttr, "1"),
|
||||
nodeInfoFromAttributes("Country", "France", PriceAttr, "3", CapacityAttr, "10"),
|
||||
nodeInfoFromAttributes("Country", "Russia", PriceAttr, "2", CapacityAttr, "10000"),
|
||||
nodeInfoFromAttributes("Country", "Russia", PriceAttr, "1", CapacityAttr, "10000"),
|
||||
nodeInfoFromAttributes("Country", "Russia", CapacityAttr, "10000"),
|
||||
nodeInfoFromAttributes("Country", "France", PriceAttr, "100", CapacityAttr, "1"),
|
||||
nodeInfoFromAttributes("Country", "France", PriceAttr, "7", CapacityAttr, "10000"),
|
||||
nodeInfoFromAttributes("Country", "Russia", PriceAttr, "2", CapacityAttr, "1"),
|
||||
}
|
||||
nm, err := NewNetmap(NodesFromV2(nodes))
|
||||
require.NoError(t, err)
|
||||
c := NewContext(nm)
|
||||
c.setPivot([]byte("containerID"))
|
||||
c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1))
|
||||
c.aggregator = newMaxAgg
|
||||
require.NoError(t, c.processFilters(p))
|
||||
require.NoError(t, c.processSelectors(p))
|
||||
|
||||
cnt := c.Selections["Main"]
|
||||
expected := []Nodes{
|
||||
{{Index: 4, Capacity: 10000, Price: 1}}, // best RU
|
||||
{{Index: 0, Capacity: 10000, Price: 2}}, // best DE
|
||||
{{Index: 7, Capacity: 10000, Price: 7}}, // best FR
|
||||
}
|
||||
require.Equal(t, len(expected), len(cnt))
|
||||
for i := range expected {
|
||||
require.Equal(t, len(expected[i]), len(cnt[i]))
|
||||
require.Equal(t, expected[i][0].Index, cnt[i][0].Index)
|
||||
require.Equal(t, expected[i][0].Capacity, cnt[i][0].Capacity)
|
||||
require.Equal(t, expected[i][0].Price, cnt[i][0].Price)
|
||||
}
|
||||
|
||||
res, err := nm.GetPlacementVectors(containerNodes(cnt), []byte("objectID"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, res, cnt)
|
||||
}
|
||||
|
||||
func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
p *netmap.PlacementPolicy
|
||||
err error
|
||||
}{
|
||||
{
|
||||
"MissingSelector",
|
||||
newPlacementPolicy(2, nil,
|
||||
[]*netmap.Selector{nil},
|
||||
[]*netmap.Filter{}),
|
||||
ErrMissingField,
|
||||
},
|
||||
{
|
||||
"InvalidFilterReference",
|
||||
newPlacementPolicy(1, nil,
|
||||
[]*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 1, "FromNL")},
|
||||
[]*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}),
|
||||
ErrFilterNotFound,
|
||||
},
|
||||
{
|
||||
"NotEnoughNodes (backup factor)",
|
||||
newPlacementPolicy(2, nil,
|
||||
[]*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 1, "FromRU")},
|
||||
[]*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}),
|
||||
ErrNotEnoughNodes,
|
||||
},
|
||||
{
|
||||
"NotEnoughNodes (buckets)",
|
||||
newPlacementPolicy(1, nil,
|
||||
[]*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 2, "FromRU")},
|
||||
[]*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}),
|
||||
ErrNotEnoughNodes,
|
||||
},
|
||||
}
|
||||
nodes := []netmap.NodeInfo{
|
||||
nodeInfoFromAttributes("Country", "Russia"),
|
||||
nodeInfoFromAttributes("Country", "Germany"),
|
||||
nodeInfoFromAttributes(),
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
nm, err := NewNetmap(NodesFromV2(nodes))
|
||||
require.NoError(t, err)
|
||||
c := NewContext(nm)
|
||||
require.NoError(t, c.processFilters(tc.p))
|
||||
|
||||
err = c.processSelectors(tc.p)
|
||||
require.True(t, errors.Is(err, tc.err), "got: %v", err)
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue