From 369bd382b375afaf18c0b6202af1b921bb1e9450 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 27 Oct 2021 13:00:35 +0300 Subject: [PATCH] [#42] netmap: move package from neofs-api-go Signed-off-by: Evgenii Stratonikov --- go.mod | 1 + netmap/aggregator.go | 223 +++++++++++++++ netmap/clause.go | 69 +++++ netmap/clause_test.go | 43 +++ netmap/container.go | 19 ++ netmap/context.go | 94 +++++++ netmap/doc.go | 11 + netmap/filter.go | 272 ++++++++++++++++++ netmap/filter_test.go | 331 ++++++++++++++++++++++ netmap/helper_test.go | 93 +++++++ netmap/netmap.go | 100 +++++++ netmap/network_info.go | 204 ++++++++++++++ netmap/network_info_test.go | 214 +++++++++++++++ netmap/node_info.go | 429 +++++++++++++++++++++++++++++ netmap/node_info_test.go | 263 ++++++++++++++++++ netmap/operation.go | 116 ++++++++ netmap/operation_test.go | 73 +++++ netmap/policy.go | 145 ++++++++++ netmap/policy_test.go | 180 ++++++++++++ netmap/replica.go | 71 +++++ netmap/replica_test.go | 99 +++++++ netmap/selector.go | 264 ++++++++++++++++++ netmap/selector_test.go | 530 ++++++++++++++++++++++++++++++++++++ netmap/test/generate.go | 37 +++ 24 files changed, 3881 insertions(+) create mode 100644 netmap/aggregator.go create mode 100644 netmap/clause.go create mode 100644 netmap/clause_test.go create mode 100644 netmap/container.go create mode 100644 netmap/context.go create mode 100644 netmap/doc.go create mode 100644 netmap/filter.go create mode 100644 netmap/filter_test.go create mode 100644 netmap/helper_test.go create mode 100644 netmap/netmap.go create mode 100644 netmap/network_info.go create mode 100644 netmap/network_info_test.go create mode 100644 netmap/node_info.go create mode 100644 netmap/node_info_test.go create mode 100644 netmap/operation.go create mode 100644 netmap/operation_test.go create mode 100644 netmap/policy.go create mode 100644 netmap/policy_test.go create mode 100644 netmap/replica.go create mode 100644 netmap/replica_test.go create mode 100644 netmap/selector.go create mode 100644 netmap/selector_test.go create mode 100644 netmap/test/generate.go diff --git a/go.mod b/go.mod index 4761a1d..c40aa5c 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521073959-f0d4d129b7f1 github.com/golang/mock v1.6.0 github.com/google/uuid v1.2.0 + github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.96.1 github.com/nspcc-dev/neofs-api-go v1.30.0 github.com/stretchr/testify v1.7.0 diff --git a/netmap/aggregator.go b/netmap/aggregator.go new file mode 100644 index 0000000..d1c6061 --- /dev/null +++ b/netmap/aggregator.go @@ -0,0 +1,223 @@ +package netmap + +import ( + "sort" +) + +type ( + // aggregator can calculate some value across all netmap + // such as median, minimum or maximum. + aggregator interface { + Add(float64) + Compute() float64 + } + + // normalizer normalizes weight. + normalizer interface { + Normalize(w float64) float64 + } + + meanSumAgg struct { + sum float64 + count int + } + + meanAgg struct { + mean float64 + count int + } + + minAgg struct { + min float64 + } + + maxAgg struct { + max float64 + } + + meanIQRAgg struct { + k float64 + arr []float64 + } + + reverseMinNorm struct { + min float64 + } + + maxNorm struct { + max float64 + } + + sigmoidNorm struct { + scale float64 + } + + constNorm struct { + value float64 + } + + // weightFunc calculates n's weight. 
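+	// The default implementation, produced by newWeightFunc in
+	// GetDefaultWeightFunc, multiplies a node's normalized capacity
+	// by its normalized price, so nodes with larger capacity and
+	// lower price get larger weights.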
+ weightFunc = func(n *Node) float64 +) + +var ( + _ aggregator = (*meanSumAgg)(nil) + _ aggregator = (*meanAgg)(nil) + _ aggregator = (*minAgg)(nil) + _ aggregator = (*maxAgg)(nil) + _ aggregator = (*meanIQRAgg)(nil) + + _ normalizer = (*reverseMinNorm)(nil) + _ normalizer = (*maxNorm)(nil) + _ normalizer = (*sigmoidNorm)(nil) + _ normalizer = (*constNorm)(nil) +) + +// newWeightFunc returns weightFunc which multiplies normalized +// capacity and price. +func newWeightFunc(capNorm, priceNorm normalizer) weightFunc { + return func(n *Node) float64 { + return capNorm.Normalize(float64(n.Capacity)) * priceNorm.Normalize(float64(n.Price)) + } +} + +// newMeanAgg returns an aggregator which +// computes mean value by recalculating it on +// every addition. +func newMeanAgg() aggregator { + return new(meanAgg) +} + +// newMinAgg returns an aggregator which +// computes min value. +func newMinAgg() aggregator { + return new(minAgg) +} + +// newMeanIQRAgg returns an aggregator which +// computes mean value of values from IQR interval. +func newMeanIQRAgg() aggregator { + return new(meanIQRAgg) +} + +// newReverseMinNorm returns a normalizer which +// normalize values in range of 0.0 to 1.0 to a minimum value. +func newReverseMinNorm(min float64) normalizer { + return &reverseMinNorm{min: min} +} + +// newSigmoidNorm returns a normalizer which +// normalize values in range of 0.0 to 1.0 to a scaled sigmoid. +func newSigmoidNorm(scale float64) normalizer { + return &sigmoidNorm{scale: scale} +} + +func (a *meanSumAgg) Add(n float64) { + a.sum += n + a.count++ +} + +func (a *meanSumAgg) Compute() float64 { + if a.count == 0 { + return 0 + } + + return a.sum / float64(a.count) +} + +func (a *meanAgg) Add(n float64) { + c := a.count + 1 + a.mean = a.mean*(float64(a.count)/float64(c)) + n/float64(c) + a.count++ +} + +func (a *meanAgg) Compute() float64 { + return a.mean +} + +func (a *minAgg) Add(n float64) { + if a.min == 0 || n < a.min { + a.min = n + } +} + +func (a *minAgg) Compute() float64 { + return a.min +} + +func (a *maxAgg) Add(n float64) { + if n > a.max { + a.max = n + } +} + +func (a *maxAgg) Compute() float64 { + return a.max +} + +func (a *meanIQRAgg) Add(n float64) { + a.arr = append(a.arr, n) +} + +func (a *meanIQRAgg) Compute() float64 { + l := len(a.arr) + if l == 0 { + return 0 + } + + sort.Slice(a.arr, func(i, j int) bool { return a.arr[i] < a.arr[j] }) + + var min, max float64 + + const minLn = 4 + + if l < minLn { + min, max = a.arr[0], a.arr[l-1] + } else { + start, end := l/minLn, l*3/minLn-1 + iqr := a.k * (a.arr[end] - a.arr[start]) + min, max = a.arr[start]-iqr, a.arr[end]+iqr + } + + count := 0 + sum := float64(0) + + for _, e := range a.arr { + if e >= min && e <= max { + sum += e + count++ + } + } + + return sum / float64(count) +} + +func (r *reverseMinNorm) Normalize(w float64) float64 { + if w == 0 { + return 0 + } + + return r.min / w +} + +func (r *maxNorm) Normalize(w float64) float64 { + if r.max == 0 { + return 0 + } + + return w / r.max +} + +func (r *sigmoidNorm) Normalize(w float64) float64 { + if r.scale == 0 { + return 0 + } + + x := w / r.scale + + return x / (1 + x) +} + +func (r *constNorm) Normalize(_ float64) float64 { + return r.value +} diff --git a/netmap/clause.go b/netmap/clause.go new file mode 100644 index 0000000..d2b5c40 --- /dev/null +++ b/netmap/clause.go @@ -0,0 +1,69 @@ +package netmap + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +// Clause is an enumeration of selector modifiers +// that shows how the node set will be 
formed. +type Clause uint32 + +const ( + ClauseUnspecified Clause = iota + + // ClauseSame is a selector modifier to select only nodes having the same value of bucket attribute. + ClauseSame + + // ClauseDistinct is a selector modifier to select nodes having different values of bucket attribute. + ClauseDistinct +) + +// ClauseFromV2 converts v2 Clause to Clause. +func ClauseFromV2(c netmap.Clause) Clause { + switch c { + default: + return ClauseUnspecified + case netmap.Same: + return ClauseSame + case netmap.Distinct: + return ClauseDistinct + } +} + +// ToV2 converts Clause to v2 Clause. +func (c Clause) ToV2() netmap.Clause { + switch c { + default: + return netmap.UnspecifiedClause + case ClauseDistinct: + return netmap.Distinct + case ClauseSame: + return netmap.Same + } +} + +// String returns string representation of Clause. +// +// String mapping: +// * ClauseDistinct: DISTINCT; +// * ClauseSame: SAME; +// * ClauseUnspecified, default: CLAUSE_UNSPECIFIED. +func (c Clause) String() string { + return c.ToV2().String() +} + +// FromString parses Clause from a string representation. +// It is a reverse action to String(). +// +// Returns true if s was parsed successfully. +func (c *Clause) FromString(s string) bool { + var g netmap.Clause + + ok := g.FromString(s) + + if ok { + *c = ClauseFromV2(g) + } + + return ok +} diff --git a/netmap/clause_test.go b/netmap/clause_test.go new file mode 100644 index 0000000..6ce5e82 --- /dev/null +++ b/netmap/clause_test.go @@ -0,0 +1,43 @@ +package netmap + +import ( + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + "github.com/stretchr/testify/require" +) + +func TestClauseFromV2(t *testing.T) { + for _, item := range []struct { + c Clause + cV2 netmap.Clause + }{ + { + c: ClauseUnspecified, + cV2: netmap.UnspecifiedClause, + }, + { + c: ClauseSame, + cV2: netmap.Same, + }, + { + c: ClauseDistinct, + cV2: netmap.Distinct, + }, + } { + require.Equal(t, item.c, ClauseFromV2(item.cV2)) + require.Equal(t, item.cV2, item.c.ToV2()) + } +} + +func TestClause_String(t *testing.T) { + toPtr := func(v Clause) *Clause { + return &v + } + + testEnumStrings(t, new(Clause), []enumStringItem{ + {val: toPtr(ClauseDistinct), str: "DISTINCT"}, + {val: toPtr(ClauseSame), str: "SAME"}, + {val: toPtr(ClauseUnspecified), str: "CLAUSE_UNSPECIFIED"}, + }) +} diff --git a/netmap/container.go b/netmap/container.go new file mode 100644 index 0000000..00875a9 --- /dev/null +++ b/netmap/container.go @@ -0,0 +1,19 @@ +package netmap + +// ContainerNodes represents nodes in the container. +type ContainerNodes interface { + Replicas() []Nodes + Flatten() Nodes +} + +type containerNodes []Nodes + +// Flatten returns list of all nodes from the container. +func (c containerNodes) Flatten() Nodes { + return flattenNodes(c) +} + +// Replicas return list of container replicas. +func (c containerNodes) Replicas() []Nodes { + return c +} diff --git a/netmap/context.go b/netmap/context.go new file mode 100644 index 0000000..29b8167 --- /dev/null +++ b/netmap/context.go @@ -0,0 +1,94 @@ +package netmap + +import ( + "errors" + + "github.com/nspcc-dev/hrw" +) + +// Context contains references to named filters and cached numeric values. +type Context struct { + // Netmap is a netmap structure to operate on. + Netmap *Netmap + // Filters stores processed filters. + Filters map[string]*Filter + // Selectors stores processed selectors. + Selectors map[string]*Selector + // Selections stores result of selector processing. 
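+	// The map is keyed by selector name and is populated while
+	// selectors are processed (see processSelectors and getSelection).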
+ Selections map[string][]Nodes + + // numCache stores parsed numeric values. + numCache map[*Filter]uint64 + // pivot is a seed for HRW. + pivot []byte + // pivotHash is a saved HRW hash of pivot + pivotHash uint64 + // aggregator is returns aggregator determining bucket weight. + // By default it returns mean value from IQR interval. + aggregator func() aggregator + // weightFunc is a weighting function for determining node priority. + // By default in combines favours low price and high capacity. + weightFunc weightFunc + // container backup factor is a factor for selector counters that expand + // amount of chosen nodes. + cbf uint32 +} + +// Various validation errors. +var ( + ErrMissingField = errors.New("netmap: nil field") + ErrInvalidFilterName = errors.New("netmap: filter name is invalid") + ErrInvalidNumber = errors.New("netmap: number value expected") + ErrInvalidFilterOp = errors.New("netmap: invalid filter operation") + ErrFilterNotFound = errors.New("netmap: filter not found") + ErrNonEmptyFilters = errors.New("netmap: simple filter must no contain sub-filters") + ErrNotEnoughNodes = errors.New("netmap: not enough nodes to SELECT from") + ErrSelectorNotFound = errors.New("netmap: selector not found") + ErrUnnamedTopFilter = errors.New("netmap: all filters on top level must be named") +) + +// NewContext creates new context. It contains various caches. +// In future it may create hierarchical netmap structure to work with. +func NewContext(nm *Netmap) *Context { + return &Context{ + Netmap: nm, + Filters: make(map[string]*Filter), + Selectors: make(map[string]*Selector), + Selections: make(map[string][]Nodes), + + numCache: make(map[*Filter]uint64), + aggregator: newMeanIQRAgg, + weightFunc: GetDefaultWeightFunc(nm.Nodes), + cbf: defaultCBF, + } +} + +func (c *Context) setPivot(pivot []byte) { + if len(pivot) != 0 { + c.pivot = pivot + c.pivotHash = hrw.Hash(pivot) + } +} + +func (c *Context) setCBF(cbf uint32) { + if cbf == 0 { + c.cbf = defaultCBF + } else { + c.cbf = cbf + } +} + +// GetDefaultWeightFunc returns default weighting function. +func GetDefaultWeightFunc(ns Nodes) weightFunc { + mean := newMeanAgg() + min := newMinAgg() + + for i := range ns { + mean.Add(float64(ns[i].Capacity)) + min.Add(float64(ns[i].Price)) + } + + return newWeightFunc( + newSigmoidNorm(mean.Compute()), + newReverseMinNorm(min.Compute())) +} diff --git a/netmap/doc.go b/netmap/doc.go new file mode 100644 index 0000000..a38e985 --- /dev/null +++ b/netmap/doc.go @@ -0,0 +1,11 @@ +/* +Package netmap provides routines for working with netmap and placement policy. +Work is done in 4 steps: +1. Create context containing results shared between steps. +2. Processing filters. +3. Processing selectors. +4. Processing replicas. + +Each step depends only on previous ones. +*/ +package netmap diff --git a/netmap/filter.go b/netmap/filter.go new file mode 100644 index 0000000..38fdb4f --- /dev/null +++ b/netmap/filter.go @@ -0,0 +1,272 @@ +package netmap + +import ( + "fmt" + "strconv" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +// Filter represents v2-compatible netmap filter. +type Filter netmap.Filter + +// MainFilterName is a name of the filter +// which points to the whole netmap. +const MainFilterName = "*" + +// applyFilter applies named filter to b. +func (c *Context) applyFilter(name string, b *Node) bool { + return name == MainFilterName || c.match(c.Filters[name], b) +} + +// processFilters processes filters and returns error is any of them is invalid. 
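+// Every top-level filter must be named; successfully processed filters are
+// stored in c.Filters so that selectors and inner filters can reference
+// them by name.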
+func (c *Context) processFilters(p *PlacementPolicy) error { + for _, f := range p.Filters() { + if err := c.processFilter(f, true); err != nil { + return err + } + } + + return nil +} + +func (c *Context) processFilter(f *Filter, top bool) error { + if f == nil { + return fmt.Errorf("%w: FILTER", ErrMissingField) + } + + if f.Name() == MainFilterName { + return fmt.Errorf("%w: '*' is reserved", ErrInvalidFilterName) + } + + if top && f.Name() == "" { + return ErrUnnamedTopFilter + } + + if !top && f.Name() != "" && c.Filters[f.Name()] == nil { + return fmt.Errorf("%w: '%s'", ErrFilterNotFound, f.Name()) + } + + switch f.Operation() { + case OpAND, OpOR: + for _, flt := range f.InnerFilters() { + if err := c.processFilter(flt, false); err != nil { + return err + } + } + default: + if len(f.InnerFilters()) != 0 { + return ErrNonEmptyFilters + } else if !top && f.Name() != "" { // named reference + return nil + } + + switch f.Operation() { + case OpEQ, OpNE: + case OpGT, OpGE, OpLT, OpLE: + n, err := strconv.ParseUint(f.Value(), 10, 64) + if err != nil { + return fmt.Errorf("%w: '%s'", ErrInvalidNumber, f.Value()) + } + + c.numCache[f] = n + default: + return fmt.Errorf("%w: %s", ErrInvalidFilterOp, f.Operation()) + } + } + + if top { + c.Filters[f.Name()] = f + } + + return nil +} + +// match matches f against b. It returns no errors because +// filter should have been parsed during context creation +// and missing node properties are considered as a regular fail. +func (c *Context) match(f *Filter, b *Node) bool { + switch f.Operation() { + case OpAND, OpOR: + for _, lf := range f.InnerFilters() { + if lf.Name() != "" { + lf = c.Filters[lf.Name()] + } + + ok := c.match(lf, b) + if ok == (f.Operation() == OpOR) { + return ok + } + } + + return f.Operation() == OpAND + default: + return c.matchKeyValue(f, b) + } +} + +func (c *Context) matchKeyValue(f *Filter, b *Node) bool { + switch f.Operation() { + case OpEQ: + return b.Attribute(f.Key()) == f.Value() + case OpNE: + return b.Attribute(f.Key()) != f.Value() + default: + var attr uint64 + + switch f.Key() { + case AttrPrice: + attr = b.Price + case AttrCapacity: + attr = b.Capacity + default: + var err error + + attr, err = strconv.ParseUint(b.Attribute(f.Key()), 10, 64) + if err != nil { + // Note: because filters are somewhat independent from nodes attributes, + // We don't report an error here, and fail filter instead. + return false + } + } + + switch f.Operation() { + case OpGT: + return attr > c.numCache[f] + case OpGE: + return attr >= c.numCache[f] + case OpLT: + return attr < c.numCache[f] + case OpLE: + return attr <= c.numCache[f] + default: + // do nothing and return false + } + } + // will not happen if context was created from f (maybe panic?) + return false +} + +// NewFilter creates and returns new Filter instance. +// +// Defaults: +// - name: ""; +// - key: ""; +// - value: ""; +// - operation: 0; +// - filters: nil. +func NewFilter() *Filter { + return NewFilterFromV2(new(netmap.Filter)) +} + +// NewFilterFromV2 converts v2 Filter to Filter. +// +// Nil netmap.Filter converts to nil. +func NewFilterFromV2(f *netmap.Filter) *Filter { + return (*Filter)(f) +} + +// ToV2 converts Filter to v2 Filter. +// +// Nil Filter converts to nil. +func (f *Filter) ToV2() *netmap.Filter { + return (*netmap.Filter)(f) +} + +// Key returns key to filter. +func (f *Filter) Key() string { + return (*netmap.Filter)(f).GetKey() +} + +// SetKey sets key to filter. 
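+//
+// A numeric filter, such as those referenced from placement policies, can
+// be built as follows (names and values are illustrative):
+//
+//	f := NewFilter()
+//	f.SetName("GoodRating")
+//	f.SetKey("Rating")
+//	f.SetOperation(OpGE)
+//	f.SetValue("4")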
+func (f *Filter) SetKey(key string) { + (*netmap.Filter)(f).SetKey(key) +} + +// Value returns value to match. +func (f *Filter) Value() string { + return (*netmap.Filter)(f).GetValue() +} + +// SetValue sets value to match. +func (f *Filter) SetValue(val string) { + (*netmap.Filter)(f).SetValue(val) +} + +// Name returns filter name. +func (f *Filter) Name() string { + return (*netmap.Filter)(f).GetName() +} + +// SetName sets filter name. +func (f *Filter) SetName(name string) { + (*netmap.Filter)(f).SetName(name) +} + +// Operation returns filtering operation. +func (f *Filter) Operation() Operation { + return OperationFromV2( + (*netmap.Filter)(f).GetOp()) +} + +// SetOperation sets filtering operation. +func (f *Filter) SetOperation(op Operation) { + (*netmap.Filter)(f).SetOp(op.ToV2()) +} + +func filtersFromV2(fs []*netmap.Filter) []*Filter { + if fs == nil { + return nil + } + + res := make([]*Filter, 0, len(fs)) + + for i := range fs { + res = append(res, NewFilterFromV2(fs[i])) + } + + return res +} + +// InnerFilters returns list of inner filters. +func (f *Filter) InnerFilters() []*Filter { + return filtersFromV2((*netmap.Filter)(f).GetFilters()) +} + +func filtersToV2(fs []*Filter) (fsV2 []*netmap.Filter) { + if fs != nil { + fsV2 = make([]*netmap.Filter, 0, len(fs)) + + for i := range fs { + fsV2 = append(fsV2, fs[i].ToV2()) + } + } + + return +} + +// SetInnerFilters sets list of inner filters. +func (f *Filter) SetInnerFilters(fs ...*Filter) { + (*netmap.Filter)(f). + SetFilters(filtersToV2(fs)) +} + +// Marshal marshals Filter into a protobuf binary form. +func (f *Filter) Marshal() ([]byte, error) { + return (*netmap.Filter)(f).StableMarshal(nil) +} + +// Unmarshal unmarshals protobuf binary representation of Filter. +func (f *Filter) Unmarshal(data []byte) error { + return (*netmap.Filter)(f).Unmarshal(data) +} + +// MarshalJSON encodes Filter to protobuf JSON format. +func (f *Filter) MarshalJSON() ([]byte, error) { + return (*netmap.Filter)(f).MarshalJSON() +} + +// UnmarshalJSON decodes Filter from protobuf JSON format. 
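+//
+// It is the inverse of MarshalJSON, so the following round trip (sketch,
+// errors ignored) restores an equal Filter:
+//
+//	data, _ := f.MarshalJSON()
+//	f2 := NewFilter()
+//	_ = f2.UnmarshalJSON(data)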
+func (f *Filter) UnmarshalJSON(data []byte) error { + return (*netmap.Filter)(f).UnmarshalJSON(data) +} diff --git a/netmap/filter_test.go b/netmap/filter_test.go new file mode 100644 index 0000000..c9103e1 --- /dev/null +++ b/netmap/filter_test.go @@ -0,0 +1,331 @@ +package netmap + +import ( + "errors" + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + "github.com/stretchr/testify/require" +) + +func TestContext_ProcessFilters(t *testing.T) { + fs := []*Filter{ + newFilter("StorageSSD", "Storage", "SSD", OpEQ), + newFilter("GoodRating", "Rating", "4", OpGE), + newFilter("Main", "", "", OpAND, + newFilter("StorageSSD", "", "", 0), + newFilter("", "IntField", "123", OpLT), + newFilter("GoodRating", "", "", 0)), + } + nm, err := NewNetmap(nil) + require.NoError(t, err) + c := NewContext(nm) + p := newPlacementPolicy(1, nil, nil, fs) + require.NoError(t, c.processFilters(p)) + require.Equal(t, 3, len(c.Filters)) + for _, f := range fs { + require.Equal(t, f, c.Filters[f.Name()]) + } + + require.Equal(t, uint64(4), c.numCache[fs[1]]) + require.Equal(t, uint64(123), c.numCache[fs[2].InnerFilters()[1]]) +} + +func TestContext_ProcessFiltersInvalid(t *testing.T) { + errTestCases := []struct { + name string + filter *Filter + err error + }{ + { + "UnnamedTop", + newFilter("", "Storage", "SSD", OpEQ), + ErrUnnamedTopFilter, + }, + { + "InvalidReference", + newFilter("Main", "", "", OpAND, + newFilter("StorageSSD", "", "", 0)), + ErrFilterNotFound, + }, + { + "NonEmptyKeyed", + newFilter("Main", "Storage", "SSD", OpEQ, + newFilter("StorageSSD", "", "", 0)), + ErrNonEmptyFilters, + }, + { + "InvalidNumber", + newFilter("Main", "Rating", "three", OpGE), + ErrInvalidNumber, + }, + { + "InvalidOp", + newFilter("Main", "Rating", "3", 0), + ErrInvalidFilterOp, + }, + { + "InvalidName", + newFilter("*", "Rating", "3", OpGE), + ErrInvalidFilterName, + }, + { + "MissingFilter", + nil, + ErrMissingField, + }, + } + for _, tc := range errTestCases { + t.Run(tc.name, func(t *testing.T) { + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, []*Filter{tc.filter}) + err := c.processFilters(p) + require.True(t, errors.Is(err, tc.err), "got: %v", err) + }) + } +} + +func TestFilter_MatchSimple(t *testing.T) { + b := &Node{AttrMap: map[string]string{ + "Rating": "4", + "Country": "Germany", + }} + testCases := []struct { + name string + ok bool + f *Filter + }{ + { + "GE_true", true, + newFilter("Main", "Rating", "4", OpGE), + }, + { + "GE_false", false, + newFilter("Main", "Rating", "5", OpGE), + }, + { + "GT_true", true, + newFilter("Main", "Rating", "3", OpGT), + }, + { + "GT_false", false, + newFilter("Main", "Rating", "4", OpGT), + }, + { + "LE_true", true, + newFilter("Main", "Rating", "4", OpLE), + }, + { + "LE_false", false, + newFilter("Main", "Rating", "3", OpLE), + }, + { + "LT_true", true, + newFilter("Main", "Rating", "5", OpLT), + }, + { + "LT_false", false, + newFilter("Main", "Rating", "4", OpLT), + }, + { + "EQ_true", true, + newFilter("Main", "Country", "Germany", OpEQ), + }, + { + "EQ_false", false, + newFilter("Main", "Country", "China", OpEQ), + }, + { + "NE_true", true, + newFilter("Main", "Country", "France", OpNE), + }, + { + "NE_false", false, + newFilter("Main", "Country", "Germany", OpNE), + }, + } + for _, tc := range testCases { + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, []*Filter{tc.f}) + require.NoError(t, c.processFilters(p)) + require.Equal(t, tc.ok, c.match(tc.f, b)) + } + + t.Run("InvalidOp", func(t *testing.T) { + f := 
newFilter("Main", "Rating", "5", OpEQ) + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, []*Filter{f}) + require.NoError(t, c.processFilters(p)) + + // just for the coverage + f.SetOperation(0) + require.False(t, c.match(f, b)) + }) +} + +func TestFilter_Match(t *testing.T) { + fs := []*Filter{ + newFilter("StorageSSD", "Storage", "SSD", OpEQ), + newFilter("GoodRating", "Rating", "4", OpGE), + newFilter("Main", "", "", OpAND, + newFilter("StorageSSD", "", "", 0), + newFilter("", "IntField", "123", OpLT), + newFilter("GoodRating", "", "", 0), + newFilter("", "", "", OpOR, + newFilter("", "Param", "Value1", OpEQ), + newFilter("", "Param", "Value2", OpEQ), + )), + } + c := NewContext(new(Netmap)) + p := newPlacementPolicy(1, nil, nil, fs) + require.NoError(t, c.processFilters(p)) + + t.Run("Good", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "10", "IntField", "100", "Param", "Value1") + require.True(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidStorage", func(t *testing.T) { + n := getTestNode("Storage", "HDD", "Rating", "10", "IntField", "100", "Param", "Value1") + require.False(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidRating", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "100", "Param", "Value1") + require.False(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidIntField", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "str", "Param", "Value1") + require.False(t, c.applyFilter("Main", n)) + }) + t.Run("InvalidParam", func(t *testing.T) { + n := getTestNode("Storage", "SSD", "Rating", "3", "IntField", "100", "Param", "NotValue") + require.False(t, c.applyFilter("Main", n)) + }) +} + +func testFilter() *Filter { + f := NewFilter() + f.SetOperation(OpGE) + f.SetName("name") + f.SetKey("key") + f.SetValue("value") + + return f +} + +func TestFilterFromV2(t *testing.T) { + t.Run("nil from V2", func(t *testing.T) { + var x *netmap.Filter + + require.Nil(t, NewFilterFromV2(x)) + }) + + t.Run("nil to V2", func(t *testing.T) { + var x *Filter + + require.Nil(t, x.ToV2()) + }) + + fV2 := new(netmap.Filter) + fV2.SetOp(netmap.GE) + fV2.SetName("name") + fV2.SetKey("key") + fV2.SetValue("value") + + f := NewFilterFromV2(fV2) + + require.Equal(t, fV2, f.ToV2()) +} + +func TestFilter_Key(t *testing.T) { + f := NewFilter() + key := "some key" + + f.SetKey(key) + + require.Equal(t, key, f.Key()) +} + +func TestFilter_Value(t *testing.T) { + f := NewFilter() + val := "some value" + + f.SetValue(val) + + require.Equal(t, val, f.Value()) +} + +func TestFilter_Name(t *testing.T) { + f := NewFilter() + name := "some name" + + f.SetName(name) + + require.Equal(t, name, f.Name()) +} + +func TestFilter_Operation(t *testing.T) { + f := NewFilter() + op := OpGE + + f.SetOperation(op) + + require.Equal(t, op, f.Operation()) +} + +func TestFilter_InnerFilters(t *testing.T) { + f := NewFilter() + + f1, f2 := testFilter(), testFilter() + + f.SetInnerFilters(f1, f2) + + require.Equal(t, []*Filter{f1, f2}, f.InnerFilters()) +} + +func TestFilterEncoding(t *testing.T) { + f := newFilter("name", "key", "value", OpEQ, + newFilter("name2", "key2", "value", OpOR), + ) + + t.Run("binary", func(t *testing.T) { + data, err := f.Marshal() + require.NoError(t, err) + + f2 := NewFilter() + require.NoError(t, f2.Unmarshal(data)) + + require.Equal(t, f, f2) + }) + + t.Run("json", func(t *testing.T) { + data, err := f.MarshalJSON() + require.NoError(t, err) + + f2 := NewFilter() + require.NoError(t, 
f2.UnmarshalJSON(data)) + + require.Equal(t, f, f2) + }) +} + +func TestNewFilter(t *testing.T) { + t.Run("default values", func(t *testing.T) { + filter := NewFilter() + + // check initial values + require.Empty(t, filter.Name()) + require.Empty(t, filter.Key()) + require.Empty(t, filter.Value()) + require.Zero(t, filter.Operation()) + require.Nil(t, filter.InnerFilters()) + + // convert to v2 message + filterV2 := filter.ToV2() + + require.Empty(t, filterV2.GetName()) + require.Empty(t, filterV2.GetKey()) + require.Empty(t, filterV2.GetValue()) + require.Equal(t, netmap.UnspecifiedOperation, filterV2.GetOp()) + require.Nil(t, filterV2.GetFilters()) + }) +} diff --git a/netmap/helper_test.go b/netmap/helper_test.go new file mode 100644 index 0000000..73b82ae --- /dev/null +++ b/netmap/helper_test.go @@ -0,0 +1,93 @@ +package netmap + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func newFilter(name string, k, v string, op Operation, fs ...*Filter) *Filter { + f := NewFilter() + f.SetName(name) + f.SetKey(k) + f.SetOperation(op) + f.SetValue(v) + f.SetInnerFilters(fs...) + return f +} + +func newSelector(name string, attr string, c Clause, count uint32, filter string) *Selector { + s := NewSelector() + s.SetName(name) + s.SetAttribute(attr) + s.SetCount(count) + s.SetClause(c) + s.SetFilter(filter) + return s +} + +func newPlacementPolicy(bf uint32, rs []*Replica, ss []*Selector, fs []*Filter) *PlacementPolicy { + p := NewPlacementPolicy() + p.SetContainerBackupFactor(bf) + p.SetReplicas(rs...) + p.SetSelectors(ss...) + p.SetFilters(fs...) + return p +} + +func newReplica(c uint32, s string) *Replica { + r := NewReplica() + r.SetCount(c) + r.SetSelector(s) + return r +} + +func nodeInfoFromAttributes(props ...string) NodeInfo { + attrs := make([]*NodeAttribute, len(props)/2) + for i := range attrs { + attrs[i] = NewNodeAttribute() + attrs[i].SetKey(props[i*2]) + attrs[i].SetValue(props[i*2+1]) + } + n := NewNodeInfo() + n.SetAttributes(attrs...) + return *n +} + +func getTestNode(props ...string) *Node { + m := make(map[string]string, len(props)/2) + for i := 0; i < len(props); i += 2 { + m[props[i]] = props[i+1] + } + return &Node{AttrMap: m} +} + +type enumIface interface { + FromString(string) bool + String() string +} + +type enumStringItem struct { + val enumIface + str string +} + +func testEnumStrings(t *testing.T, e enumIface, items []enumStringItem) { + for _, item := range items { + require.Equal(t, item.str, item.val.String()) + + s := item.val.String() + + require.True(t, e.FromString(s), s) + + require.EqualValues(t, item.val, e, item.val) + } + + // incorrect strings + for _, str := range []string{ + "some string", + "undefined", + } { + require.False(t, e.FromString(str)) + } +} diff --git a/netmap/netmap.go b/netmap/netmap.go new file mode 100644 index 0000000..369d9ac --- /dev/null +++ b/netmap/netmap.go @@ -0,0 +1,100 @@ +package netmap + +import ( + "fmt" + + "github.com/nspcc-dev/hrw" +) + +const defaultCBF = 3 + +// Netmap represents netmap which contains preprocessed nodes. +type Netmap struct { + Nodes Nodes +} + +// NewNetmap constructs netmap from the list of raw nodes. +func NewNetmap(nodes Nodes) (*Netmap, error) { + return &Netmap{ + Nodes: nodes, + }, nil +} + +func flattenNodes(ns []Nodes) Nodes { + result := make(Nodes, 0, len(ns)) + for i := range ns { + result = append(result, ns[i]...) + } + + return result +} + +// GetPlacementVectors returns placement vectors for an object given containerNodes cnt. 
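+// pivot is a seed for HRW sorting: every replica vector is sorted by
+// weighted HRW distance from hrw.Hash(pivot), with node weights computed
+// by GetDefaultWeightFunc over the whole netmap.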
+func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes, error) { + h := hrw.Hash(pivot) + wf := GetDefaultWeightFunc(m.Nodes) + result := make([]Nodes, len(cnt.Replicas())) + + for i, rep := range cnt.Replicas() { + result[i] = make(Nodes, len(rep)) + copy(result[i], rep) + hrw.SortSliceByWeightValue(result[i], result[i].Weights(wf), h) + } + + return result, nil +} + +// GetContainerNodes returns nodes corresponding to each replica. +// Order of returned nodes corresponds to order of replicas in p. +// pivot is a seed for HRW sorting. +func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) { + c := NewContext(m) + c.setPivot(pivot) + c.setCBF(p.ContainerBackupFactor()) + + if err := c.processFilters(p); err != nil { + return nil, err + } + + if err := c.processSelectors(p); err != nil { + return nil, err + } + + result := make([]Nodes, len(p.Replicas())) + + for i, r := range p.Replicas() { + if r == nil { + return nil, fmt.Errorf("%w: REPLICA", ErrMissingField) + } + + if r.Selector() == "" { + if len(p.Selectors()) == 0 { + s := new(Selector) + s.SetCount(r.Count()) + s.SetFilter(MainFilterName) + + nodes, err := c.getSelection(p, s) + if err != nil { + return nil, err + } + + result[i] = flattenNodes(nodes) + } + + for _, s := range p.Selectors() { + result[i] = append(result[i], flattenNodes(c.Selections[s.Name()])...) + } + + continue + } + + nodes, ok := c.Selections[r.Selector()] + if !ok { + return nil, fmt.Errorf("%w: REPLICA '%s'", ErrSelectorNotFound, r.Selector()) + } + + result[i] = append(result[i], flattenNodes(nodes)...) + } + + return containerNodes(result), nil +} diff --git a/netmap/network_info.go b/netmap/network_info.go new file mode 100644 index 0000000..573f91b --- /dev/null +++ b/netmap/network_info.go @@ -0,0 +1,204 @@ +package netmap + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +// NetworkInfo represents v2-compatible structure +// with information about NeoFS network. +type NetworkInfo netmap.NetworkInfo + +// NewNetworkInfoFromV2 wraps v2 NetworkInfo message to NetworkInfo. +// +// Nil netmap.NetworkInfo converts to nil. +func NewNetworkInfoFromV2(iV2 *netmap.NetworkInfo) *NetworkInfo { + return (*NetworkInfo)(iV2) +} + +// NewNetworkInfo creates and initializes blank NetworkInfo. +// +// Defaults: +// - curEpoch: 0; +// - magicNum: 0; +// - msPerBlock: 0; +// - network config: nil. +func NewNetworkInfo() *NetworkInfo { + return NewNetworkInfoFromV2(new(netmap.NetworkInfo)) +} + +// ToV2 converts NetworkInfo to v2 NetworkInfo. +// +// Nil NetworkInfo converts to nil. +func (i *NetworkInfo) ToV2() *netmap.NetworkInfo { + return (*netmap.NetworkInfo)(i) +} + +// CurrentEpoch returns current epoch of the NeoFS network. +func (i *NetworkInfo) CurrentEpoch() uint64 { + return (*netmap.NetworkInfo)(i).GetCurrentEpoch() +} + +// SetCurrentEpoch sets current epoch of the NeoFS network. +func (i *NetworkInfo) SetCurrentEpoch(epoch uint64) { + (*netmap.NetworkInfo)(i).SetCurrentEpoch(epoch) +} + +// MagicNumber returns magic number of the sidechain. +func (i *NetworkInfo) MagicNumber() uint64 { + return (*netmap.NetworkInfo)(i).GetMagicNumber() +} + +// SetMagicNumber sets magic number of the sidechain. +func (i *NetworkInfo) SetMagicNumber(epoch uint64) { + (*netmap.NetworkInfo)(i).SetMagicNumber(epoch) +} + +// MsPerBlock returns MillisecondsPerBlock network parameter. +func (i *NetworkInfo) MsPerBlock() int64 { + return (*netmap.NetworkInfo)(i). 
+ GetMsPerBlock() +} + +// SetMsPerBlock sets MillisecondsPerBlock network parameter. +func (i *NetworkInfo) SetMsPerBlock(v int64) { + (*netmap.NetworkInfo)(i). + SetMsPerBlock(v) +} + +// NetworkConfig returns NeoFS network configuration. +func (i *NetworkInfo) NetworkConfig() *NetworkConfig { + return NewNetworkConfigFromV2( + (*netmap.NetworkInfo)(i). + GetNetworkConfig(), + ) +} + +// SetNetworkConfig sets NeoFS network configuration. +func (i *NetworkInfo) SetNetworkConfig(v *NetworkConfig) { + (*netmap.NetworkInfo)(i). + SetNetworkConfig(v.ToV2()) +} + +// Marshal marshals NetworkInfo into a protobuf binary form. +func (i *NetworkInfo) Marshal() ([]byte, error) { + return (*netmap.NetworkInfo)(i).StableMarshal(nil) +} + +// Unmarshal unmarshals protobuf binary representation of NetworkInfo. +func (i *NetworkInfo) Unmarshal(data []byte) error { + return (*netmap.NetworkInfo)(i).Unmarshal(data) +} + +// MarshalJSON encodes NetworkInfo to protobuf JSON format. +func (i *NetworkInfo) MarshalJSON() ([]byte, error) { + return (*netmap.NetworkInfo)(i).MarshalJSON() +} + +// UnmarshalJSON decodes NetworkInfo from protobuf JSON format. +func (i *NetworkInfo) UnmarshalJSON(data []byte) error { + return (*netmap.NetworkInfo)(i).UnmarshalJSON(data) +} + +// NetworkParameter represents v2-compatible NeoFS network parameter. +type NetworkParameter netmap.NetworkParameter + +// NewNetworkParameterFromV2 wraps v2 NetworkParameter message to NetworkParameter. +// +// Nil netmap.NetworkParameter converts to nil. +func NewNetworkParameterFromV2(pv2 *netmap.NetworkParameter) *NetworkParameter { + return (*NetworkParameter)(pv2) +} + +// NewNetworkParameter creates and initializes blank NetworkParameter. +// +// Defaults: +// - key: nil; +// - value: nil. +func NewNetworkParameter() *NetworkParameter { + return NewNetworkParameterFromV2(new(netmap.NetworkParameter)) +} + +// ToV2 converts NetworkParameter to v2 NetworkParameter. +// +// Nil NetworkParameter converts to nil. +func (x *NetworkParameter) ToV2() *netmap.NetworkParameter { + return (*netmap.NetworkParameter)(x) +} + +// Key returns key to network parameter. +func (x *NetworkParameter) Key() []byte { + return (*netmap.NetworkParameter)(x).GetKey() +} + +// SetKey sets key to the network parameter. +func (x *NetworkParameter) SetKey(key []byte) { + (*netmap.NetworkParameter)(x).SetKey(key) +} + +// Value returns value of the network parameter. +func (x *NetworkParameter) Value() []byte { + return (*netmap.NetworkParameter)(x).GetValue() +} + +// SetValue sets value of the network parameter. +func (x *NetworkParameter) SetValue(val []byte) { + (*netmap.NetworkParameter)(x).SetValue(val) +} + +// NetworkConfig represents v2-compatible NeoFS network configuration. +type NetworkConfig netmap.NetworkConfig + +// NewNetworkConfigFromV2 wraps v2 NetworkConfig message to NetworkConfig. +// +// Nil netmap.NetworkConfig converts to nil. +func NewNetworkConfigFromV2(cv2 *netmap.NetworkConfig) *NetworkConfig { + return (*NetworkConfig)(cv2) +} + +// NewNetworkConfig creates and initializes blank NetworkConfig. +// +// Defaults: +// - parameters num: 0. +func NewNetworkConfig() *NetworkConfig { + return NewNetworkConfigFromV2(new(netmap.NetworkConfig)) +} + +// ToV2 converts NetworkConfig to v2 NetworkConfig. +// +// Nil NetworkConfig converts to nil. +func (x *NetworkConfig) ToV2() *netmap.NetworkConfig { + return (*netmap.NetworkConfig)(x) +} + +// NumberOfParameters returns number of network parameters. 
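+//
+// A short usage sketch (key and value are arbitrary examples):
+//
+//	cfg := NewNetworkConfig()
+//	p := NewNetworkParameter()
+//	p.SetKey([]byte("SomeKey"))
+//	p.SetValue([]byte{1})
+//	cfg.SetParameters(p)
+//	_ = cfg.NumberOfParameters() // 1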
+func (x *NetworkConfig) NumberOfParameters() int { + return (*netmap.NetworkConfig)(x).NumberOfParameters() +} + +// IterateAddresses iterates over network parameters. +// Breaks iteration on f's true return. +// +// Handler should not be nil. +func (x *NetworkConfig) IterateParameters(f func(*NetworkParameter) bool) { + (*netmap.NetworkConfig)(x). + IterateParameters(func(p *netmap.NetworkParameter) bool { + return f(NewNetworkParameterFromV2(p)) + }) +} + +// Value returns value of the network parameter. +func (x *NetworkConfig) SetParameters(ps ...*NetworkParameter) { + var psV2 []*netmap.NetworkParameter + + if ps != nil { + ln := len(ps) + + psV2 = make([]*netmap.NetworkParameter, 0, ln) + + for i := 0; i < ln; i++ { + psV2 = append(psV2, ps[i].ToV2()) + } + } + + (*netmap.NetworkConfig)(x).SetParameters(psV2...) +} diff --git a/netmap/network_info_test.go b/netmap/network_info_test.go new file mode 100644 index 0000000..33317b8 --- /dev/null +++ b/netmap/network_info_test.go @@ -0,0 +1,214 @@ +package netmap_test + +import ( + "testing" + + . "github.com/nspcc-dev/neofs-sdk-go/netmap" + netmaptest "github.com/nspcc-dev/neofs-sdk-go/netmap/test" + "github.com/stretchr/testify/require" +) + +func TestNetworkParameter_Key(t *testing.T) { + i := NewNetworkParameter() + + k := []byte("key") + + i.SetKey(k) + + require.Equal(t, k, i.Key()) + require.Equal(t, k, i.ToV2().GetKey()) +} + +func TestNetworkParameter_Value(t *testing.T) { + i := NewNetworkParameter() + + v := []byte("value") + + i.SetValue(v) + + require.Equal(t, v, i.Value()) + require.Equal(t, v, i.ToV2().GetValue()) +} + +func TestNewNetworkParameterFromV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + require.Nil(t, NewNetworkParameterFromV2(nil)) + }) +} + +func TestNetworkParameter_ToV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + var x *NetworkParameter + + require.Nil(t, x.ToV2()) + }) +} + +func TestNewNetworkParameter(t *testing.T) { + x := NewNetworkParameter() + + // check initial values + require.Nil(t, x.Key()) + require.Nil(t, x.Value()) + + // convert to v2 message + xV2 := x.ToV2() + + require.Nil(t, xV2.GetKey()) + require.Nil(t, xV2.GetValue()) +} + +func TestNetworkConfig_SetParameters(t *testing.T) { + x := NewNetworkConfig() + + require.Zero(t, x.NumberOfParameters()) + + called := 0 + + x.IterateParameters(func(p *NetworkParameter) bool { + called++ + return false + }) + + require.Zero(t, called) + + pps := []*NetworkParameter{ + netmaptest.NetworkParameter(), + netmaptest.NetworkParameter(), + } + + x.SetParameters(pps...) 
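+	// the parameters set above must now be reported by NumberOfParameters
+	// and visited by IterateParameters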
+ + require.EqualValues(t, len(pps), x.NumberOfParameters()) + + var dst []*NetworkParameter + + x.IterateParameters(func(p *NetworkParameter) bool { + dst = append(dst, p) + called++ + return false + }) + + require.Equal(t, pps, dst) + require.Equal(t, len(pps), called) +} + +func TestNewNetworkConfigFromV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + require.Nil(t, NewNetworkConfigFromV2(nil)) + }) +} + +func TestNetworkConfig_ToV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + var x *NetworkConfig + require.Nil(t, x.ToV2()) + }) +} + +func TestNewNetworkConfig(t *testing.T) { + x := NewNetworkConfig() + + // check initial values + require.Zero(t, x.NumberOfParameters()) + + // convert to v2 message + xV2 := x.ToV2() + + require.Zero(t, xV2.NumberOfParameters()) +} + +func TestNetworkInfo_CurrentEpoch(t *testing.T) { + i := NewNetworkInfo() + e := uint64(13) + + i.SetCurrentEpoch(e) + + require.Equal(t, e, i.CurrentEpoch()) + require.Equal(t, e, i.ToV2().GetCurrentEpoch()) +} + +func TestNetworkInfo_MagicNumber(t *testing.T) { + i := NewNetworkInfo() + m := uint64(666) + + i.SetMagicNumber(m) + + require.Equal(t, m, i.MagicNumber()) + require.Equal(t, m, i.ToV2().GetMagicNumber()) +} + +func TestNetworkInfo_MsPerBlock(t *testing.T) { + i := NewNetworkInfo() + + const ms = 987 + + i.SetMsPerBlock(ms) + + require.EqualValues(t, ms, i.MsPerBlock()) + require.EqualValues(t, ms, i.ToV2().GetMsPerBlock()) +} + +func TestNetworkInfo_Config(t *testing.T) { + i := NewNetworkInfo() + + c := netmaptest.NetworkConfig() + + i.SetNetworkConfig(c) + + require.Equal(t, c, i.NetworkConfig()) +} + +func TestNetworkInfoEncoding(t *testing.T) { + i := netmaptest.NetworkInfo() + + t.Run("binary", func(t *testing.T) { + data, err := i.Marshal() + require.NoError(t, err) + + i2 := NewNetworkInfo() + require.NoError(t, i2.Unmarshal(data)) + + require.Equal(t, i, i2) + }) + + t.Run("json", func(t *testing.T) { + data, err := i.MarshalJSON() + require.NoError(t, err) + + i2 := NewNetworkInfo() + require.NoError(t, i2.UnmarshalJSON(data)) + + require.Equal(t, i, i2) + }) +} + +func TestNewNetworkInfoFromV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + require.Nil(t, NewNetworkInfoFromV2(nil)) + }) +} + +func TestNetworkInfo_ToV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + var x *NetworkInfo + + require.Nil(t, x.ToV2()) + }) +} + +func TestNewNetworkInfo(t *testing.T) { + ni := NewNetworkInfo() + + // check initial values + require.Zero(t, ni.CurrentEpoch()) + require.Zero(t, ni.MagicNumber()) + require.Zero(t, ni.MsPerBlock()) + + // convert to v2 message + niV2 := ni.ToV2() + + require.Zero(t, niV2.GetCurrentEpoch()) + require.Zero(t, niV2.GetMagicNumber()) + require.Zero(t, niV2.GetMsPerBlock()) +} diff --git a/netmap/node_info.go b/netmap/node_info.go new file mode 100644 index 0000000..cd235b0 --- /dev/null +++ b/netmap/node_info.go @@ -0,0 +1,429 @@ +package netmap + +import ( + "strconv" + + "github.com/nspcc-dev/hrw" + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +type ( + // Node is a wrapper over NodeInfo. + Node struct { + ID uint64 + Index int + Capacity uint64 + Price uint64 + AttrMap map[string]string + + *NodeInfo + } + + // Nodes represents slice of graph leafs. + Nodes []*Node +) + +// NodeState is an enumeration of various states of the NeoFS node. +type NodeState uint32 + +// NodeAttribute represents v2 compatible attribute of the NeoFS Storage Node. +type NodeAttribute netmap.Attribute + +// NodeInfo represents v2 compatible descriptor of the NeoFS node. 
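+//
+// Conversions to and from the v2 message (NewNodeInfoFromV2, ToV2) are
+// plain pointer casts and do not copy the underlying data.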
+type NodeInfo netmap.NodeInfo + +const ( + _ NodeState = iota + + // NodeStateOffline is network unavailable state. + NodeStateOffline + + // NodeStateOnline is an active state in the network. + NodeStateOnline +) + +// Enumeration of well-known attributes. +const ( + // AttrPrice is a key to the node attribute that indicates the + // price in GAS tokens for storing one GB of data during one Epoch. + AttrPrice = "Price" + + // AttrCapacity is a key to the node attribute that indicates the + // total available disk space in Gigabytes. + AttrCapacity = "Capacity" + + // AttrSubnet is a key to the node attribute that indicates the + // string ID of node's storage subnet. + AttrSubnet = "Subnet" + + // AttrUNLOCODE is a key to the node attribute that indicates the + // node's geographic location in UN/LOCODE format. + AttrUNLOCODE = "UN-LOCODE" + + // AttrCountryCode is a key to the node attribute that indicates the + // Country code in ISO 3166-1_alpha-2 format. + AttrCountryCode = "CountryCode" + + // AttrCountry is a key to the node attribute that indicates the + // country short name in English, as defined in ISO-3166. + AttrCountry = "Country" + + // AttrLocation is a key to the node attribute that indicates the + // place name of the node location. + AttrLocation = "Location" + + // AttrSubDivCode is a key to the node attribute that indicates the + // country's administrative subdivision where node is located + // in ISO 3166-2 format. + AttrSubDivCode = "SubDivCode" + + // AttrSubDiv is a key to the node attribute that indicates the + // country's administrative subdivision name, as defined in + // ISO 3166-2. + AttrSubDiv = "SubDiv" + + // AttrContinent is a key to the node attribute that indicates the + // node's continent name according to the Seven-Continent model. + AttrContinent = "Continent" +) + +var _ hrw.Hasher = (*Node)(nil) + +// Hash is a function from hrw.Hasher interface. It is implemented +// to support weighted hrw therefore sort function sorts nodes +// based on their `N` value. +func (n Node) Hash() uint64 { + return n.ID +} + +// NodesFromInfo converts slice of NodeInfo to a generic node slice. +func NodesFromInfo(infos []NodeInfo) Nodes { + nodes := make(Nodes, len(infos)) + for i := range infos { + nodes[i] = newNodeV2(i, &infos[i]) + } + + return nodes +} + +func newNodeV2(index int, ni *NodeInfo) *Node { + n := &Node{ + ID: hrw.Hash(ni.PublicKey()), + Index: index, + AttrMap: make(map[string]string, len(ni.Attributes())), + NodeInfo: ni, + } + + for _, attr := range ni.Attributes() { + switch attr.Key() { + case AttrCapacity: + n.Capacity, _ = strconv.ParseUint(attr.Value(), 10, 64) + case AttrPrice: + n.Price, _ = strconv.ParseUint(attr.Value(), 10, 64) + } + + n.AttrMap[attr.Key()] = attr.Value() + } + + return n +} + +// Weights returns slice of nodes weights W. +func (n Nodes) Weights(wf weightFunc) []float64 { + w := make([]float64, 0, len(n)) + for i := range n { + w = append(w, wf(n[i])) + } + + return w +} + +// Attribute returns value of attribute k. +func (n *Node) Attribute(k string) string { + return n.AttrMap[k] +} + +// GetBucketWeight computes weight for a Bucket. +func GetBucketWeight(ns Nodes, a aggregator, wf weightFunc) float64 { + for i := range ns { + a.Add(wf(ns[i])) + } + + return a.Compute() +} + +// NodeStateFromV2 converts v2 NodeState to NodeState. 
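+// Unknown and unspecified values are converted to 0.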
+func NodeStateFromV2(s netmap.NodeState) NodeState { + switch s { + default: + return 0 + case netmap.Online: + return NodeStateOnline + case netmap.Offline: + return NodeStateOffline + } +} + +// ToV2 converts NodeState to v2 NodeState. +func (s NodeState) ToV2() netmap.NodeState { + switch s { + default: + return netmap.UnspecifiedState + case NodeStateOffline: + return netmap.Offline + case NodeStateOnline: + return netmap.Online + } +} + +// String returns string representation of NodeState. +// +// String mapping: +// * NodeStateOnline: ONLINE; +// * NodeStateOffline: OFFLINE; +// * default: UNSPECIFIED. +func (s NodeState) String() string { + return s.ToV2().String() +} + +// FromString parses NodeState from a string representation. +// It is a reverse action to String(). +// +// Returns true if s was parsed successfully. +func (s *NodeState) FromString(str string) bool { + var g netmap.NodeState + + ok := g.FromString(str) + + if ok { + *s = NodeStateFromV2(g) + } + + return ok +} + +// NewNodeAttribute creates and returns new NodeAttribute instance. +// +// Defaults: +// - key: ""; +// - value: ""; +// - parents: nil. +func NewNodeAttribute() *NodeAttribute { + return NewNodeAttributeFromV2(new(netmap.Attribute)) +} + +// NodeAttributeFromV2 converts v2 node Attribute to NodeAttribute. +// +// Nil netmap.Attribute converts to nil. +func NewNodeAttributeFromV2(a *netmap.Attribute) *NodeAttribute { + return (*NodeAttribute)(a) +} + +// ToV2 converts NodeAttribute to v2 node Attribute. +// +// Nil NodeAttribute converts to nil. +func (a *NodeAttribute) ToV2() *netmap.Attribute { + return (*netmap.Attribute)(a) +} + +// Key returns key to the node attribute. +func (a *NodeAttribute) Key() string { + return (*netmap.Attribute)(a). + GetKey() +} + +// SetKey sets key to the node attribute. +func (a *NodeAttribute) SetKey(key string) { + (*netmap.Attribute)(a). + SetKey(key) +} + +// Value returns value of the node attribute. +func (a *NodeAttribute) Value() string { + return (*netmap.Attribute)(a). + GetValue() +} + +// SetValue sets value of the node attribute. +func (a *NodeAttribute) SetValue(val string) { + (*netmap.Attribute)(a). + SetValue(val) +} + +// ParentKeys returns list of parent keys. +func (a *NodeAttribute) ParentKeys() []string { + return (*netmap.Attribute)(a). + GetParents() +} + +// SetParentKeys sets list of parent keys. +func (a *NodeAttribute) SetParentKeys(keys ...string) { + (*netmap.Attribute)(a). + SetParents(keys) +} + +// Marshal marshals NodeAttribute into a protobuf binary form. +func (a *NodeAttribute) Marshal() ([]byte, error) { + return (*netmap.Attribute)(a).StableMarshal(nil) +} + +// Unmarshal unmarshals protobuf binary representation of NodeAttribute. +func (a *NodeAttribute) Unmarshal(data []byte) error { + return (*netmap.Attribute)(a).Unmarshal(data) +} + +// MarshalJSON encodes NodeAttribute to protobuf JSON format. +func (a *NodeAttribute) MarshalJSON() ([]byte, error) { + return (*netmap.Attribute)(a).MarshalJSON() +} + +// UnmarshalJSON decodes NodeAttribute from protobuf JSON format. +func (a *NodeAttribute) UnmarshalJSON(data []byte) error { + return (*netmap.Attribute)(a).UnmarshalJSON(data) +} + +// NewNodeInfo creates and returns new NodeInfo instance. +// +// Defaults: +// - publicKey: nil; +// - address: ""; +// - attributes nil; +// - state: 0. +func NewNodeInfo() *NodeInfo { + return NewNodeInfoFromV2(new(netmap.NodeInfo)) +} + +// NewNodeInfoFromV2 converts v2 NodeInfo to NodeInfo. +// +// Nil netmap.NodeInfo converts to nil. 
+func NewNodeInfoFromV2(i *netmap.NodeInfo) *NodeInfo { + return (*NodeInfo)(i) +} + +// ToV2 converts NodeInfo to v2 NodeInfo. +// +// Nil NodeInfo converts to nil. +func (i *NodeInfo) ToV2() *netmap.NodeInfo { + return (*netmap.NodeInfo)(i) +} + +// PublicKey returns public key of the node in a binary format. +func (i *NodeInfo) PublicKey() []byte { + return (*netmap.NodeInfo)(i).GetPublicKey() +} + +// SetPublicKey sets public key of the node in a binary format. +func (i *NodeInfo) SetPublicKey(key []byte) { + (*netmap.NodeInfo)(i).SetPublicKey(key) +} + +// Address returns network endpoint address of the node. +// +// Deprecated: use IterateAddresses method. +func (i *NodeInfo) Address() (addr string) { + i.IterateAddresses(func(s string) bool { + addr = s + return true + }) + + return +} + +// SetAddress sets network endpoint address of the node. +// +// Deprecated: use SetAddresses method. +func (i *NodeInfo) SetAddress(addr string) { + i.SetAddresses(addr) +} + +// NumberOfAddresses returns number of network addresses of the node. +func (i *NodeInfo) NumberOfAddresses() int { + return (*netmap.NodeInfo)(i).NumberOfAddresses() +} + +// IterateAddresses iterates over network addresses of the node. +// Breaks iteration on f's true return. +// +// Handler should not be nil. +func (i *NodeInfo) IterateAddresses(f func(string) bool) { + (*netmap.NodeInfo)(i).IterateAddresses(f) +} + +// IterateAllAddresses is a helper function to unconditionally +// iterate over all node addresses. +func IterateAllAddresses(i *NodeInfo, f func(string)) { + i.IterateAddresses(func(addr string) bool { + f(addr) + return false + }) +} + +// SetAddresses sets list of network addresses of the node. +func (i *NodeInfo) SetAddresses(v ...string) { + (*netmap.NodeInfo)(i).SetAddresses(v...) +} + +// Attributes returns list of the node attributes. +func (i *NodeInfo) Attributes() []*NodeAttribute { + if i == nil { + return nil + } + + as := (*netmap.NodeInfo)(i).GetAttributes() + + if as == nil { + return nil + } + + res := make([]*NodeAttribute, 0, len(as)) + + for i := range as { + res = append(res, NewNodeAttributeFromV2(as[i])) + } + + return res +} + +// SetAttributes sets list of the node attributes. +func (i *NodeInfo) SetAttributes(as ...*NodeAttribute) { + asV2 := make([]*netmap.Attribute, 0, len(as)) + + for i := range as { + asV2 = append(asV2, as[i].ToV2()) + } + + (*netmap.NodeInfo)(i). + SetAttributes(asV2) +} + +// State returns node state. +func (i *NodeInfo) State() NodeState { + return NodeStateFromV2( + (*netmap.NodeInfo)(i).GetState(), + ) +} + +// SetState sets node state. +func (i *NodeInfo) SetState(s NodeState) { + (*netmap.NodeInfo)(i).SetState(s.ToV2()) +} + +// Marshal marshals NodeInfo into a protobuf binary form. +func (i *NodeInfo) Marshal() ([]byte, error) { + return (*netmap.NodeInfo)(i).StableMarshal(nil) +} + +// Unmarshal unmarshals protobuf binary representation of NodeInfo. +func (i *NodeInfo) Unmarshal(data []byte) error { + return (*netmap.NodeInfo)(i).Unmarshal(data) +} + +// MarshalJSON encodes NodeInfo to protobuf JSON format. +func (i *NodeInfo) MarshalJSON() ([]byte, error) { + return (*netmap.NodeInfo)(i).MarshalJSON() +} + +// UnmarshalJSON decodes NodeInfo from protobuf JSON format. 
+func (i *NodeInfo) UnmarshalJSON(data []byte) error { + return (*netmap.NodeInfo)(i).UnmarshalJSON(data) +} diff --git a/netmap/node_info_test.go b/netmap/node_info_test.go new file mode 100644 index 0000000..ebcd393 --- /dev/null +++ b/netmap/node_info_test.go @@ -0,0 +1,263 @@ +package netmap + +import ( + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + testv2 "github.com/nspcc-dev/neofs-api-go/v2/netmap/test" + "github.com/stretchr/testify/require" +) + +func TestNodeStateFromV2(t *testing.T) { + for _, item := range []struct { + s NodeState + sV2 netmap.NodeState + }{ + { + s: 0, + sV2: netmap.UnspecifiedState, + }, + { + s: NodeStateOnline, + sV2: netmap.Online, + }, + { + s: NodeStateOffline, + sV2: netmap.Offline, + }, + } { + require.Equal(t, item.s, NodeStateFromV2(item.sV2)) + require.Equal(t, item.sV2, item.s.ToV2()) + } +} + +func TestNodeAttributeFromV2(t *testing.T) { + t.Run("from nil", func(t *testing.T) { + var x *netmap.Attribute + + require.Nil(t, NewNodeAttributeFromV2(x)) + }) + + t.Run("from non-nil", func(t *testing.T) { + aV2 := testv2.GenerateAttribute(false) + + a := NewNodeAttributeFromV2(aV2) + + require.Equal(t, aV2, a.ToV2()) + }) +} + +func TestNodeAttribute_ToV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + var x *NodeAttribute + + require.Nil(t, x.ToV2()) + }) +} + +func TestNodeAttribute_Key(t *testing.T) { + a := NewNodeAttribute() + key := "some key" + + a.SetKey(key) + + require.Equal(t, key, a.Key()) +} + +func TestNodeAttribute_Value(t *testing.T) { + a := NewNodeAttribute() + val := "some value" + + a.SetValue(val) + + require.Equal(t, val, a.Value()) +} + +func TestNodeAttribute_ParentKeys(t *testing.T) { + a := NewNodeAttribute() + keys := []string{"par1", "par2"} + + a.SetParentKeys(keys...) + + require.Equal(t, keys, a.ParentKeys()) +} + +func testNodeAttribute() *NodeAttribute { + a := new(NodeAttribute) + a.SetKey("key") + a.SetValue("value") + a.SetParentKeys("par1", "par2") + + return a +} + +func TestNodeInfoFromV2(t *testing.T) { + t.Run("from nil", func(t *testing.T) { + var x *netmap.NodeInfo + + require.Nil(t, NewNodeInfoFromV2(x)) + }) + + t.Run("from non-nil", func(t *testing.T) { + iV2 := testv2.GenerateNodeInfo(false) + + i := NewNodeInfoFromV2(iV2) + + require.Equal(t, iV2, i.ToV2()) + }) +} + +func TestNodeInfo_ToV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + var x *NodeInfo + + require.Nil(t, x.ToV2()) + }) +} + +func TestNodeInfo_PublicKey(t *testing.T) { + i := new(NodeInfo) + key := []byte{1, 2, 3} + + i.SetPublicKey(key) + + require.Equal(t, key, i.PublicKey()) +} + +func TestNodeInfo_IterateAddresses(t *testing.T) { + i := new(NodeInfo) + + as := []string{"127.0.0.1:8080", "127.0.0.1:8081"} + + i.SetAddresses(as...) + + as2 := make([]string, 0, i.NumberOfAddresses()) + + IterateAllAddresses(i, func(addr string) { + as2 = append(as2, addr) + }) + + require.Equal(t, as, as2) + require.EqualValues(t, len(as), i.NumberOfAddresses()) +} + +func TestNodeInfo_State(t *testing.T) { + i := new(NodeInfo) + s := NodeStateOnline + + i.SetState(s) + + require.Equal(t, s, i.State()) +} + +func TestNodeInfo_Attributes(t *testing.T) { + i := new(NodeInfo) + as := []*NodeAttribute{testNodeAttribute(), testNodeAttribute()} + + i.SetAttributes(as...) 
+ + require.Equal(t, as, i.Attributes()) +} + +func TestNodeAttributeEncoding(t *testing.T) { + a := testNodeAttribute() + + t.Run("binary", func(t *testing.T) { + data, err := a.Marshal() + require.NoError(t, err) + + a2 := NewNodeAttribute() + require.NoError(t, a2.Unmarshal(data)) + + require.Equal(t, a, a2) + }) + + t.Run("json", func(t *testing.T) { + data, err := a.MarshalJSON() + require.NoError(t, err) + + a2 := NewNodeAttribute() + require.NoError(t, a2.UnmarshalJSON(data)) + + require.Equal(t, a, a2) + }) +} + +func TestNodeInfoEncoding(t *testing.T) { + i := NewNodeInfo() + i.SetPublicKey([]byte{1, 2, 3}) + i.SetAddresses("192.168.0.1", "192.168.0.2") + i.SetState(NodeStateOnline) + i.SetAttributes(testNodeAttribute()) + + t.Run("binary", func(t *testing.T) { + data, err := i.Marshal() + require.NoError(t, err) + + i2 := NewNodeInfo() + require.NoError(t, i2.Unmarshal(data)) + + require.Equal(t, i, i2) + }) + + t.Run("json", func(t *testing.T) { + data, err := i.MarshalJSON() + require.NoError(t, err) + + i2 := NewNodeInfo() + require.NoError(t, i2.UnmarshalJSON(data)) + + require.Equal(t, i, i2) + }) +} + +func TestNewNodeAttribute(t *testing.T) { + t.Run("default values", func(t *testing.T) { + attr := NewNodeAttribute() + + // check initial values + require.Empty(t, attr.Key()) + require.Empty(t, attr.Value()) + require.Nil(t, attr.ParentKeys()) + + // convert to v2 message + attrV2 := attr.ToV2() + + require.Empty(t, attrV2.GetKey()) + require.Empty(t, attrV2.GetValue()) + require.Nil(t, attrV2.GetParents()) + }) +} + +func TestNewNodeInfo(t *testing.T) { + t.Run("default values", func(t *testing.T) { + ni := NewNodeInfo() + + // check initial values + require.Nil(t, ni.PublicKey()) + + require.Zero(t, ni.NumberOfAddresses()) + require.Nil(t, ni.Attributes()) + require.Zero(t, ni.State()) + + // convert to v2 message + niV2 := ni.ToV2() + + require.Nil(t, niV2.GetPublicKey()) + require.Zero(t, niV2.NumberOfAddresses()) + require.Nil(t, niV2.GetAttributes()) + require.EqualValues(t, netmap.UnspecifiedState, niV2.GetState()) + }) +} + +func TestNodeState_String(t *testing.T) { + toPtr := func(v NodeState) *NodeState { + return &v + } + + testEnumStrings(t, new(NodeState), []enumStringItem{ + {val: toPtr(NodeStateOnline), str: "ONLINE"}, + {val: toPtr(NodeStateOffline), str: "OFFLINE"}, + {val: toPtr(0), str: "UNSPECIFIED"}, + }) +} diff --git a/netmap/operation.go b/netmap/operation.go new file mode 100644 index 0000000..af3e042 --- /dev/null +++ b/netmap/operation.go @@ -0,0 +1,116 @@ +package netmap + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +// Operation is an enumeration of v2-compatible filtering operations. +type Operation uint32 + +const ( + _ Operation = iota + + // OpEQ is an "Equal" operation. + OpEQ + + // OpNE is a "Not equal" operation. + OpNE + + // OpGT is a "Greater than" operation. + OpGT + + // OpGE is a "Greater than or equal to" operation. + OpGE + + // OpLT is a "Less than" operation. + OpLT + + // OpLE is a "Less than or equal to" operation. + OpLE + + // OpOR is an "OR" operation. + OpOR + + // OpAND is an "AND" operation. + OpAND +) + +// OperationFromV2 converts v2 Operation to Operation. 
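+//
+// OpAND and OpOR are applicable only to filters with inner filters, while
+// the comparison operations are matched against node attribute values
+// (see Context.processFilter and Context.match).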
+func OperationFromV2(op netmap.Operation) Operation { + switch op { + default: + return 0 + case netmap.OR: + return OpOR + case netmap.AND: + return OpAND + case netmap.GE: + return OpGE + case netmap.GT: + return OpGT + case netmap.LE: + return OpLE + case netmap.LT: + return OpLT + case netmap.EQ: + return OpEQ + case netmap.NE: + return OpNE + } +} + +// ToV2 converts Operation to v2 Operation. +func (op Operation) ToV2() netmap.Operation { + switch op { + default: + return netmap.UnspecifiedOperation + case OpOR: + return netmap.OR + case OpAND: + return netmap.AND + case OpGE: + return netmap.GE + case OpGT: + return netmap.GT + case OpLE: + return netmap.LE + case OpLT: + return netmap.LT + case OpEQ: + return netmap.EQ + case OpNE: + return netmap.NE + } +} + +// String returns string representation of Operation. +// +// String mapping: +// * OpNE: NE; +// * OpEQ: EQ; +// * OpLT: LT; +// * OpLE: LE; +// * OpGT: GT; +// * OpGE: GE; +// * OpAND: AND; +// * OpOR: OR; +// * default: OPERATION_UNSPECIFIED. +func (op Operation) String() string { + return op.ToV2().String() +} + +// FromString parses Operation from a string representation. +// It is a reverse action to String(). +// +// Returns true if s was parsed successfully. +func (op *Operation) FromString(s string) bool { + var g netmap.Operation + + ok := g.FromString(s) + + if ok { + *op = OperationFromV2(g) + } + + return ok +} diff --git a/netmap/operation_test.go b/netmap/operation_test.go new file mode 100644 index 0000000..e8b74e3 --- /dev/null +++ b/netmap/operation_test.go @@ -0,0 +1,73 @@ +package netmap + +import ( + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + "github.com/stretchr/testify/require" +) + +func TestOperationFromV2(t *testing.T) { + for _, item := range []struct { + op Operation + opV2 netmap.Operation + }{ + { + op: 0, + opV2: netmap.UnspecifiedOperation, + }, + { + op: OpEQ, + opV2: netmap.EQ, + }, + { + op: OpNE, + opV2: netmap.NE, + }, + { + op: OpOR, + opV2: netmap.OR, + }, + { + op: OpAND, + opV2: netmap.AND, + }, + { + op: OpLE, + opV2: netmap.LE, + }, + { + op: OpLT, + opV2: netmap.LT, + }, + { + op: OpGT, + opV2: netmap.GT, + }, + { + op: OpGE, + opV2: netmap.GE, + }, + } { + require.Equal(t, item.op, OperationFromV2(item.opV2)) + require.Equal(t, item.opV2, item.op.ToV2()) + } +} + +func TestOperation_String(t *testing.T) { + toPtr := func(v Operation) *Operation { + return &v + } + + testEnumStrings(t, new(Operation), []enumStringItem{ + {val: toPtr(OpEQ), str: "EQ"}, + {val: toPtr(OpNE), str: "NE"}, + {val: toPtr(OpGT), str: "GT"}, + {val: toPtr(OpGE), str: "GE"}, + {val: toPtr(OpLT), str: "LT"}, + {val: toPtr(OpLE), str: "LE"}, + {val: toPtr(OpAND), str: "AND"}, + {val: toPtr(OpOR), str: "OR"}, + {val: toPtr(0), str: "OPERATION_UNSPECIFIED"}, + }) +} diff --git a/netmap/policy.go b/netmap/policy.go new file mode 100644 index 0000000..f3ace16 --- /dev/null +++ b/netmap/policy.go @@ -0,0 +1,145 @@ +package netmap + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +// PlacementPolicy represents v2-compatible placement policy. +type PlacementPolicy netmap.PlacementPolicy + +// NewPlacementPolicy creates and returns new PlacementPolicy instance. +// +// Defaults: +// - backupFactor: 0; +// - replicas nil; +// - selectors nil; +// - filters nil. +func NewPlacementPolicy() *PlacementPolicy { + return NewPlacementPolicyFromV2(new(netmap.PlacementPolicy)) +} + +// NewPlacementPolicyFromV2 converts v2 PlacementPolicy to PlacementPolicy. 
+// +// Nil netmap.PlacementPolicy converts to nil. +func NewPlacementPolicyFromV2(f *netmap.PlacementPolicy) *PlacementPolicy { + return (*PlacementPolicy)(f) +} + +// ToV2 converts PlacementPolicy to v2 PlacementPolicy. +// +// Nil PlacementPolicy converts to nil. +func (p *PlacementPolicy) ToV2() *netmap.PlacementPolicy { + return (*netmap.PlacementPolicy)(p) +} + +// Replicas returns list of object replica descriptors. +func (p *PlacementPolicy) Replicas() []*Replica { + rs := (*netmap.PlacementPolicy)(p). + GetReplicas() + + if rs == nil { + return nil + } + + res := make([]*Replica, 0, len(rs)) + + for i := range rs { + res = append(res, NewReplicaFromV2(rs[i])) + } + + return res +} + +// SetReplicas sets list of object replica descriptors. +func (p *PlacementPolicy) SetReplicas(rs ...*Replica) { + var rsV2 []*netmap.Replica + + if rs != nil { + rsV2 = make([]*netmap.Replica, 0, len(rs)) + + for i := range rs { + rsV2 = append(rsV2, rs[i].ToV2()) + } + } + + (*netmap.PlacementPolicy)(p).SetReplicas(rsV2) +} + +// ContainerBackupFactor returns container backup factor. +func (p *PlacementPolicy) ContainerBackupFactor() uint32 { + return (*netmap.PlacementPolicy)(p). + GetContainerBackupFactor() +} + +// SetContainerBackupFactor sets container backup factor. +func (p *PlacementPolicy) SetContainerBackupFactor(f uint32) { + (*netmap.PlacementPolicy)(p). + SetContainerBackupFactor(f) +} + +// Selector returns set of selectors to form the container's nodes subset. +func (p *PlacementPolicy) Selectors() []*Selector { + rs := (*netmap.PlacementPolicy)(p). + GetSelectors() + + if rs == nil { + return nil + } + + res := make([]*Selector, 0, len(rs)) + + for i := range rs { + res = append(res, NewSelectorFromV2(rs[i])) + } + + return res +} + +// SetSelectors sets set of selectors to form the container's nodes subset. +func (p *PlacementPolicy) SetSelectors(ss ...*Selector) { + var ssV2 []*netmap.Selector + + if ss != nil { + ssV2 = make([]*netmap.Selector, 0, len(ss)) + + for i := range ss { + ssV2 = append(ssV2, ss[i].ToV2()) + } + } + + (*netmap.PlacementPolicy)(p).SetSelectors(ssV2) +} + +// Filters returns list of named filters to reference in selectors. +func (p *PlacementPolicy) Filters() []*Filter { + return filtersFromV2( + (*netmap.PlacementPolicy)(p). + GetFilters(), + ) +} + +// SetFilters sets list of named filters to reference in selectors. +func (p *PlacementPolicy) SetFilters(fs ...*Filter) { + (*netmap.PlacementPolicy)(p). + SetFilters(filtersToV2(fs)) +} + +// Marshal marshals PlacementPolicy into a protobuf binary form. +func (p *PlacementPolicy) Marshal() ([]byte, error) { + return (*netmap.PlacementPolicy)(p).StableMarshal(nil) +} + +// Unmarshal unmarshals protobuf binary representation of PlacementPolicy. +func (p *PlacementPolicy) Unmarshal(data []byte) error { + return (*netmap.PlacementPolicy)(p).Unmarshal(data) +} + +// MarshalJSON encodes PlacementPolicy to protobuf JSON format. +func (p *PlacementPolicy) MarshalJSON() ([]byte, error) { + return (*netmap.PlacementPolicy)(p).MarshalJSON() +} + +// UnmarshalJSON decodes PlacementPolicy from protobuf JSON format. 
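// Editor's note (illustrative sketch, not part of the original patch): a policy
// is normally assembled with the setters above and then serialized with the
// stable protobuf encoding, e.g. REP 2 with a container backup factor of 3:
//
//	r := netmap.NewReplica()
//	r.SetCount(2)
//
//	p := netmap.NewPlacementPolicy()
//	p.SetContainerBackupFactor(3)
//	p.SetReplicas(r)
//
//	data, err := p.Marshal()
//	if err != nil {
//		// handle the error
//	}
//	_ = data // stable binary form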
+func (p *PlacementPolicy) UnmarshalJSON(data []byte) error { + return (*netmap.PlacementPolicy)(p).UnmarshalJSON(data) +} diff --git a/netmap/policy_test.go b/netmap/policy_test.go new file mode 100644 index 0000000..43b45ed --- /dev/null +++ b/netmap/policy_test.go @@ -0,0 +1,180 @@ +package netmap + +import ( + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPlacementPolicy_CBFWithEmptySelector(t *testing.T) { + nodes := []NodeInfo{ + nodeInfoFromAttributes("ID", "1", "Attr", "Same"), + nodeInfoFromAttributes("ID", "2", "Attr", "Same"), + nodeInfoFromAttributes("ID", "3", "Attr", "Same"), + nodeInfoFromAttributes("ID", "4", "Attr", "Same"), + } + + p1 := newPlacementPolicy(0, + []*Replica{newReplica(2, "")}, + nil, // selectors + nil, // filters + ) + + p2 := newPlacementPolicy(3, + []*Replica{newReplica(2, "")}, + nil, // selectors + nil, // filters + ) + + p3 := newPlacementPolicy(3, + []*Replica{newReplica(2, "X")}, + []*Selector{newSelector("X", "", ClauseDistinct, 2, "*")}, + nil, // filters + ) + + p4 := newPlacementPolicy(3, + []*Replica{newReplica(2, "X")}, + []*Selector{newSelector("X", "Attr", ClauseSame, 2, "*")}, + nil, // filters + ) + + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + + v, err := nm.GetContainerNodes(p1, nil) + require.NoError(t, err) + assert.Len(t, v.Flatten(), 4) + + v, err = nm.GetContainerNodes(p2, nil) + require.NoError(t, err) + assert.Len(t, v.Flatten(), 4) + + v, err = nm.GetContainerNodes(p3, nil) + require.NoError(t, err) + assert.Len(t, v.Flatten(), 4) + + v, err = nm.GetContainerNodes(p4, nil) + require.NoError(t, err) + assert.Len(t, v.Flatten(), 4) +} + +func TestPlacementPolicyFromV2(t *testing.T) { + pV2 := new(netmap.PlacementPolicy) + + pV2.SetReplicas([]*netmap.Replica{ + testReplica().ToV2(), + testReplica().ToV2(), + }) + + pV2.SetContainerBackupFactor(3) + + pV2.SetSelectors([]*netmap.Selector{ + testSelector().ToV2(), + testSelector().ToV2(), + }) + + pV2.SetFilters([]*netmap.Filter{ + testFilter().ToV2(), + testFilter().ToV2(), + }) + + p := NewPlacementPolicyFromV2(pV2) + + require.Equal(t, pV2, p.ToV2()) +} + +func TestPlacementPolicy_Replicas(t *testing.T) { + p := NewPlacementPolicy() + rs := []*Replica{testReplica(), testReplica()} + + p.SetReplicas(rs...) + + require.Equal(t, rs, p.Replicas()) +} + +func TestPlacementPolicy_ContainerBackupFactor(t *testing.T) { + p := NewPlacementPolicy() + f := uint32(3) + + p.SetContainerBackupFactor(f) + + require.Equal(t, f, p.ContainerBackupFactor()) +} + +func TestPlacementPolicy_Selectors(t *testing.T) { + p := NewPlacementPolicy() + ss := []*Selector{testSelector(), testSelector()} + + p.SetSelectors(ss...) + + require.Equal(t, ss, p.Selectors()) +} + +func TestPlacementPolicy_Filters(t *testing.T) { + p := NewPlacementPolicy() + fs := []*Filter{testFilter(), testFilter()} + + p.SetFilters(fs...) 
+ + require.Equal(t, fs, p.Filters()) +} + +func TestPlacementPolicyEncoding(t *testing.T) { + p := newPlacementPolicy(3, nil, nil, nil) + + t.Run("binary", func(t *testing.T) { + data, err := p.Marshal() + require.NoError(t, err) + + p2 := NewPlacementPolicy() + require.NoError(t, p2.Unmarshal(data)) + + require.Equal(t, p, p2) + }) + + t.Run("json", func(t *testing.T) { + data, err := p.MarshalJSON() + require.NoError(t, err) + + p2 := NewPlacementPolicy() + require.NoError(t, p2.UnmarshalJSON(data)) + + require.Equal(t, p, p2) + }) +} + +func TestNewPlacementPolicy(t *testing.T) { + t.Run("nil", func(t *testing.T) { + var x *PlacementPolicy + + require.Nil(t, x.ToV2()) + }) + + t.Run("default values", func(t *testing.T) { + pp := NewPlacementPolicy() + + // check initial values + require.Nil(t, pp.Replicas()) + require.Nil(t, pp.Filters()) + require.Nil(t, pp.Selectors()) + require.Zero(t, pp.ContainerBackupFactor()) + + // convert to v2 message + ppV2 := pp.ToV2() + + require.Nil(t, ppV2.GetReplicas()) + require.Nil(t, ppV2.GetFilters()) + require.Nil(t, ppV2.GetSelectors()) + require.Zero(t, ppV2.GetContainerBackupFactor()) + }) +} + +func TestNewPlacementPolicyFromV2(t *testing.T) { + t.Run("from nil", func(t *testing.T) { + var x *netmap.PlacementPolicy + + require.Nil(t, NewPlacementPolicyFromV2(x)) + }) +} diff --git a/netmap/replica.go b/netmap/replica.go new file mode 100644 index 0000000..3535cc1 --- /dev/null +++ b/netmap/replica.go @@ -0,0 +1,71 @@ +package netmap + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/netmap" +) + +// Replica represents v2-compatible object replica descriptor. +type Replica netmap.Replica + +// NewReplica creates and returns new Replica instance. +// +// Defaults: +// - count: 0; +// - selector: "". +func NewReplica() *Replica { + return NewReplicaFromV2(new(netmap.Replica)) +} + +// NewReplicaFromV2 converts v2 Replica to Replica. +// +// Nil netmap.Replica converts to nil. +func NewReplicaFromV2(f *netmap.Replica) *Replica { + return (*Replica)(f) +} + +// ToV2 converts Replica to v2 Replica. +// +// Nil Replica converts to nil. +func (r *Replica) ToV2() *netmap.Replica { + return (*netmap.Replica)(r) +} + +// Count returns number of object replicas. +func (r *Replica) Count() uint32 { + return (*netmap.Replica)(r).GetCount() +} + +// SetCount sets number of object replicas. +func (r *Replica) SetCount(c uint32) { + (*netmap.Replica)(r).SetCount(c) +} + +// Selector returns name of selector bucket to put replicas. +func (r *Replica) Selector() string { + return (*netmap.Replica)(r).GetSelector() +} + +// SetSelector sets name of selector bucket to put replicas. +func (r *Replica) SetSelector(s string) { + (*netmap.Replica)(r).SetSelector(s) +} + +// Marshal marshals Replica into a protobuf binary form. +func (r *Replica) Marshal() ([]byte, error) { + return (*netmap.Replica)(r).StableMarshal(nil) +} + +// Unmarshal unmarshals protobuf binary representation of Replica. +func (r *Replica) Unmarshal(data []byte) error { + return (*netmap.Replica)(r).Unmarshal(data) +} + +// MarshalJSON encodes Replica to protobuf JSON format. +func (r *Replica) MarshalJSON() ([]byte, error) { + return (*netmap.Replica)(r).MarshalJSON() +} + +// UnmarshalJSON decodes Replica from protobuf JSON format. 
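// Editor's note: Replica, like the other types in this package, is a plain type
// conversion over the corresponding v2 message, so NewReplicaFromV2/ToV2 are
// zero-copy pointer casts. A sketch (editor's illustration; apiv2 stands for
// github.com/nspcc-dev/neofs-api-go/v2/netmap):
//
//	rV2 := new(apiv2.Replica)
//	rV2.SetCount(3)
//
//	r := netmap.NewReplicaFromV2(rV2)
//	fmt.Println(r.Count())       // 3
//	fmt.Println(r.ToV2() == rV2) // true: same underlying message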
+func (r *Replica) UnmarshalJSON(data []byte) error {
+	return (*netmap.Replica)(r).UnmarshalJSON(data)
+}
diff --git a/netmap/replica_test.go b/netmap/replica_test.go
new file mode 100644
index 0000000..c1b0dba
--- /dev/null
+++ b/netmap/replica_test.go
@@ -0,0 +1,99 @@
+package netmap
+
+import (
+	"testing"
+
+	"github.com/nspcc-dev/neofs-api-go/v2/netmap"
+	testv2 "github.com/nspcc-dev/neofs-api-go/v2/netmap/test"
+	"github.com/stretchr/testify/require"
+)
+
+func testReplica() *Replica {
+	r := new(Replica)
+	r.SetCount(3)
+	r.SetSelector("selector")
+
+	return r
+}
+
+func TestReplicaFromV2(t *testing.T) {
+	t.Run("from nil", func(t *testing.T) {
+		var x *netmap.Replica
+
+		require.Nil(t, NewReplicaFromV2(x))
+	})
+
+	t.Run("from non-nil", func(t *testing.T) {
+		rV2 := testv2.GenerateReplica(false)
+
+		r := NewReplicaFromV2(rV2)
+
+		require.Equal(t, rV2, r.ToV2())
+	})
+}
+
+func TestReplica_Count(t *testing.T) {
+	r := NewReplica()
+	c := uint32(3)
+
+	r.SetCount(c)
+
+	require.Equal(t, c, r.Count())
+}
+
+func TestReplica_Selector(t *testing.T) {
+	r := NewReplica()
+	s := "some selector"
+
+	r.SetSelector(s)
+
+	require.Equal(t, s, r.Selector())
+}
+
+func TestReplicaEncoding(t *testing.T) {
+	r := newReplica(3, "selector")
+
+	t.Run("binary", func(t *testing.T) {
+		data, err := r.Marshal()
+		require.NoError(t, err)
+
+		r2 := NewReplica()
+		require.NoError(t, r2.Unmarshal(data))
+
+		require.Equal(t, r, r2)
+	})
+
+	t.Run("json", func(t *testing.T) {
+		data, err := r.MarshalJSON()
+		require.NoError(t, err)
+
+		r2 := NewReplica()
+		require.NoError(t, r2.UnmarshalJSON(data))
+
+		require.Equal(t, r, r2)
+	})
+}
+
+func TestReplica_ToV2(t *testing.T) {
+	t.Run("nil", func(t *testing.T) {
+		var x *Replica
+
+		require.Nil(t, x.ToV2())
+	})
+}
+
+func TestNewReplica(t *testing.T) {
+	t.Run("default values", func(t *testing.T) {
+		r := NewReplica()
+
+		// check initial values
+		require.Zero(t, r.Count())
+		require.Empty(t, r.Selector())
+
+		// convert to v2 message
+		rV2 := r.ToV2()
+
+		require.Zero(t, rV2.GetCount())
+		require.Empty(t, rV2.GetSelector())
+	})
+}
diff --git a/netmap/selector.go b/netmap/selector.go
new file mode 100644
index 0000000..e9beb21
--- /dev/null
+++ b/netmap/selector.go
@@ -0,0 +1,264 @@
+package netmap
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/nspcc-dev/hrw"
+	"github.com/nspcc-dev/neofs-api-go/v2/netmap"
+)
+
+// Selector represents v2-compatible netmap selector.
+type Selector netmap.Selector
+
+// processSelectors processes the policy selectors and returns an error if any of them is invalid.
+func (c *Context) processSelectors(p *PlacementPolicy) error {
+	for _, s := range p.Selectors() {
+		if s == nil {
+			return fmt.Errorf("%w: SELECT", ErrMissingField)
+		} else if s.Filter() != MainFilterName {
+			_, ok := c.Filters[s.Filter()]
+			if !ok {
+				return fmt.Errorf("%w: SELECT FROM '%s'", ErrFilterNotFound, s.Filter())
+			}
+		}
+
+		c.Selectors[s.Name()] = s
+
+		result, err := c.getSelection(p, s)
+		if err != nil {
+			return err
+		}
+
+		c.Selections[s.Name()] = result
+	}
+
+	return nil
+}
+
+// GetNodesCount returns the number of buckets and the minimum number of nodes in each bucket
+// for the given selector.
+func GetNodesCount(_ *PlacementPolicy, s *Selector) (int, int) {
+	switch s.Clause() {
+	case ClauseSame:
+		return 1, int(s.Count())
+	default:
+		return int(s.Count()), 1
+	}
+}
+
+// getSelection returns nodes grouped by s.attribute.
+// It tries to fill each bucket up to CBF×count nodes, falling back to a backup factor of 1 if needed.
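// Editor's note: a worked example (editor's illustration) of the bucket
// arithmetic used by getSelection below. For a selector with Count = 2 and a
// SAME clause under CBF = 3, GetNodesCount gives one bucket of at least two
// nodes, and getSelection tries to fill it up to 2*3 = 6 nodes before falling
// back to a backup factor of 1:
//
//	s := netmap.NewSelector()
//	s.SetCount(2)
//	s.SetClause(netmap.ClauseSame)
//
//	bucketCount, nodesInBucket := netmap.GetNodesCount(nil, s) // 1, 2 (the policy argument is unused)
//	maxNodesInBucket := nodesInBucket * 3                      // with CBF = 3
//	_, _ = bucketCount, maxNodesInBucket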
+func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) {
+	bucketCount, nodesInBucket := GetNodesCount(p, s)
+	buckets := c.getSelectionBase(s)
+
+	if len(buckets) < bucketCount {
+		return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
+	}
+
+	if len(c.pivot) == 0 {
+		// Deterministic order in case of zero seed.
+		if s.Attribute() == "" {
+			sort.Slice(buckets, func(i, j int) bool {
+				return buckets[i].nodes[0].ID < buckets[j].nodes[0].ID
+			})
+		} else {
+			sort.Slice(buckets, func(i, j int) bool {
+				return buckets[i].attr < buckets[j].attr
+			})
+		}
+	}
+
+	maxNodesInBucket := nodesInBucket * int(c.cbf)
+	nodes := make([]Nodes, 0, len(buckets))
+	fallback := make([]Nodes, 0, len(buckets))
+
+	for i := range buckets {
+		ns := buckets[i].nodes
+		if len(ns) >= maxNodesInBucket {
+			nodes = append(nodes, ns[:maxNodesInBucket])
+		} else if len(ns) >= nodesInBucket {
+			fallback = append(fallback, ns)
+		}
+	}
+
+	if len(nodes) < bucketCount {
+		// Fallback to using minimum allowed backup factor (1).
+		nodes = append(nodes, fallback...)
+		if len(nodes) < bucketCount {
+			return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
+		}
+	}
+
+	if len(c.pivot) != 0 {
+		weights := make([]float64, len(nodes))
+		for i := range nodes {
+			weights[i] = GetBucketWeight(nodes[i], c.aggregator(), c.weightFunc)
+		}
+
+		hrw.SortSliceByWeightIndex(nodes, weights, c.pivotHash)
+	}
+
+	if s.Attribute() == "" {
+		nodes, fallback = nodes[:bucketCount], nodes[bucketCount:]
+		for i := range fallback {
+			index := i % bucketCount
+			if len(nodes[index]) >= maxNodesInBucket {
+				break
+			}
+			nodes[index] = append(nodes[index], fallback[i]...)
+		}
+	}
+
+	return nodes[:bucketCount], nil
+}
+
+type nodeAttrPair struct {
+	attr  string
+	nodes Nodes
+}
+
+// getSelectionBase returns nodes grouped by the selector attribute.
+// It is guaranteed that each pair contains at least one node.
+func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair {
+	f := c.Filters[s.Filter()]
+	isMain := s.Filter() == MainFilterName
+	result := []nodeAttrPair{}
+	nodeMap := map[string]Nodes{}
+	attr := s.Attribute()
+
+	for i := range c.Netmap.Nodes {
+		if isMain || c.match(f, c.Netmap.Nodes[i]) {
+			if attr == "" {
+				// The default attribute is a transparent identifier which is different for every node.
+				result = append(result, nodeAttrPair{attr: "", nodes: Nodes{c.Netmap.Nodes[i]}})
+			} else {
+				v := c.Netmap.Nodes[i].Attribute(attr)
+				nodeMap[v] = append(nodeMap[v], c.Netmap.Nodes[i])
+			}
+		}
+	}
+
+	if attr != "" {
+		for k, ns := range nodeMap {
+			result = append(result, nodeAttrPair{attr: k, nodes: ns})
+		}
+	}
+
+	if len(c.pivot) != 0 {
+		for i := range result {
+			hrw.SortSliceByWeightValue(result[i].nodes, result[i].nodes.Weights(c.weightFunc), c.pivotHash)
+		}
+	}
+
+	return result
+}
+
+// NewSelector creates and returns new Selector instance.
+//
+// Defaults:
+// - name: "";
+// - attribute: "";
+// - filter: "";
+// - clause: ClauseUnspecified;
+// - count: 0.
+func NewSelector() *Selector {
+	return NewSelectorFromV2(new(netmap.Selector))
+}
+
+// NewSelectorFromV2 converts v2 Selector to Selector.
+//
+// Nil netmap.Selector converts to nil.
+func NewSelectorFromV2(f *netmap.Selector) *Selector {
+	return (*Selector)(f)
+}
+
+// ToV2 converts Selector to v2 Selector.
+//
+// Nil Selector converts to nil.
+func (s *Selector) ToV2() *netmap.Selector {
+	return (*netmap.Selector)(s)
+}
+
+// Name returns selector name.
+func (s *Selector) Name() string {
+	return (*netmap.Selector)(s).
+ GetName() +} + +// SetName sets selector name. +func (s *Selector) SetName(name string) { + (*netmap.Selector)(s). + SetName(name) +} + +// Count returns count of nodes to select from bucket. +func (s *Selector) Count() uint32 { + return (*netmap.Selector)(s). + GetCount() +} + +// SetCount sets count of nodes to select from bucket. +func (s *Selector) SetCount(c uint32) { + (*netmap.Selector)(s). + SetCount(c) +} + +// Clause returns modifier showing how to form a bucket. +func (s *Selector) Clause() Clause { + return ClauseFromV2( + (*netmap.Selector)(s). + GetClause(), + ) +} + +// SetClause sets modifier showing how to form a bucket. +func (s *Selector) SetClause(c Clause) { + (*netmap.Selector)(s). + SetClause(c.ToV2()) +} + +// Attribute returns attribute bucket to select from. +func (s *Selector) Attribute() string { + return (*netmap.Selector)(s). + GetAttribute() +} + +// SetAttribute sets attribute bucket to select from. +func (s *Selector) SetAttribute(a string) { + (*netmap.Selector)(s). + SetAttribute(a) +} + +// Filter returns filter reference to select from. +func (s *Selector) Filter() string { + return (*netmap.Selector)(s). + GetFilter() +} + +// SetFilter sets filter reference to select from. +func (s *Selector) SetFilter(f string) { + (*netmap.Selector)(s). + SetFilter(f) +} + +// Marshal marshals Selector into a protobuf binary form. +func (s *Selector) Marshal() ([]byte, error) { + return (*netmap.Selector)(s).StableMarshal(nil) +} + +// Unmarshal unmarshals protobuf binary representation of Selector. +func (s *Selector) Unmarshal(data []byte) error { + return (*netmap.Selector)(s).Unmarshal(data) +} + +// MarshalJSON encodes Selector to protobuf JSON format. +func (s *Selector) MarshalJSON() ([]byte, error) { + return (*netmap.Selector)(s).MarshalJSON() +} + +// UnmarshalJSON decodes Selector from protobuf JSON format. 
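// Editor's note (illustrative, not from the original patch): building a selector
// like the ones constructed in the tests, e.g. "pick 2 distinct City values from
// the whole netmap":
//
//	s := netmap.NewSelector()
//	s.SetName("X")
//	s.SetCount(2)
//	s.SetClause(netmap.ClauseDistinct)
//	s.SetAttribute("City")
//	s.SetFilter("*") // MainFilterName, matches the entire netmap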
+func (s *Selector) UnmarshalJSON(data []byte) error { + return (*netmap.Selector)(s).UnmarshalJSON(data) +} diff --git a/netmap/selector_test.go b/netmap/selector_test.go new file mode 100644 index 0000000..2aa9798 --- /dev/null +++ b/netmap/selector_test.go @@ -0,0 +1,530 @@ +package netmap + +import ( + "errors" + "fmt" + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + testv2 "github.com/nspcc-dev/neofs-api-go/v2/netmap/test" + "github.com/stretchr/testify/require" +) + +func TestPlacementPolicy_UnspecifiedClause(t *testing.T) { + p := newPlacementPolicy(1, + []*Replica{newReplica(1, "X")}, + []*Selector{ + newSelector("X", "", ClauseDistinct, 4, "*"), + }, + nil, + ) + nodes := []NodeInfo{ + nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"), + nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"), + nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"), + nodeInfoFromAttributes("ID", "4", "Country", "RU", "City", "Moscow", "SSD", "1"), + } + + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + require.Equal(t, 4, len(v.Flatten())) +} + +func TestPlacementPolicy_Minimal(t *testing.T) { + nodes := []NodeInfo{ + nodeInfoFromAttributes("City", "Saint-Petersburg"), + nodeInfoFromAttributes("City", "Moscow"), + nodeInfoFromAttributes("City", "Berlin"), + nodeInfoFromAttributes("City", "Paris"), + } + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + + runTest := func(t *testing.T, rep uint32, expectError bool) { + p := newPlacementPolicy(0, + []*Replica{newReplica(rep, "")}, + nil, nil) + + v, err := nm.GetContainerNodes(p, nil) + + if expectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + count := int(rep * defaultCBF) + if count > len(nm.Nodes) { + count = len(nm.Nodes) + } + require.EqualValues(t, count, len(v.Flatten())) + } + + t.Run("REP 1", func(t *testing.T) { + runTest(t, 1, false) + }) + t.Run("REP 3", func(t *testing.T) { + runTest(t, 3, false) + }) + t.Run("REP 5", func(t *testing.T) { + runTest(t, 5, true) + }) +} + +// Issue #215. 
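// Editor's note: the placement tests below all follow the same flow; a condensed
// sketch of it using only the public API (editor's illustration; the second
// argument of GetContainerNodes is the pivot, nil in these tests):
//
//	r := netmap.NewReplica()
//	r.SetCount(1)
//
//	p := netmap.NewPlacementPolicy()
//	p.SetReplicas(r)
//
//	nm, err := netmap.NewNetmap(netmap.NodesFromInfo(nodes)) // nodes []netmap.NodeInfo
//	if err != nil {
//		// handle the error
//	}
//
//	v, err := nm.GetContainerNodes(p, nil)
//	if err != nil {
//		// handle the error
//	}
//	_ = v.Flatten() // flat list of the selected nodes
//
// The test that follows, marked with the issue number above, exercises a policy
// with multiple REP clauses.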
+func TestPlacementPolicy_MultipleREP(t *testing.T) { + p := newPlacementPolicy(1, + []*Replica{ + newReplica(1, "LOC_SPB_PLACE"), + newReplica(1, "LOC_MSK_PLACE"), + }, + []*Selector{ + newSelector("LOC_SPB_PLACE", "", ClauseUnspecified, 1, "LOC_SPB"), + newSelector("LOC_MSK_PLACE", "", ClauseUnspecified, 1, "LOC_MSK"), + }, + []*Filter{ + newFilter("LOC_SPB", "City", "Saint-Petersburg", OpEQ), + newFilter("LOC_MSK", "City", "Moscow", OpEQ), + }, + ) + nodes := []NodeInfo{ + nodeInfoFromAttributes("City", "Saint-Petersburg"), + nodeInfoFromAttributes("City", "Moscow"), + nodeInfoFromAttributes("City", "Berlin"), + nodeInfoFromAttributes("City", "Paris"), + } + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + + rs := v.Replicas() + require.Equal(t, 2, len(rs)) + require.Equal(t, 1, len(rs[0])) + require.Equal(t, "Saint-Petersburg", rs[0][0].Attribute("City")) + require.Equal(t, 1, len(rs[1])) + require.Equal(t, "Moscow", rs[1][0].Attribute("City")) +} + +func TestPlacementPolicy_DefaultCBF(t *testing.T) { + p := newPlacementPolicy(0, + []*Replica{ + newReplica(1, "EU"), + }, + []*Selector{ + newSelector("EU", "Location", ClauseSame, 1, "*"), + }, + nil) + nodes := []NodeInfo{ + nodeInfoFromAttributes("Location", "Europe", "Country", "RU", "City", "St.Petersburg"), + nodeInfoFromAttributes("Location", "Europe", "Country", "RU", "City", "Moscow"), + nodeInfoFromAttributes("Location", "Europe", "Country", "DE", "City", "Berlin"), + nodeInfoFromAttributes("Location", "Europe", "Country", "FR", "City", "Paris"), + } + + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + require.Equal(t, defaultCBF, len(v.Flatten())) +} + +func TestPlacementPolicy_GetPlacementVectors(t *testing.T) { + p := newPlacementPolicy(2, + []*Replica{ + newReplica(1, "SPB"), + newReplica(2, "Americas"), + }, + []*Selector{ + newSelector("SPB", "City", ClauseSame, 1, "SPBSSD"), + newSelector("Americas", "City", ClauseDistinct, 2, "Americas"), + }, + []*Filter{ + newFilter("SPBSSD", "", "", OpAND, + newFilter("", "Country", "RU", OpEQ), + newFilter("", "City", "St.Petersburg", OpEQ), + newFilter("", "SSD", "1", OpEQ)), + newFilter("Americas", "", "", OpOR, + newFilter("", "Continent", "NA", OpEQ), + newFilter("", "Continent", "SA", OpEQ)), + }) + nodes := []NodeInfo{ + nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"), + nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"), + nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"), + nodeInfoFromAttributes("ID", "4", "Country", "RU", "City", "Moscow", "SSD", "1"), + nodeInfoFromAttributes("ID", "5", "Country", "RU", "City", "St.Petersburg", "SSD", "1"), + nodeInfoFromAttributes("ID", "6", "Continent", "NA", "City", "NewYork"), + nodeInfoFromAttributes("ID", "7", "Continent", "AF", "City", "Cairo"), + nodeInfoFromAttributes("ID", "8", "Continent", "AF", "City", "Cairo"), + nodeInfoFromAttributes("ID", "9", "Continent", "SA", "City", "Lima"), + nodeInfoFromAttributes("ID", "10", "Continent", "AF", "City", "Cairo"), + nodeInfoFromAttributes("ID", "11", "Continent", "NA", "City", "NewYork"), + nodeInfoFromAttributes("ID", "12", "Continent", "NA", "City", "LosAngeles"), + nodeInfoFromAttributes("ID", "13", "Continent", "SA", "City", "Lima"), + } + + nm, err := NewNetmap(NodesFromInfo(nodes)) + 
require.NoError(t, err) + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + require.Equal(t, 2, len(v.Replicas())) + require.Equal(t, 6, len(v.Flatten())) + + require.Equal(t, 2, len(v.Replicas()[0])) + ids := map[string]struct{}{} + for _, ni := range v.Replicas()[0] { + require.Equal(t, "RU", ni.Attribute("Country")) + require.Equal(t, "St.Petersburg", ni.Attribute("City")) + require.Equal(t, "1", ni.Attribute("SSD")) + ids[ni.Attribute("ID")] = struct{}{} + } + require.Equal(t, len(v.Replicas()[0]), len(ids), "not all nodes we distinct") + + require.Equal(t, 4, len(v.Replicas()[1])) // 2 cities * 2 HRWB + ids = map[string]struct{}{} + for _, ni := range v.Replicas()[1] { + require.Contains(t, []string{"NA", "SA"}, ni.Attribute("Continent")) + ids[ni.Attribute("ID")] = struct{}{} + } + require.Equal(t, len(v.Replicas()[1]), len(ids), "not all nodes we distinct") +} + +func TestPlacementPolicy_LowerBound(t *testing.T) { + p := newPlacementPolicy( + 2, // backup factor + []*Replica{ + newReplica(1, "X"), + }, + []*Selector{ + newSelector("X", "Country", ClauseSame, 2, "*"), + }, + nil, // filters + ) + + nodes := []NodeInfo{ + nodeInfoFromAttributes("ID", "1", "Country", "DE"), + nodeInfoFromAttributes("ID", "2", "Country", "DE"), + nodeInfoFromAttributes("ID", "3", "Country", "DE"), + } + + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + + require.Equal(t, 3, len(v.Flatten())) +} + +func TestIssue213(t *testing.T) { + p := newPlacementPolicy(1, + []*Replica{ + newReplica(4, ""), + }, + []*Selector{ + newSelector("", "", ClauseDistinct, 4, "LOC_EU"), + }, + []*Filter{ + newFilter("LOC_EU", "Location", "Europe", OpEQ), + }) + nodes := []NodeInfo{ + nodeInfoFromAttributes("Location", "Europe", "Country", "Russia", "City", "Moscow"), + nodeInfoFromAttributes("Location", "Europe", "Country", "Russia", "City", "Saint-Petersburg"), + nodeInfoFromAttributes("Location", "Europe", "Country", "Sweden", "City", "Stockholm"), + nodeInfoFromAttributes("Location", "Europe", "Country", "Finalnd", "City", "Helsinki"), + } + + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + + v, err := nm.GetContainerNodes(p, nil) + require.NoError(t, err) + require.Equal(t, 4, len(v.Flatten())) +} + +func TestPlacementPolicy_ProcessSelectors(t *testing.T) { + p := newPlacementPolicy(2, nil, + []*Selector{ + newSelector("SameRU", "City", ClauseSame, 2, "FromRU"), + newSelector("DistinctRU", "City", ClauseDistinct, 2, "FromRU"), + newSelector("Good", "Country", ClauseDistinct, 2, "Good"), + newSelector("Main", "Country", ClauseDistinct, 3, "*"), + }, + []*Filter{ + newFilter("FromRU", "Country", "Russia", OpEQ), + newFilter("Good", "Rating", "4", OpGE), + }) + nodes := []NodeInfo{ + nodeInfoFromAttributes("Country", "Russia", "Rating", "1", "City", "SPB"), + nodeInfoFromAttributes("Country", "Germany", "Rating", "5", "City", "Berlin"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "6", "City", "Moscow"), + nodeInfoFromAttributes("Country", "France", "Rating", "4", "City", "Paris"), + nodeInfoFromAttributes("Country", "France", "Rating", "1", "City", "Lyon"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "5", "City", "SPB"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "7", "City", "Moscow"), + nodeInfoFromAttributes("Country", "Germany", "Rating", "3", "City", "Darmstadt"), + nodeInfoFromAttributes("Country", "Germany", "Rating", "7", "City", "Frankfurt"), + 
nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"), + nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"), + } + + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + c := NewContext(nm) + c.setCBF(p.ContainerBackupFactor()) + require.NoError(t, c.processFilters(p)) + require.NoError(t, c.processSelectors(p)) + + for _, s := range p.Selectors() { + sel := c.Selections[s.Name()] + s := c.Selectors[s.Name()] + bucketCount, nodesInBucket := GetNodesCount(p, s) + nodesInBucket *= int(c.cbf) + targ := fmt.Sprintf("selector '%s'", s.Name()) + require.Equal(t, bucketCount, len(sel), targ) + for _, res := range sel { + require.Equal(t, nodesInBucket, len(res), targ) + for j := range res { + require.True(t, c.applyFilter(s.Filter(), res[j]), targ) + } + } + } +} + +func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) { + p := newPlacementPolicy(1, nil, + []*Selector{ + newSelector("Main", "Country", ClauseDistinct, 3, "*"), + }, nil) + + // bucket weight order: RU > DE > FR + nodes := []NodeInfo{ + nodeInfoFromAttributes("Country", "Germany", AttrPrice, "2", AttrCapacity, "10000"), + nodeInfoFromAttributes("Country", "Germany", AttrPrice, "4", AttrCapacity, "1"), + nodeInfoFromAttributes("Country", "France", AttrPrice, "3", AttrCapacity, "10"), + nodeInfoFromAttributes("Country", "Russia", AttrPrice, "2", AttrCapacity, "10000"), + nodeInfoFromAttributes("Country", "Russia", AttrPrice, "1", AttrCapacity, "10000"), + nodeInfoFromAttributes("Country", "Russia", AttrCapacity, "10000"), + nodeInfoFromAttributes("Country", "France", AttrPrice, "100", AttrCapacity, "1"), + nodeInfoFromAttributes("Country", "France", AttrPrice, "7", AttrCapacity, "10000"), + nodeInfoFromAttributes("Country", "Russia", AttrPrice, "2", AttrCapacity, "1"), + } + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + c := NewContext(nm) + c.setPivot([]byte("containerID")) + c.setCBF(p.ContainerBackupFactor()) + c.weightFunc = newWeightFunc(newMaxNorm(10000), newReverseMinNorm(1)) + c.aggregator = func() aggregator { + return new(maxAgg) + } + + require.NoError(t, c.processFilters(p)) + require.NoError(t, c.processSelectors(p)) + + cnt := c.Selections["Main"] + expected := []Nodes{ + {{Index: 4, Capacity: 10000, Price: 1}}, // best RU + {{Index: 0, Capacity: 10000, Price: 2}}, // best DE + {{Index: 7, Capacity: 10000, Price: 7}}, // best FR + } + require.Equal(t, len(expected), len(cnt)) + for i := range expected { + require.Equal(t, len(expected[i]), len(cnt[i])) + require.Equal(t, expected[i][0].Index, cnt[i][0].Index) + require.Equal(t, expected[i][0].Capacity, cnt[i][0].Capacity) + require.Equal(t, expected[i][0].Price, cnt[i][0].Price) + } + + res, err := nm.GetPlacementVectors(containerNodes(cnt), []byte("objectID")) + require.NoError(t, err) + require.Equal(t, res, cnt) +} + +func newMaxNorm(max float64) normalizer { + return &maxNorm{max: max} +} + +func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) { + testCases := []struct { + name string + p *PlacementPolicy + err error + }{ + { + "MissingSelector", + newPlacementPolicy(2, nil, + []*Selector{nil}, + []*Filter{}), + ErrMissingField, + }, + { + "InvalidFilterReference", + newPlacementPolicy(1, nil, + []*Selector{newSelector("MyStore", "Country", ClauseDistinct, 1, "FromNL")}, + []*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}), + ErrFilterNotFound, + }, + { + "NotEnoughNodes (backup factor)", + newPlacementPolicy(2, nil, + []*Selector{newSelector("MyStore", 
"Country", ClauseDistinct, 2, "FromRU")}, + []*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}), + ErrNotEnoughNodes, + }, + { + "NotEnoughNodes (buckets)", + newPlacementPolicy(1, nil, + []*Selector{newSelector("MyStore", "Country", ClauseDistinct, 2, "FromRU")}, + []*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}), + ErrNotEnoughNodes, + }, + } + nodes := []NodeInfo{ + nodeInfoFromAttributes("Country", "Russia"), + nodeInfoFromAttributes("Country", "Germany"), + nodeInfoFromAttributes(), + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nm, err := NewNetmap(NodesFromInfo(nodes)) + require.NoError(t, err) + c := NewContext(nm) + c.setCBF(tc.p.ContainerBackupFactor()) + require.NoError(t, c.processFilters(tc.p)) + + err = c.processSelectors(tc.p) + require.True(t, errors.Is(err, tc.err), "got: %v", err) + }) + } +} + +func testSelector() *Selector { + s := new(Selector) + s.SetName("name") + s.SetCount(3) + s.SetFilter("filter") + s.SetAttribute("attribute") + s.SetClause(ClauseDistinct) + + return s +} + +func TestSelector_Name(t *testing.T) { + s := NewSelector() + name := "some name" + + s.SetName(name) + + require.Equal(t, name, s.Name()) +} + +func TestSelector_Count(t *testing.T) { + s := NewSelector() + c := uint32(3) + + s.SetCount(c) + + require.Equal(t, c, s.Count()) +} + +func TestSelector_Clause(t *testing.T) { + s := NewSelector() + c := ClauseSame + + s.SetClause(c) + + require.Equal(t, c, s.Clause()) +} + +func TestSelector_Attribute(t *testing.T) { + s := NewSelector() + a := "some attribute" + + s.SetAttribute(a) + + require.Equal(t, a, s.Attribute()) +} + +func TestSelector_Filter(t *testing.T) { + s := NewSelector() + f := "some filter" + + s.SetFilter(f) + + require.Equal(t, f, s.Filter()) +} + +func TestSelectorEncoding(t *testing.T) { + s := newSelector("name", "atte", ClauseSame, 1, "filter") + + t.Run("binary", func(t *testing.T) { + data, err := s.Marshal() + require.NoError(t, err) + + s2 := NewSelector() + require.NoError(t, s2.Unmarshal(data)) + + require.Equal(t, s, s2) + }) + + t.Run("json", func(t *testing.T) { + data, err := s.MarshalJSON() + require.NoError(t, err) + + s2 := NewSelector() + require.NoError(t, s2.UnmarshalJSON(data)) + + require.Equal(t, s, s2) + }) +} + +func TestSelector_ToV2(t *testing.T) { + t.Run("nil", func(t *testing.T) { + var x *Selector + + require.Nil(t, x.ToV2()) + }) +} + +func TestNewSelectorFromV2(t *testing.T) { + t.Run("from nil", func(t *testing.T) { + var x *netmap.Selector + + require.Nil(t, NewSelectorFromV2(x)) + }) + + t.Run("from non-nil", func(t *testing.T) { + sV2 := testv2.GenerateSelector(false) + + s := NewSelectorFromV2(sV2) + + require.Equal(t, sV2, s.ToV2()) + }) +} + +func TestNewSelector(t *testing.T) { + t.Run("default values", func(t *testing.T) { + s := NewSelector() + + // check initial values + require.Zero(t, s.Count()) + require.Equal(t, ClauseUnspecified, s.Clause()) + require.Empty(t, s.Attribute()) + require.Empty(t, s.Name()) + require.Empty(t, s.Filter()) + + // convert to v2 message + sV2 := s.ToV2() + + require.Zero(t, sV2.GetCount()) + require.Equal(t, netmap.UnspecifiedClause, sV2.GetClause()) + require.Empty(t, sV2.GetAttribute()) + require.Empty(t, sV2.GetName()) + require.Empty(t, sV2.GetFilter()) + }) +} diff --git a/netmap/test/generate.go b/netmap/test/generate.go new file mode 100644 index 0000000..3f0826b --- /dev/null +++ b/netmap/test/generate.go @@ -0,0 +1,37 @@ +package test + +import "github.com/nspcc-dev/neofs-sdk-go/netmap" + +// 
NetworkParameter returns random netmap.NetworkParameter. +func NetworkParameter() *netmap.NetworkParameter { + x := netmap.NewNetworkParameter() + + x.SetKey([]byte("key")) + x.SetValue([]byte("value")) + + return x +} + +// NetworkConfig returns random netmap.NetworkConfig. +func NetworkConfig() *netmap.NetworkConfig { + x := netmap.NewNetworkConfig() + + x.SetParameters( + NetworkParameter(), + NetworkParameter(), + ) + + return x +} + +// NetworkInfo returns random netmap.NetworkInfo. +func NetworkInfo() *netmap.NetworkInfo { + x := netmap.NewNetworkInfo() + + x.SetCurrentEpoch(21) + x.SetMagicNumber(32) + x.SetMsPerBlock(43) + x.SetNetworkConfig(NetworkConfig()) + + return x +}
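// Editor's note: despite the "returns random" wording of the doc comments, these
// helpers currently fill fixed values and effectively serve as deterministic
// fixtures. A hypothetical round-trip check (assuming netmap.NetworkInfo follows
// the same ToV2 conversion pattern as the other wrappers in the package):
//
//	func TestNetworkInfoFixture(t *testing.T) {
//		x := test.NetworkInfo()
//
//		require.EqualValues(t, 21, x.ToV2().GetCurrentEpoch())
//		require.EqualValues(t, 32, x.ToV2().GetMagicNumber())
//	}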