[#189] sdk/netmap: Get rid of the use of v2 types in the placement code

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-11-05 15:45:06 +03:00 committed by Alex Vanin
parent 40d7c9cd93
commit 9834ed4e68
9 changed files with 195 additions and 225 deletions

View file

@ -4,7 +4,6 @@ import (
"errors" "errors"
"github.com/nspcc-dev/hrw" "github.com/nspcc-dev/hrw"
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
) )
// Context contains references to named filters and cached numeric values. // Context contains references to named filters and cached numeric values.
@ -12,14 +11,14 @@ type Context struct {
// Netmap is a netmap structure to operate on. // Netmap is a netmap structure to operate on.
Netmap *Netmap Netmap *Netmap
// Filters stores processed filters. // Filters stores processed filters.
Filters map[string]*netmap.Filter Filters map[string]*Filter
// Selectors stores processed selectors. // Selectors stores processed selectors.
Selectors map[string]*netmap.Selector Selectors map[string]*Selector
// Selections stores result of selector processing. // Selections stores result of selector processing.
Selections map[string][]Nodes Selections map[string][]Nodes
// numCache stores parsed numeric values. // numCache stores parsed numeric values.
numCache map[*netmap.Filter]uint64 numCache map[*Filter]uint64
// pivot is a seed for HRW. // pivot is a seed for HRW.
pivot []byte pivot []byte
// pivotHash is a saved HRW hash of pivot // pivotHash is a saved HRW hash of pivot
@ -50,11 +49,11 @@ var (
func NewContext(nm *Netmap) *Context { func NewContext(nm *Netmap) *Context {
return &Context{ return &Context{
Netmap: nm, Netmap: nm,
Filters: make(map[string]*netmap.Filter), Filters: make(map[string]*Filter),
Selectors: make(map[string]*netmap.Selector), Selectors: make(map[string]*Selector),
Selections: make(map[string][]Nodes), Selections: make(map[string][]Nodes),
numCache: make(map[*netmap.Filter]uint64), numCache: make(map[*Filter]uint64),
aggregator: newMeanIQRAgg, aggregator: newMeanIQRAgg,
weightFunc: GetDefaultWeightFunc(nm.Nodes), weightFunc: GetDefaultWeightFunc(nm.Nodes),
} }

View file

@ -20,8 +20,8 @@ func (c *Context) applyFilter(name string, b *Node) bool {
} }
// processFilters processes filters and returns error is any of them is invalid. // processFilters processes filters and returns error is any of them is invalid.
func (c *Context) processFilters(p *netmap.PlacementPolicy) error { func (c *Context) processFilters(p *PlacementPolicy) error {
for _, f := range p.GetFilters() { for _, f := range p.Filters() {
if err := c.processFilter(f, true); err != nil { if err := c.processFilter(f, true); err != nil {
return err return err
} }
@ -29,46 +29,46 @@ func (c *Context) processFilters(p *netmap.PlacementPolicy) error {
return nil return nil
} }
func (c *Context) processFilter(f *netmap.Filter, top bool) error { func (c *Context) processFilter(f *Filter, top bool) error {
if f == nil { if f == nil {
return fmt.Errorf("%w: FILTER", ErrMissingField) return fmt.Errorf("%w: FILTER", ErrMissingField)
} }
if f.GetName() == MainFilterName { if f.Name() == MainFilterName {
return fmt.Errorf("%w: '*' is reserved", ErrInvalidFilterName) return fmt.Errorf("%w: '*' is reserved", ErrInvalidFilterName)
} }
if top && f.GetName() == "" { if top && f.Name() == "" {
return ErrUnnamedTopFilter return ErrUnnamedTopFilter
} }
if !top && f.GetName() != "" && c.Filters[f.GetName()] == nil { if !top && f.Name() != "" && c.Filters[f.Name()] == nil {
return fmt.Errorf("%w: '%s'", ErrFilterNotFound, f.GetName()) return fmt.Errorf("%w: '%s'", ErrFilterNotFound, f.Name())
} }
switch f.GetOp() { switch f.Operation() {
case netmap.AND, netmap.OR: case OpAND, OpOR:
for _, flt := range f.GetFilters() { for _, flt := range f.InnerFilters() {
if err := c.processFilter(flt, false); err != nil { if err := c.processFilter(flt, false); err != nil {
return err return err
} }
} }
default: default:
if len(f.GetFilters()) != 0 { if len(f.InnerFilters()) != 0 {
return ErrNonEmptyFilters return ErrNonEmptyFilters
} else if !top && f.GetName() != "" { // named reference } else if !top && f.Name() != "" { // named reference
return nil return nil
} }
switch f.GetOp() { switch f.Operation() {
case netmap.EQ, netmap.NE: case OpEQ, OpNE:
case netmap.GT, netmap.GE, netmap.LT, netmap.LE: case OpGT, OpGE, OpLT, OpLE:
n, err := strconv.ParseUint(f.GetValue(), 10, 64) n, err := strconv.ParseUint(f.Value(), 10, 64)
if err != nil { if err != nil {
return fmt.Errorf("%w: '%s'", ErrInvalidNumber, f.GetValue()) return fmt.Errorf("%w: '%s'", ErrInvalidNumber, f.Value())
} }
c.numCache[f] = n c.numCache[f] = n
default: default:
return fmt.Errorf("%w: %d", ErrInvalidFilterOp, f.GetOp()) return fmt.Errorf("%w: %s", ErrInvalidFilterOp, f.Operation())
} }
} }
if top { if top {
c.Filters[f.GetName()] = f c.Filters[f.Name()] = f
} }
return nil return nil
} }
@ -76,54 +76,54 @@ func (c *Context) processFilter(f *netmap.Filter, top bool) error {
// match matches f against b. It returns no errors because // match matches f against b. It returns no errors because
// filter should have been parsed during context creation // filter should have been parsed during context creation
// and missing node properties are considered as a regular fail. // and missing node properties are considered as a regular fail.
func (c *Context) match(f *netmap.Filter, b *Node) bool { func (c *Context) match(f *Filter, b *Node) bool {
switch f.GetOp() { switch f.Operation() {
case netmap.AND, netmap.OR: case OpAND, OpOR:
for _, lf := range f.GetFilters() { for _, lf := range f.InnerFilters() {
if lf.GetName() != "" { if lf.Name() != "" {
lf = c.Filters[lf.GetName()] lf = c.Filters[lf.Name()]
} }
ok := c.match(lf, b) ok := c.match(lf, b)
if ok == (f.GetOp() == netmap.OR) { if ok == (f.Operation() == OpOR) {
return ok return ok
} }
} }
return f.GetOp() == netmap.AND return f.Operation() == OpAND
default: default:
return c.matchKeyValue(f, b) return c.matchKeyValue(f, b)
} }
} }
func (c *Context) matchKeyValue(f *netmap.Filter, b *Node) bool { func (c *Context) matchKeyValue(f *Filter, b *Node) bool {
switch f.GetOp() { switch f.Operation() {
case netmap.EQ: case OpEQ:
return b.Attribute(f.GetKey()) == f.GetValue() return b.Attribute(f.Key()) == f.Value()
case netmap.NE: case OpNE:
return b.Attribute(f.GetKey()) != f.GetValue() return b.Attribute(f.Key()) != f.Value()
default: default:
var attr uint64 var attr uint64
switch f.GetKey() { switch f.Key() {
case PriceAttr: case PriceAttr:
attr = b.Price attr = b.Price
case CapacityAttr: case CapacityAttr:
attr = b.Capacity attr = b.Capacity
default: default:
var err error var err error
attr, err = strconv.ParseUint(b.Attribute(f.GetKey()), 10, 64) attr, err = strconv.ParseUint(b.Attribute(f.Key()), 10, 64)
if err != nil { if err != nil {
// Note: because filters are somewhat independent from nodes attributes, // Note: because filters are somewhat independent from nodes attributes,
// We don't report an error here, and fail filter instead. // We don't report an error here, and fail filter instead.
return false return false
} }
} }
switch f.GetOp() { switch f.Operation() {
case netmap.GT: case OpGT:
return attr > c.numCache[f] return attr > c.numCache[f]
case netmap.GE: case OpGE:
return attr >= c.numCache[f] return attr >= c.numCache[f]
case netmap.LT: case OpLT:
return attr < c.numCache[f] return attr < c.numCache[f]
case netmap.LE: case OpLE:
return attr <= c.numCache[f] return attr <= c.numCache[f]
} }
} }

View file

@ -9,12 +9,12 @@ import (
) )
func TestContext_ProcessFilters(t *testing.T) { func TestContext_ProcessFilters(t *testing.T) {
fs := []*netmap.Filter{ fs := []*Filter{
newFilter("StorageSSD", "Storage", "SSD", netmap.EQ), newFilter("StorageSSD", "Storage", "SSD", OpEQ),
newFilter("GoodRating", "Rating", "4", netmap.GE), newFilter("GoodRating", "Rating", "4", OpGE),
newFilter("Main", "", "", netmap.AND, newFilter("Main", "", "", OpAND,
newFilter("StorageSSD", "", "", 0), newFilter("StorageSSD", "", "", 0),
newFilter("", "IntField", "123", netmap.LT), newFilter("", "IntField", "123", OpLT),
newFilter("GoodRating", "", "", 0)), newFilter("GoodRating", "", "", 0)),
} }
nm, err := NewNetmap(nil) nm, err := NewNetmap(nil)
@ -24,49 +24,49 @@ func TestContext_ProcessFilters(t *testing.T) {
require.NoError(t, c.processFilters(p)) require.NoError(t, c.processFilters(p))
require.Equal(t, 3, len(c.Filters)) require.Equal(t, 3, len(c.Filters))
for _, f := range fs { for _, f := range fs {
require.Equal(t, f, c.Filters[f.GetName()]) require.Equal(t, f, c.Filters[f.Name()])
} }
require.Equal(t, uint64(4), c.numCache[fs[1]]) require.Equal(t, uint64(4), c.numCache[fs[1]])
require.Equal(t, uint64(123), c.numCache[fs[2].GetFilters()[1]]) require.Equal(t, uint64(123), c.numCache[fs[2].InnerFilters()[1]])
} }
func TestContext_ProcessFiltersInvalid(t *testing.T) { func TestContext_ProcessFiltersInvalid(t *testing.T) {
errTestCases := []struct { errTestCases := []struct {
name string name string
filter *netmap.Filter filter *Filter
err error err error
}{ }{
{ {
"UnnamedTop", "UnnamedTop",
newFilter("", "Storage", "SSD", netmap.EQ), newFilter("", "Storage", "SSD", OpEQ),
ErrUnnamedTopFilter, ErrUnnamedTopFilter,
}, },
{ {
"InvalidReference", "InvalidReference",
newFilter("Main", "", "", netmap.AND, newFilter("Main", "", "", OpAND,
newFilter("StorageSSD", "", "", 0)), newFilter("StorageSSD", "", "", 0)),
ErrFilterNotFound, ErrFilterNotFound,
}, },
{ {
"NonEmptyKeyed", "NonEmptyKeyed",
newFilter("Main", "Storage", "SSD", netmap.EQ, newFilter("Main", "Storage", "SSD", OpEQ,
newFilter("StorageSSD", "", "", 0)), newFilter("StorageSSD", "", "", 0)),
ErrNonEmptyFilters, ErrNonEmptyFilters,
}, },
{ {
"InvalidNumber", "InvalidNumber",
newFilter("Main", "Rating", "three", netmap.GE), newFilter("Main", "Rating", "three", OpGE),
ErrInvalidNumber, ErrInvalidNumber,
}, },
{ {
"InvalidOp", "InvalidOp",
newFilter("Main", "Rating", "3", netmap.UnspecifiedOperation), newFilter("Main", "Rating", "3", 0),
ErrInvalidFilterOp, ErrInvalidFilterOp,
}, },
{ {
"InvalidName", "InvalidName",
newFilter("*", "Rating", "3", netmap.GE), newFilter("*", "Rating", "3", OpGE),
ErrInvalidFilterName, ErrInvalidFilterName,
}, },
{ {
@ -78,7 +78,7 @@ func TestContext_ProcessFiltersInvalid(t *testing.T) {
for _, tc := range errTestCases { for _, tc := range errTestCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
c := NewContext(new(Netmap)) c := NewContext(new(Netmap))
p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{tc.filter}) p := newPlacementPolicy(1, nil, nil, []*Filter{tc.filter})
err := c.processFilters(p) err := c.processFilters(p)
require.True(t, errors.Is(err, tc.err), "got: %v", err) require.True(t, errors.Is(err, tc.err), "got: %v", err)
}) })
@ -93,87 +93,87 @@ func TestFilter_MatchSimple(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
ok bool ok bool
f *netmap.Filter f *Filter
}{ }{
{ {
"GE_true", true, "GE_true", true,
newFilter("Main", "Rating", "4", netmap.GE), newFilter("Main", "Rating", "4", OpGE),
}, },
{ {
"GE_false", false, "GE_false", false,
newFilter("Main", "Rating", "5", netmap.GE), newFilter("Main", "Rating", "5", OpGE),
}, },
{ {
"GT_true", true, "GT_true", true,
newFilter("Main", "Rating", "3", netmap.GT), newFilter("Main", "Rating", "3", OpGT),
}, },
{ {
"GT_false", false, "GT_false", false,
newFilter("Main", "Rating", "4", netmap.GT), newFilter("Main", "Rating", "4", OpGT),
}, },
{ {
"LE_true", true, "LE_true", true,
newFilter("Main", "Rating", "4", netmap.LE), newFilter("Main", "Rating", "4", OpLE),
}, },
{ {
"LE_false", false, "LE_false", false,
newFilter("Main", "Rating", "3", netmap.LE), newFilter("Main", "Rating", "3", OpLE),
}, },
{ {
"LT_true", true, "LT_true", true,
newFilter("Main", "Rating", "5", netmap.LT), newFilter("Main", "Rating", "5", OpLT),
}, },
{ {
"LT_false", false, "LT_false", false,
newFilter("Main", "Rating", "4", netmap.LT), newFilter("Main", "Rating", "4", OpLT),
}, },
{ {
"EQ_true", true, "EQ_true", true,
newFilter("Main", "Country", "Germany", netmap.EQ), newFilter("Main", "Country", "Germany", OpEQ),
}, },
{ {
"EQ_false", false, "EQ_false", false,
newFilter("Main", "Country", "China", netmap.EQ), newFilter("Main", "Country", "China", OpEQ),
}, },
{ {
"NE_true", true, "NE_true", true,
newFilter("Main", "Country", "France", netmap.NE), newFilter("Main", "Country", "France", OpNE),
}, },
{ {
"NE_false", false, "NE_false", false,
newFilter("Main", "Country", "Germany", netmap.NE), newFilter("Main", "Country", "Germany", OpNE),
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
c := NewContext(new(Netmap)) c := NewContext(new(Netmap))
p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{tc.f}) p := newPlacementPolicy(1, nil, nil, []*Filter{tc.f})
require.NoError(t, c.processFilters(p)) require.NoError(t, c.processFilters(p))
require.Equal(t, tc.ok, c.match(tc.f, b)) require.Equal(t, tc.ok, c.match(tc.f, b))
} }
t.Run("InvalidOp", func(t *testing.T) { t.Run("InvalidOp", func(t *testing.T) {
f := newFilter("Main", "Rating", "5", netmap.EQ) f := newFilter("Main", "Rating", "5", OpEQ)
c := NewContext(new(Netmap)) c := NewContext(new(Netmap))
p := newPlacementPolicy(1, nil, nil, []*netmap.Filter{f}) p := newPlacementPolicy(1, nil, nil, []*Filter{f})
require.NoError(t, c.processFilters(p)) require.NoError(t, c.processFilters(p))
// just for the coverage // just for the coverage
f.SetOp(netmap.UnspecifiedOperation) f.SetOperation(0)
require.False(t, c.match(f, b)) require.False(t, c.match(f, b))
}) })
} }
func TestFilter_Match(t *testing.T) { func TestFilter_Match(t *testing.T) {
fs := []*netmap.Filter{ fs := []*Filter{
newFilter("StorageSSD", "Storage", "SSD", netmap.EQ), newFilter("StorageSSD", "Storage", "SSD", OpEQ),
newFilter("GoodRating", "Rating", "4", netmap.GE), newFilter("GoodRating", "Rating", "4", OpGE),
newFilter("Main", "", "", netmap.AND, newFilter("Main", "", "", OpAND,
newFilter("StorageSSD", "", "", 0), newFilter("StorageSSD", "", "", 0),
newFilter("", "IntField", "123", netmap.LT), newFilter("", "IntField", "123", OpLT),
newFilter("GoodRating", "", "", 0), newFilter("GoodRating", "", "", 0),
newFilter("", "", "", netmap.OR, newFilter("", "", "", OpOR,
newFilter("", "Param", "Value1", netmap.EQ), newFilter("", "Param", "Value1", OpEQ),
newFilter("", "Param", "Value2", netmap.EQ), newFilter("", "Param", "Value2", OpEQ),
)), )),
} }
c := NewContext(new(Netmap)) c := NewContext(new(Netmap))

View file

@ -1,21 +1,17 @@
package netmap package netmap
import ( func newFilter(name string, k, v string, op Operation, fs ...*Filter) *Filter {
"github.com/nspcc-dev/neofs-api-go/v2/netmap" f := NewFilter()
)
func newFilter(name string, k, v string, op netmap.Operation, fs ...*netmap.Filter) *netmap.Filter {
f := new(netmap.Filter)
f.SetName(name) f.SetName(name)
f.SetKey(k) f.SetKey(k)
f.SetOp(op) f.SetOperation(op)
f.SetValue(v) f.SetValue(v)
f.SetFilters(fs) f.SetInnerFilters(fs...)
return f return f
} }
func newSelector(name string, attr string, c netmap.Clause, count uint32, filter string) *netmap.Selector { func newSelector(name string, attr string, c Clause, count uint32, filter string) *Selector {
s := new(netmap.Selector) s := NewSelector()
s.SetName(name) s.SetName(name)
s.SetAttribute(attr) s.SetAttribute(attr)
s.SetCount(count) s.SetCount(count)
@ -24,32 +20,32 @@ func newSelector(name string, attr string, c netmap.Clause, count uint32, filter
return s return s
} }
func newPlacementPolicy(bf uint32, rs []*netmap.Replica, ss []*netmap.Selector, fs []*netmap.Filter) *netmap.PlacementPolicy { func newPlacementPolicy(bf uint32, rs []*Replica, ss []*Selector, fs []*Filter) *PlacementPolicy {
p := new(netmap.PlacementPolicy) p := NewPlacementPolicy()
p.SetContainerBackupFactor(bf) p.SetContainerBackupFactor(bf)
p.SetReplicas(rs) p.SetReplicas(rs...)
p.SetSelectors(ss) p.SetSelectors(ss...)
p.SetFilters(fs) p.SetFilters(fs...)
return p return p
} }
func newReplica(c uint32, s string) *netmap.Replica { func newReplica(c uint32, s string) *Replica {
r := new(netmap.Replica) r := NewReplica()
r.SetCount(c) r.SetCount(c)
r.SetSelector(s) r.SetSelector(s)
return r return r
} }
func nodeInfoFromAttributes(props ...string) netmap.NodeInfo { func nodeInfoFromAttributes(props ...string) NodeInfo {
attrs := make([]*netmap.Attribute, len(props)/2) attrs := make([]*NodeAttribute, len(props)/2)
for i := range attrs { for i := range attrs {
attrs[i] = new(netmap.Attribute) attrs[i] = NewNodeAttribute()
attrs[i].SetKey(props[i*2]) attrs[i].SetKey(props[i*2])
attrs[i].SetValue(props[i*2+1]) attrs[i].SetValue(props[i*2+1])
} }
var n netmap.NodeInfo n := NewNodeInfo()
n.SetAttributes(attrs) n.SetAttributes(attrs...)
return n return *n
} }
func getTestNode(props ...string) *Node { func getTestNode(props ...string) *Node {

View file

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"github.com/nspcc-dev/hrw" "github.com/nspcc-dev/hrw"
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
) )
// Netmap represents netmap which contains preprocessed nodes. // Netmap represents netmap which contains preprocessed nodes.
@ -43,7 +42,7 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes,
// GetContainerNodes returns nodes corresponding to each replica. // GetContainerNodes returns nodes corresponding to each replica.
// Order of returned nodes corresponds to order of replicas in p. // Order of returned nodes corresponds to order of replicas in p.
// pivot is a seed for HRW sorting. // pivot is a seed for HRW sorting.
func (m *Netmap) GetContainerNodes(p *netmap.PlacementPolicy, pivot []byte) (ContainerNodes, error) { func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) {
c := NewContext(m) c := NewContext(m)
c.setPivot(pivot) c.setPivot(pivot)
if err := c.processFilters(p); err != nil { if err := c.processFilters(p); err != nil {
@ -52,19 +51,19 @@ func (m *Netmap) GetContainerNodes(p *netmap.PlacementPolicy, pivot []byte) (Con
if err := c.processSelectors(p); err != nil { if err := c.processSelectors(p); err != nil {
return nil, err return nil, err
} }
result := make([]Nodes, len(p.GetReplicas())) result := make([]Nodes, len(p.Replicas()))
for i, r := range p.GetReplicas() { for i, r := range p.Replicas() {
if r == nil { if r == nil {
return nil, fmt.Errorf("%w: REPLICA", ErrMissingField) return nil, fmt.Errorf("%w: REPLICA", ErrMissingField)
} }
if r.GetSelector() == "" { if r.Selector() == "" {
for _, s := range p.GetSelectors() { for _, s := range p.Selectors() {
result[i] = append(result[i], flattenNodes(c.Selections[s.GetName()])...) result[i] = append(result[i], flattenNodes(c.Selections[s.Name()])...)
} }
} }
nodes, ok := c.Selections[r.GetSelector()] nodes, ok := c.Selections[r.Selector()]
if !ok { if !ok {
return nil, fmt.Errorf("%w: REPLICA '%s'", ErrSelectorNotFound, r.GetSelector()) return nil, fmt.Errorf("%w: REPLICA '%s'", ErrSelectorNotFound, r.Selector())
} }
result[i] = append(result[i], flattenNodes(nodes)...) result[i] = append(result[i], flattenNodes(nodes)...)

View file

@ -16,7 +16,7 @@ type (
Price uint64 Price uint64
AttrMap map[string]string AttrMap map[string]string
InfoV2 *netmap.NodeInfo *NodeInfo
} }
// Nodes represents slice of graph leafs. // Nodes represents slice of graph leafs.
@ -57,19 +57,8 @@ func (n Node) Hash() uint64 {
return n.ID return n.ID
} }
// NetworkAddress returns network address // NodesFromInfo converts slice of NodeInfo to a generic node slice.
// of the node in a string format. func NodesFromInfo(infos []NodeInfo) Nodes {
func (n Node) NetworkAddress() string {
return n.InfoV2.GetAddress()
}
// PublicKey returns public key of the node in bytes.
func (n Node) PublicKey() []byte {
return n.InfoV2.GetPublicKey()
}
// NodesFromV2 converts slice of v2 netmap.NodeInfo to a generic node slice.
func NodesFromV2(infos []netmap.NodeInfo) Nodes {
nodes := make(Nodes, len(infos)) nodes := make(Nodes, len(infos))
for i := range infos { for i := range infos {
nodes[i] = newNodeV2(i, &infos[i]) nodes[i] = newNodeV2(i, &infos[i])
@ -77,21 +66,21 @@ func NodesFromV2(infos []netmap.NodeInfo) Nodes {
return nodes return nodes
} }
func newNodeV2(index int, ni *netmap.NodeInfo) *Node { func newNodeV2(index int, ni *NodeInfo) *Node {
n := &Node{ n := &Node{
ID: hrw.Hash(ni.GetPublicKey()), ID: hrw.Hash(ni.PublicKey()),
Index: index, Index: index,
AttrMap: make(map[string]string, len(ni.GetAttributes())), AttrMap: make(map[string]string, len(ni.Attributes())),
InfoV2: ni, NodeInfo: ni,
} }
for _, attr := range ni.GetAttributes() { for _, attr := range ni.Attributes() {
switch attr.GetKey() { switch attr.Key() {
case CapacityAttr: case CapacityAttr:
n.Capacity, _ = strconv.ParseUint(attr.GetValue(), 10, 64) n.Capacity, _ = strconv.ParseUint(attr.Value(), 10, 64)
case PriceAttr: case PriceAttr:
n.Price, _ = strconv.ParseUint(attr.GetValue(), 10, 64) n.Price, _ = strconv.ParseUint(attr.Value(), 10, 64)
} }
n.AttrMap[attr.GetKey()] = attr.GetValue() n.AttrMap[attr.Key()] = attr.Value()
} }
return n return n
} }

View file

@ -7,19 +7,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestNode_NetworkAddress(t *testing.T) {
addr := "127.0.0.1:8080"
nV2 := new(netmap.NodeInfo)
nV2.SetAddress(addr)
n := Node{
InfoV2: nV2,
}
require.Equal(t, addr, n.NetworkAddress())
}
func TestNodeStateFromV2(t *testing.T) { func TestNodeStateFromV2(t *testing.T) {
for _, item := range []struct { for _, item := range []struct {
s NodeState s NodeState

View file

@ -12,48 +12,48 @@ import (
type Selector netmap.Selector type Selector netmap.Selector
// processSelectors processes selectors and returns error is any of them is invalid. // processSelectors processes selectors and returns error is any of them is invalid.
func (c *Context) processSelectors(p *netmap.PlacementPolicy) error { func (c *Context) processSelectors(p *PlacementPolicy) error {
for _, s := range p.GetSelectors() { for _, s := range p.Selectors() {
if s == nil { if s == nil {
return fmt.Errorf("%w: SELECT", ErrMissingField) return fmt.Errorf("%w: SELECT", ErrMissingField)
} else if s.GetFilter() != MainFilterName { } else if s.Filter() != MainFilterName {
_, ok := c.Filters[s.GetFilter()] _, ok := c.Filters[s.Filter()]
if !ok { if !ok {
return fmt.Errorf("%w: SELECT FROM '%s'", ErrFilterNotFound, s.GetFilter()) return fmt.Errorf("%w: SELECT FROM '%s'", ErrFilterNotFound, s.Filter())
} }
} }
c.Selectors[s.GetName()] = s c.Selectors[s.Name()] = s
result, err := c.getSelection(p, s) result, err := c.getSelection(p, s)
if err != nil { if err != nil {
return err return err
} }
c.Selections[s.GetName()] = result c.Selections[s.Name()] = result
} }
return nil return nil
} }
// GetNodesCount returns amount of buckets and nodes in every bucket // GetNodesCount returns amount of buckets and nodes in every bucket
// for a given placement policy. // for a given placement policy.
func GetNodesCount(p *netmap.PlacementPolicy, s *netmap.Selector) (int, int) { func GetNodesCount(p *PlacementPolicy, s *Selector) (int, int) {
switch s.GetClause() { switch s.Clause() {
case netmap.Same: case ClauseSame:
return 1, int(p.GetContainerBackupFactor() * s.GetCount()) return 1, int(p.ContainerBackupFactor() * s.Count())
default: default:
return int(s.GetCount()), int(p.GetContainerBackupFactor()) return int(s.Count()), int(p.ContainerBackupFactor())
} }
} }
// getSelection returns nodes grouped by s.attribute. // getSelection returns nodes grouped by s.attribute.
func (c *Context) getSelection(p *netmap.PlacementPolicy, s *netmap.Selector) ([]Nodes, error) { func (c *Context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) {
bucketCount, nodesInBucket := GetNodesCount(p, s) bucketCount, nodesInBucket := GetNodesCount(p, s)
buckets := c.getSelectionBase(s) buckets := c.getSelectionBase(s)
if len(buckets) < bucketCount { if len(buckets) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.GetName()) return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
} }
if len(c.pivot) == 0 { if len(c.pivot) == 0 {
// Deterministic order in case of zero seed. // Deterministic order in case of zero seed.
if s.GetAttribute() == "" { if s.Attribute() == "" {
sort.Slice(buckets, func(i, j int) bool { sort.Slice(buckets, func(i, j int) bool {
return buckets[i].nodes[0].ID < buckets[j].nodes[0].ID return buckets[i].nodes[0].ID < buckets[j].nodes[0].ID
}) })
@ -72,7 +72,7 @@ func (c *Context) getSelection(p *netmap.PlacementPolicy, s *netmap.Selector) ([
} }
} }
if len(nodes) < bucketCount { if len(nodes) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.GetName()) return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
} }
if len(c.pivot) != 0 { if len(c.pivot) != 0 {
weights := make([]float64, len(nodes)) weights := make([]float64, len(nodes))
@ -91,12 +91,12 @@ type nodeAttrPair struct {
// getSelectionBase returns nodes grouped by selector attribute. // getSelectionBase returns nodes grouped by selector attribute.
// It it guaranteed that each pair will contain at least one node. // It it guaranteed that each pair will contain at least one node.
func (c *Context) getSelectionBase(s *netmap.Selector) []nodeAttrPair { func (c *Context) getSelectionBase(s *Selector) []nodeAttrPair {
f := c.Filters[s.GetFilter()] f := c.Filters[s.Filter()]
isMain := s.GetFilter() == MainFilterName isMain := s.Filter() == MainFilterName
result := []nodeAttrPair{} result := []nodeAttrPair{}
nodeMap := map[string]Nodes{} nodeMap := map[string]Nodes{}
attr := s.GetAttribute() attr := s.Attribute()
for i := range c.Netmap.Nodes { for i := range c.Netmap.Nodes {
if isMain || c.match(f, c.Netmap.Nodes[i]) { if isMain || c.match(f, c.Netmap.Nodes[i]) {
if attr == "" { if attr == "" {

View file

@ -11,20 +11,20 @@ import (
func TestPlacementPolicy_UnspecifiedClause(t *testing.T) { func TestPlacementPolicy_UnspecifiedClause(t *testing.T) {
p := newPlacementPolicy(1, p := newPlacementPolicy(1,
[]*netmap.Replica{newReplica(1, "X")}, []*Replica{newReplica(1, "X")},
[]*netmap.Selector{ []*Selector{
newSelector("X", "", netmap.Distinct, 4, "*"), newSelector("X", "", ClauseDistinct, 4, "*"),
}, },
nil, nil,
) )
nodes := []netmap.NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"), nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"),
nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"), nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"),
nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"), nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"),
nodeInfoFromAttributes("ID", "4", "Country", "RU", "City", "Moscow", "SSD", "1"), nodeInfoFromAttributes("ID", "4", "Country", "RU", "City", "Moscow", "SSD", "1"),
} }
nm, err := NewNetmap(NodesFromV2(nodes)) nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err) require.NoError(t, err)
v, err := nm.GetContainerNodes(p, nil) v, err := nm.GetContainerNodes(p, nil)
require.NoError(t, err) require.NoError(t, err)
@ -33,24 +33,24 @@ func TestPlacementPolicy_UnspecifiedClause(t *testing.T) {
func TestPlacementPolicy_GetPlacementVectors(t *testing.T) { func TestPlacementPolicy_GetPlacementVectors(t *testing.T) {
p := newPlacementPolicy(2, p := newPlacementPolicy(2,
[]*netmap.Replica{ []*Replica{
newReplica(1, "SPB"), newReplica(1, "SPB"),
newReplica(2, "Americas"), newReplica(2, "Americas"),
}, },
[]*netmap.Selector{ []*Selector{
newSelector("SPB", "City", netmap.Same, 1, "SPBSSD"), newSelector("SPB", "City", ClauseSame, 1, "SPBSSD"),
newSelector("Americas", "City", netmap.Distinct, 2, "Americas"), newSelector("Americas", "City", ClauseDistinct, 2, "Americas"),
}, },
[]*netmap.Filter{ []*Filter{
newFilter("SPBSSD", "", "", netmap.AND, newFilter("SPBSSD", "", "", OpAND,
newFilter("", "Country", "RU", netmap.EQ), newFilter("", "Country", "RU", OpEQ),
newFilter("", "City", "St.Petersburg", netmap.EQ), newFilter("", "City", "St.Petersburg", OpEQ),
newFilter("", "SSD", "1", netmap.EQ)), newFilter("", "SSD", "1", OpEQ)),
newFilter("Americas", "", "", netmap.OR, newFilter("Americas", "", "", OpOR,
newFilter("", "Continent", "NA", netmap.EQ), newFilter("", "Continent", "NA", OpEQ),
newFilter("", "Continent", "SA", netmap.EQ)), newFilter("", "Continent", "SA", OpEQ)),
}) })
nodes := []netmap.NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"), nodeInfoFromAttributes("ID", "1", "Country", "RU", "City", "St.Petersburg", "SSD", "0"),
nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"), nodeInfoFromAttributes("ID", "2", "Country", "RU", "City", "St.Petersburg", "SSD", "1"),
nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"), nodeInfoFromAttributes("ID", "3", "Country", "RU", "City", "Moscow", "SSD", "1"),
@ -66,7 +66,7 @@ func TestPlacementPolicy_GetPlacementVectors(t *testing.T) {
nodeInfoFromAttributes("ID", "13", "Continent", "SA", "City", "Lima"), nodeInfoFromAttributes("ID", "13", "Continent", "SA", "City", "Lima"),
} }
nm, err := NewNetmap(NodesFromV2(nodes)) nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err) require.NoError(t, err)
v, err := nm.GetContainerNodes(p, nil) v, err := nm.GetContainerNodes(p, nil)
require.NoError(t, err) require.NoError(t, err)
@ -94,17 +94,17 @@ func TestPlacementPolicy_GetPlacementVectors(t *testing.T) {
func TestPlacementPolicy_ProcessSelectors(t *testing.T) { func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
p := newPlacementPolicy(2, nil, p := newPlacementPolicy(2, nil,
[]*netmap.Selector{ []*Selector{
newSelector("SameRU", "City", netmap.Same, 2, "FromRU"), newSelector("SameRU", "City", ClauseSame, 2, "FromRU"),
newSelector("DistinctRU", "City", netmap.Distinct, 2, "FromRU"), newSelector("DistinctRU", "City", ClauseDistinct, 2, "FromRU"),
newSelector("Good", "Country", netmap.Distinct, 2, "Good"), newSelector("Good", "Country", ClauseDistinct, 2, "Good"),
newSelector("Main", "Country", netmap.Distinct, 3, "*"), newSelector("Main", "Country", ClauseDistinct, 3, "*"),
}, },
[]*netmap.Filter{ []*Filter{
newFilter("FromRU", "Country", "Russia", netmap.EQ), newFilter("FromRU", "Country", "Russia", OpEQ),
newFilter("Good", "Rating", "4", netmap.GE), newFilter("Good", "Rating", "4", OpGE),
}) })
nodes := []netmap.NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("Country", "Russia", "Rating", "1", "City", "SPB"), nodeInfoFromAttributes("Country", "Russia", "Rating", "1", "City", "SPB"),
nodeInfoFromAttributes("Country", "Germany", "Rating", "5", "City", "Berlin"), nodeInfoFromAttributes("Country", "Germany", "Rating", "5", "City", "Berlin"),
nodeInfoFromAttributes("Country", "Russia", "Rating", "6", "City", "Moscow"), nodeInfoFromAttributes("Country", "Russia", "Rating", "6", "City", "Moscow"),
@ -118,22 +118,22 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"), nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"),
} }
nm, err := NewNetmap(NodesFromV2(nodes)) nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err) require.NoError(t, err)
c := NewContext(nm) c := NewContext(nm)
require.NoError(t, c.processFilters(p)) require.NoError(t, c.processFilters(p))
require.NoError(t, c.processSelectors(p)) require.NoError(t, c.processSelectors(p))
for _, s := range p.GetSelectors() { for _, s := range p.Selectors() {
sel := c.Selections[s.GetName()] sel := c.Selections[s.Name()]
s := c.Selectors[s.GetName()] s := c.Selectors[s.Name()]
bucketCount, nodesInBucket := GetNodesCount(p, s) bucketCount, nodesInBucket := GetNodesCount(p, s)
targ := fmt.Sprintf("selector '%s'", s.GetName()) targ := fmt.Sprintf("selector '%s'", s.Name())
require.Equal(t, bucketCount, len(sel), targ) require.Equal(t, bucketCount, len(sel), targ)
for _, res := range sel { for _, res := range sel {
require.Equal(t, nodesInBucket, len(res), targ) require.Equal(t, nodesInBucket, len(res), targ)
for j := range res { for j := range res {
require.True(t, c.applyFilter(s.GetFilter(), res[j]), targ) require.True(t, c.applyFilter(s.Filter(), res[j]), targ)
} }
} }
} }
@ -142,12 +142,12 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) { func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) {
p := newPlacementPolicy(1, nil, p := newPlacementPolicy(1, nil,
[]*netmap.Selector{ []*Selector{
newSelector("Main", "Country", netmap.Distinct, 3, "*"), newSelector("Main", "Country", ClauseDistinct, 3, "*"),
}, nil) }, nil)
// bucket weight order: RU > DE > FR // bucket weight order: RU > DE > FR
nodes := []netmap.NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("Country", "Germany", PriceAttr, "2", CapacityAttr, "10000"), nodeInfoFromAttributes("Country", "Germany", PriceAttr, "2", CapacityAttr, "10000"),
nodeInfoFromAttributes("Country", "Germany", PriceAttr, "4", CapacityAttr, "1"), nodeInfoFromAttributes("Country", "Germany", PriceAttr, "4", CapacityAttr, "1"),
nodeInfoFromAttributes("Country", "France", PriceAttr, "3", CapacityAttr, "10"), nodeInfoFromAttributes("Country", "France", PriceAttr, "3", CapacityAttr, "10"),
@ -158,7 +158,7 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) {
nodeInfoFromAttributes("Country", "France", PriceAttr, "7", CapacityAttr, "10000"), nodeInfoFromAttributes("Country", "France", PriceAttr, "7", CapacityAttr, "10000"),
nodeInfoFromAttributes("Country", "Russia", PriceAttr, "2", CapacityAttr, "1"), nodeInfoFromAttributes("Country", "Russia", PriceAttr, "2", CapacityAttr, "1"),
} }
nm, err := NewNetmap(NodesFromV2(nodes)) nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err) require.NoError(t, err)
c := NewContext(nm) c := NewContext(nm)
c.setPivot([]byte("containerID")) c.setPivot([]byte("containerID"))
@ -189,46 +189,46 @@ func TestPlacementPolicy_ProcessSelectorsHRW(t *testing.T) {
func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) { func TestPlacementPolicy_ProcessSelectorsInvalid(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
p *netmap.PlacementPolicy p *PlacementPolicy
err error err error
}{ }{
{ {
"MissingSelector", "MissingSelector",
newPlacementPolicy(2, nil, newPlacementPolicy(2, nil,
[]*netmap.Selector{nil}, []*Selector{nil},
[]*netmap.Filter{}), []*Filter{}),
ErrMissingField, ErrMissingField,
}, },
{ {
"InvalidFilterReference", "InvalidFilterReference",
newPlacementPolicy(1, nil, newPlacementPolicy(1, nil,
[]*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 1, "FromNL")}, []*Selector{newSelector("MyStore", "Country", ClauseDistinct, 1, "FromNL")},
[]*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}), []*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}),
ErrFilterNotFound, ErrFilterNotFound,
}, },
{ {
"NotEnoughNodes (backup factor)", "NotEnoughNodes (backup factor)",
newPlacementPolicy(2, nil, newPlacementPolicy(2, nil,
[]*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 1, "FromRU")}, []*Selector{newSelector("MyStore", "Country", ClauseDistinct, 1, "FromRU")},
[]*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}), []*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}),
ErrNotEnoughNodes, ErrNotEnoughNodes,
}, },
{ {
"NotEnoughNodes (buckets)", "NotEnoughNodes (buckets)",
newPlacementPolicy(1, nil, newPlacementPolicy(1, nil,
[]*netmap.Selector{newSelector("MyStore", "Country", netmap.Distinct, 2, "FromRU")}, []*Selector{newSelector("MyStore", "Country", ClauseDistinct, 2, "FromRU")},
[]*netmap.Filter{newFilter("FromRU", "Country", "Russia", netmap.EQ)}), []*Filter{newFilter("FromRU", "Country", "Russia", OpEQ)}),
ErrNotEnoughNodes, ErrNotEnoughNodes,
}, },
} }
nodes := []netmap.NodeInfo{ nodes := []NodeInfo{
nodeInfoFromAttributes("Country", "Russia"), nodeInfoFromAttributes("Country", "Russia"),
nodeInfoFromAttributes("Country", "Germany"), nodeInfoFromAttributes("Country", "Germany"),
nodeInfoFromAttributes(), nodeInfoFromAttributes(),
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
nm, err := NewNetmap(NodesFromV2(nodes)) nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err) require.NoError(t, err)
c := NewContext(nm) c := NewContext(nm)
require.NoError(t, c.processFilters(tc.p)) require.NoError(t, c.processFilters(tc.p))