[#227] netmap: Do not use intermediate types for placement

Support preprocessing within the `NodeInfo` type. Provide methods for
placement directly from the `NodeInfo` type. Returns slice of slices of
`NodeInfo` from placement methods of `Netmap`. Remove no longer needed
`Node` and `Nodes` types.

```
name            old time/op    new time/op    delta
ManySelects-12    19.7µs ±14%    15.8µs ±15%  -19.70%  (p=0.000 n=20+20)

name            old alloc/op   new alloc/op   delta
ManySelects-12    8.65kB ± 0%    6.22kB ± 0%  -28.03%  (p=0.000 n=20+20)

name            old allocs/op  new allocs/op  delta
ManySelects-12      82.0 ± 0%      81.0 ± 0%   -1.22%  (p=0.000 n=20+20)
```

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-06-07 05:12:39 +03:00 committed by LeL
parent 2b21146185
commit 723ba5ee45
12 changed files with 305 additions and 246 deletions

View file

@ -57,7 +57,7 @@ type (
}
// weightFunc calculates n's weight.
weightFunc = func(n *Node) float64
weightFunc = func(NodeInfo) float64
)
var (
@ -76,8 +76,8 @@ var (
// newWeightFunc returns weightFunc which multiplies normalized
// capacity and price.
func newWeightFunc(capNorm, priceNorm normalizer) weightFunc {
return func(n *Node) float64 {
return capNorm.Normalize(float64(n.Capacity)) * priceNorm.Normalize(float64(n.Price))
return func(n NodeInfo) float64 {
return capNorm.Normalize(float64(n.capacity())) * priceNorm.Normalize(float64(n.price()))
}
}

View file

@ -1,19 +0,0 @@
package netmap
// ContainerNodes represents nodes in the container.
type ContainerNodes interface {
Replicas() []Nodes
Flatten() Nodes
}
type containerNodes []Nodes
// Flatten returns list of all nodes from the container.
func (c containerNodes) Flatten() Nodes {
return flattenNodes(c)
}
// Replicas return list of container replicas.
func (c containerNodes) Replicas() []Nodes {
return c
}

View file

@ -15,7 +15,7 @@ type context struct {
// Selectors stores processed selectors.
Selectors map[string]*Selector
// Selections stores result of selector processing.
Selections map[string][]Nodes
Selections map[string][]nodes
// numCache stores parsed numeric values.
numCache map[string]uint64
@ -54,11 +54,11 @@ func newContext(nm *Netmap) *context {
Netmap: nm,
Filters: make(map[string]*Filter),
Selectors: make(map[string]*Selector),
Selections: make(map[string][]Nodes),
Selections: make(map[string][]nodes),
numCache: make(map[string]uint64),
aggregator: newMeanIQRAgg,
weightFunc: GetDefaultWeightFunc(nm.Nodes),
weightFunc: GetDefaultWeightFunc(nm.nodes),
cbf: defaultCBF,
}
}
@ -79,13 +79,13 @@ func (c *context) setCBF(cbf uint32) {
}
// GetDefaultWeightFunc returns default weighting function.
func GetDefaultWeightFunc(ns Nodes) weightFunc {
func GetDefaultWeightFunc(ns nodes) weightFunc {
mean := newMeanAgg()
min := newMinAgg()
for i := range ns {
mean.Add(float64(ns[i].Capacity))
min.Add(float64(ns[i].Price))
mean.Add(float64(ns[i].capacity()))
min.Add(float64(ns[i].price()))
}
return newWeightFunc(

View file

@ -15,7 +15,7 @@ type Filter netmap.Filter
const MainFilterName = "*"
// applyFilter applies named filter to b.
func (c *context) applyFilter(name string, b *Node) bool {
func (c *context) applyFilter(name string, b NodeInfo) bool {
return name == MainFilterName || c.match(c.Filters[name], b)
}
@ -86,7 +86,7 @@ func (c *context) processFilter(f *Filter, top bool) error {
// match matches f against b. It returns no errors because
// filter should have been parsed during context creation
// and missing node properties are considered as a regular fail.
func (c *context) match(f *Filter, b *Node) bool {
func (c *context) match(f *Filter, b NodeInfo) bool {
switch f.Operation() {
case OpAND, OpOR:
for _, lf := range f.InnerFilters() {
@ -106,24 +106,24 @@ func (c *context) match(f *Filter, b *Node) bool {
}
}
func (c *context) matchKeyValue(f *Filter, b *Node) bool {
func (c *context) matchKeyValue(f *Filter, b NodeInfo) bool {
switch f.Operation() {
case OpEQ:
return b.Attribute(f.Key()) == f.Value()
return b.attribute(f.Key()) == f.Value()
case OpNE:
return b.Attribute(f.Key()) != f.Value()
return b.attribute(f.Key()) != f.Value()
default:
var attr uint64
switch f.Key() {
case AttrPrice:
attr = b.Price
attr = b.price()
case AttrCapacity:
attr = b.Capacity
attr = b.capacity()
default:
var err error
attr, err = strconv.ParseUint(b.Attribute(f.Key()), 10, 64)
attr, err = strconv.ParseUint(b.attribute(f.Key()), 10, 64)
if err != nil {
// Note: because filters are somewhat independent from nodes attributes,
// We don't report an error here, and fail filter instead.

View file

@ -17,9 +17,8 @@ func TestContext_ProcessFilters(t *testing.T) {
newFilter("", "IntField", "123", OpLT),
newFilter("GoodRating", "", "", 0)),
}
nm, err := NewNetmap(nil)
require.NoError(t, err)
c := newContext(nm)
c := newContext(new(Netmap))
p := newPlacementPolicy(1, nil, nil, fs)
require.NoError(t, c.processFilters(p))
require.Equal(t, 3, len(c.Filters))
@ -81,10 +80,16 @@ func TestContext_ProcessFiltersInvalid(t *testing.T) {
}
func TestFilter_MatchSimple_InvalidOp(t *testing.T) {
b := &Node{AttrMap: map[string]string{
"Rating": "4",
"Country": "Germany",
}}
var aRating NodeAttribute
aRating.SetKey("Rating")
aRating.SetValue("4")
var aCountry NodeAttribute
aRating.SetKey("Country")
aRating.SetValue("Germany")
var b NodeInfo
b.SetAttributes(aRating, aCountry)
f := newFilter("Main", "Rating", "5", OpEQ)
c := newContext(new(Netmap))

View file

@ -26,7 +26,7 @@ type TestCase struct {
}
}
func compareNodes(t testing.TB, expected [][]int, nodes Nodes, actual []Nodes) {
func compareNodes(t testing.TB, expected [][]int, nodes nodes, actual [][]NodeInfo) {
require.Equal(t, len(expected), len(actual))
for i := range expected {
require.Equal(t, len(expected[i]), len(actual[i]))
@ -56,9 +56,8 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
copy(srcNodes, tc.Nodes)
t.Run(tc.Name, func(t *testing.T) {
nodes := NodesFromInfo(tc.Nodes)
nm, err := NewNetmap(nodes)
require.NoError(t, err)
var nm Netmap
nm.SetNodes(tc.Nodes)
for name, tt := range tc.Tests {
t.Run(name, func(t *testing.T) {
@ -70,13 +69,12 @@ func TestPlacementPolicy_Interopability(t *testing.T) {
require.NoError(t, err)
require.Equal(t, srcNodes, tc.Nodes)
res := v.Replicas()
compareNodes(t, tt.Result, nodes, res)
compareNodes(t, tt.Result, tc.Nodes, v)
if tt.Placement.Result != nil {
res, err := nm.GetPlacementVectors(v, tt.Placement.Pivot)
require.NoError(t, err)
compareNodes(t, tt.Placement.Result, nodes, res)
compareNodes(t, tt.Placement.Result, tc.Nodes, res)
require.Equal(t, srcNodes, tc.Nodes)
}
}
@ -103,8 +101,8 @@ func BenchmarkPlacementPolicyInteropability(b *testing.B) {
require.NoError(b, json.Unmarshal(bs, &tc), "cannot unmarshal %s", ds[i].Name())
b.Run(tc.Name, func(b *testing.B) {
nodes := NodesFromInfo(tc.Nodes)
nm, err := NewNetmap(nodes)
var nm Netmap
nm.SetNodes(tc.Nodes)
require.NoError(b, err)
for name, tt := range tc.Tests {
@ -121,15 +119,14 @@ func BenchmarkPlacementPolicyInteropability(b *testing.B) {
} else {
require.NoError(b, err)
res := v.Replicas()
compareNodes(b, tt.Result, nodes, res)
compareNodes(b, tt.Result, tc.Nodes, v)
if tt.Placement.Result != nil {
b.StartTimer()
res, err := nm.GetPlacementVectors(v, tt.Placement.Pivot)
b.StopTimer()
require.NoError(b, err)
compareNodes(b, tt.Placement.Result, nodes, res)
compareNodes(b, tt.Placement.Result, tc.Nodes, res)
}
}
}
@ -149,9 +146,8 @@ func BenchmarkManySelects(b *testing.B) {
tt, ok := tc.Tests["Select"]
require.True(b, ok)
nodes := NodesFromInfo(tc.Nodes)
nm, err := NewNetmap(nodes)
require.NoError(b, err)
var nm Netmap
nm.SetNodes(tc.Nodes)
b.ResetTimer()
b.ReportAllocs()

View file

@ -1,33 +1,140 @@
package netmap
import (
"bytes"
"fmt"
"strconv"
"github.com/nspcc-dev/hrw"
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
)
const defaultCBF = 3
var _, _ hrw.Hasher = NodeInfo{}, nodes{}
// Hash implements hrw.Hasher interface.
//
// Hash is needed to support weighted HRW therefore sort function sorts nodes
// based on their public key.
func (i NodeInfo) Hash() uint64 {
return hrw.Hash(i.m.GetPublicKey())
}
func (i NodeInfo) less(i2 NodeInfo) bool {
return bytes.Compare(i.PublicKey(), i2.PublicKey()) < 0
}
// attribute returns value of the node attribute by the given key. Returns empty
// string if attribute is missing.
//
// Method is needed to internal placement needs.
func (i NodeInfo) attribute(key string) string {
as := i.m.GetAttributes()
for j := range as {
if as[j].GetKey() == key {
return as[j].GetValue()
}
}
return ""
}
func (i *NodeInfo) syncAttributes() {
as := i.m.GetAttributes()
for j := range as {
switch as[j].GetKey() {
case AttrPrice:
i.priceAttr, _ = strconv.ParseUint(as[j].GetValue(), 10, 64)
case AttrCapacity:
i.capAttr, _ = strconv.ParseUint(as[j].GetValue(), 10, 64)
}
}
}
func (i *NodeInfo) setPrice(price uint64) {
i.priceAttr = price
as := i.m.GetAttributes()
for j := range as {
if as[j].GetKey() == AttrPrice {
as[j].SetValue(strconv.FormatUint(i.capAttr, 10))
return
}
}
as = append(as, netmap.Attribute{})
as[len(as)-1].SetKey(AttrPrice)
as[len(as)-1].SetValue(strconv.FormatUint(i.capAttr, 10))
i.m.SetAttributes(as)
}
func (i *NodeInfo) price() uint64 {
return i.priceAttr
}
func (i *NodeInfo) setCapacity(capacity uint64) {
i.capAttr = capacity
as := i.m.GetAttributes()
for j := range as {
if as[j].GetKey() == AttrCapacity {
as[j].SetValue(strconv.FormatUint(i.capAttr, 10))
return
}
}
as = append(as, netmap.Attribute{})
as[len(as)-1].SetKey(AttrCapacity)
as[len(as)-1].SetValue(strconv.FormatUint(i.capAttr, 10))
i.m.SetAttributes(as)
}
func (i NodeInfo) capacity() uint64 {
return i.capAttr
}
// Netmap represents netmap which contains preprocessed nodes.
type Netmap struct {
Nodes Nodes
nodes []NodeInfo
}
// NewNetmap constructs netmap from the list of raw nodes.
func NewNetmap(nodes Nodes) (*Netmap, error) {
return &Netmap{
Nodes: nodes,
}, nil
func (m *Netmap) SetNodes(nodes []NodeInfo) {
m.nodes = nodes
}
func flattenNodes(ns []Nodes) Nodes {
type nodes []NodeInfo
// Hash is a function from hrw.Hasher interface. It is implemented
// to support weighted hrw sorting of buckets. Each bucket is already sorted by hrw,
// thus giving us needed "randomness".
func (n nodes) Hash() uint64 {
if len(n) > 0 {
return n[0].Hash()
}
return 0
}
// weights returns slice of nodes weights W.
func (n nodes) weights(wf weightFunc) []float64 {
w := make([]float64, 0, len(n))
for i := range n {
w = append(w, wf(n[i]))
}
return w
}
func flattenNodes(ns []nodes) nodes {
var sz, i int
for i = range ns {
sz += len(ns[i])
}
result := make(Nodes, 0, sz)
result := make(nodes, 0, sz)
for i := range ns {
result = append(result, ns[i]...)
@ -37,15 +144,15 @@ func flattenNodes(ns []Nodes) Nodes {
}
// GetPlacementVectors returns placement vectors for an object given containerNodes cnt.
func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes, error) {
func (m *Netmap) GetPlacementVectors(vectors [][]NodeInfo, pivot []byte) ([][]NodeInfo, error) {
h := hrw.Hash(pivot)
wf := GetDefaultWeightFunc(m.Nodes)
result := make([]Nodes, len(cnt.Replicas()))
wf := GetDefaultWeightFunc(m.nodes)
result := make([][]NodeInfo, len(vectors))
for i, rep := range cnt.Replicas() {
result[i] = make(Nodes, len(rep))
copy(result[i], rep)
hrw.SortSliceByWeightValue(result[i], result[i].Weights(wf), h)
for i := range vectors {
result[i] = make([]NodeInfo, len(vectors[i]))
copy(result[i], vectors[i])
hrw.SortSliceByWeightValue(result[i], nodes(result[i]).weights(wf), h)
}
return result, nil
@ -54,7 +161,7 @@ func (m *Netmap) GetPlacementVectors(cnt ContainerNodes, pivot []byte) ([]Nodes,
// GetContainerNodes returns nodes corresponding to each replica.
// Order of returned nodes corresponds to order of replicas in p.
// pivot is a seed for HRW sorting.
func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerNodes, error) {
func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) ([][]NodeInfo, error) {
c := newContext(m)
c.setPivot(pivot)
c.setCBF(p.ContainerBackupFactor())
@ -67,7 +174,7 @@ func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerN
return nil, err
}
result := make([]Nodes, len(p.Replicas()))
result := make([][]NodeInfo, len(p.Replicas()))
for i, r := range p.Replicas() {
if r.Selector() == "" {
@ -99,5 +206,5 @@ func (m *Netmap) GetContainerNodes(p *PlacementPolicy, pivot []byte) (ContainerN
result[i] = append(result[i], flattenNodes(nodes)...)
}
return containerNodes(result), nil
return result, nil
}

View file

@ -1,28 +1,9 @@
package netmap
import (
"strconv"
"github.com/nspcc-dev/hrw"
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
)
type (
// Node is a wrapper over NodeInfo.
Node struct {
ID uint64
Index int
Capacity uint64
Price uint64
AttrMap map[string]string
*NodeInfo
}
// Nodes represents slice of graph leafs.
Nodes []Node
)
// NodeState is an enumeration of various states of the NeoFS node.
type NodeState uint32
@ -30,7 +11,13 @@ type NodeState uint32
type NodeAttribute netmap.Attribute
// NodeInfo represents v2 compatible descriptor of the NeoFS node.
type NodeInfo netmap.NodeInfo
type NodeInfo struct {
priceAttr uint64
capAttr uint64
m *netmap.NodeInfo
}
const (
_ NodeState = iota
@ -87,76 +74,10 @@ const (
AttrContinent = "Continent"
)
var _ hrw.Hasher = (*Node)(nil)
// Hash is a function from hrw.Hasher interface. It is implemented
// to support weighted hrw therefore sort function sorts nodes
// based on their `N` value.
func (n Node) Hash() uint64 {
return n.ID
}
// Hash is a function from hrw.Hasher interface. It is implemented
// to support weighted hrw sorting of buckets. Each bucket is already sorted by hrw,
// thus giving us needed "randomness".
func (n Nodes) Hash() uint64 {
if len(n) > 0 {
return n[0].Hash()
}
return 0
}
// NodesFromInfo converts slice of NodeInfo to a generic node slice.
func NodesFromInfo(infos []NodeInfo) Nodes {
nodes := make(Nodes, len(infos))
for i := range infos {
nodes[i] = *newNodeV2(i, &infos[i])
}
return nodes
}
func newNodeV2(index int, ni *NodeInfo) *Node {
n := &Node{
ID: hrw.Hash(ni.PublicKey()),
Index: index,
AttrMap: make(map[string]string, len(ni.Attributes())),
NodeInfo: ni,
}
for _, attr := range ni.Attributes() {
switch attr.Key() {
case AttrCapacity:
n.Capacity, _ = strconv.ParseUint(attr.Value(), 10, 64)
case AttrPrice:
n.Price, _ = strconv.ParseUint(attr.Value(), 10, 64)
}
n.AttrMap[attr.Key()] = attr.Value()
}
return n
}
// Weights returns slice of nodes weights W.
func (n Nodes) Weights(wf weightFunc) []float64 {
w := make([]float64, 0, len(n))
for i := range n {
w = append(w, wf(&n[i]))
}
return w
}
// Attribute returns value of attribute k.
func (n *Node) Attribute(k string) string {
return n.AttrMap[k]
}
// GetBucketWeight computes weight for a Bucket.
func GetBucketWeight(ns Nodes, a aggregator, wf weightFunc) float64 {
func GetBucketWeight(ns nodes, a aggregator, wf weightFunc) float64 {
for i := range ns {
a.Add(wf(&ns[i]))
a.Add(wf(ns[i]))
}
return a.Compute()
@ -307,29 +228,41 @@ func NewNodeInfo() *NodeInfo {
//
// Nil netmap.NodeInfo converts to nil.
func NewNodeInfoFromV2(i *netmap.NodeInfo) *NodeInfo {
return (*NodeInfo)(i)
var res NodeInfo
res.m = i
res.syncAttributes()
return &res
}
// ToV2 converts NodeInfo to v2 NodeInfo.
//
// Nil NodeInfo converts to nil.
func (i *NodeInfo) ToV2() *netmap.NodeInfo {
return (*netmap.NodeInfo)(i)
if i == nil {
return nil
}
return i.m
}
// PublicKey returns public key of the node in a binary format.
func (i *NodeInfo) PublicKey() []byte {
return (*netmap.NodeInfo)(i).GetPublicKey()
return i.m.GetPublicKey()
}
// SetPublicKey sets public key of the node in a binary format.
func (i *NodeInfo) SetPublicKey(key []byte) {
(*netmap.NodeInfo)(i).SetPublicKey(key)
if i.m == nil {
i.m = new(netmap.NodeInfo)
}
i.m.SetPublicKey(key)
}
// NumberOfAddresses returns number of network addresses of the node.
func (i *NodeInfo) NumberOfAddresses() int {
return (*netmap.NodeInfo)(i).NumberOfAddresses()
return i.m.NumberOfAddresses()
}
// IterateAddresses iterates over network addresses of the node.
@ -337,7 +270,7 @@ func (i *NodeInfo) NumberOfAddresses() int {
//
// Handler should not be nil.
func (i *NodeInfo) IterateAddresses(f func(string) bool) {
(*netmap.NodeInfo)(i).IterateAddresses(f)
i.m.IterateAddresses(f)
}
// IterateAllAddresses is a helper function to unconditionally
@ -351,7 +284,11 @@ func IterateAllAddresses(i *NodeInfo, f func(string)) {
// SetAddresses sets list of network addresses of the node.
func (i *NodeInfo) SetAddresses(v ...string) {
(*netmap.NodeInfo)(i).SetAddresses(v...)
if i.m == nil {
i.m = new(netmap.NodeInfo)
}
i.m.SetAddresses(v...)
}
// Attributes returns list of the node attributes.
@ -360,7 +297,7 @@ func (i *NodeInfo) Attributes() []NodeAttribute {
return nil
}
as := (*netmap.NodeInfo)(i).GetAttributes()
as := i.m.GetAttributes()
if as == nil {
return nil
@ -383,38 +320,58 @@ func (i *NodeInfo) SetAttributes(as ...NodeAttribute) {
asV2[ind] = *as[ind].ToV2()
}
(*netmap.NodeInfo)(i).
SetAttributes(asV2)
if i.m == nil {
i.m = new(netmap.NodeInfo)
}
i.m.SetAttributes(asV2)
}
// State returns node state.
func (i *NodeInfo) State() NodeState {
return NodeStateFromV2(
(*netmap.NodeInfo)(i).GetState(),
)
return NodeStateFromV2(i.m.GetState())
}
// SetState sets node state.
func (i *NodeInfo) SetState(s NodeState) {
(*netmap.NodeInfo)(i).SetState(s.ToV2())
if i.m == nil {
i.m = new(netmap.NodeInfo)
}
i.m.SetState(s.ToV2())
}
// Marshal marshals NodeInfo into a protobuf binary form.
func (i *NodeInfo) Marshal() ([]byte, error) {
return (*netmap.NodeInfo)(i).StableMarshal(nil), nil
return i.m.StableMarshal(nil), nil
}
// Unmarshal unmarshals protobuf binary representation of NodeInfo.
func (i *NodeInfo) Unmarshal(data []byte) error {
return (*netmap.NodeInfo)(i).Unmarshal(data)
if i.m == nil {
i.m = new(netmap.NodeInfo)
}
return i.m.Unmarshal(data)
}
// MarshalJSON encodes NodeInfo to protobuf JSON format.
func (i *NodeInfo) MarshalJSON() ([]byte, error) {
return (*netmap.NodeInfo)(i).MarshalJSON()
return i.m.MarshalJSON()
}
// UnmarshalJSON decodes NodeInfo from protobuf JSON format.
func (i *NodeInfo) UnmarshalJSON(data []byte) error {
return (*netmap.NodeInfo)(i).UnmarshalJSON(data)
if i.m == nil {
i.m = new(netmap.NodeInfo)
}
err := i.m.UnmarshalJSON(data)
if err != nil {
return err
}
i.syncAttributes()
return nil
}

View file

@ -95,7 +95,7 @@ func TestNodeInfoFromV2(t *testing.T) {
t.Run("from nil", func(t *testing.T) {
var x *netmap.NodeInfo
require.Nil(t, NewNodeInfoFromV2(x))
require.Nil(t, NewNodeInfoFromV2(x).m)
})
t.Run("from non-nil", func(t *testing.T) {

View file

@ -49,7 +49,7 @@ func GetNodesCount(_ *PlacementPolicy, s *Selector) (int, int) {
// getSelection returns nodes grouped by s.attribute.
// Last argument specifies if more buckets can be used to fulfill CBF.
func (c *context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error) {
func (c *context) getSelection(p *PlacementPolicy, s *Selector) ([]nodes, error) {
bucketCount, nodesInBucket := GetNodesCount(p, s)
buckets := c.getSelectionBase(p.SubnetID(), s)
@ -64,7 +64,7 @@ func (c *context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error)
if len(c.pivot) == 0 {
if s.Attribute() == "" {
sort.Slice(buckets, func(i, j int) bool {
return buckets[i].nodes[0].ID < buckets[j].nodes[0].ID
return buckets[i].nodes[0].less(buckets[j].nodes[0])
})
} else {
sort.Slice(buckets, func(i, j int) bool {
@ -74,52 +74,52 @@ func (c *context) getSelection(p *PlacementPolicy, s *Selector) ([]Nodes, error)
}
maxNodesInBucket := nodesInBucket * int(c.cbf)
nodes := make([]Nodes, 0, len(buckets))
fallback := make([]Nodes, 0, len(buckets))
res := make([]nodes, 0, len(buckets))
fallback := make([]nodes, 0, len(buckets))
for i := range buckets {
ns := buckets[i].nodes
if len(ns) >= maxNodesInBucket {
nodes = append(nodes, ns[:maxNodesInBucket])
res = append(res, ns[:maxNodesInBucket])
} else if len(ns) >= nodesInBucket {
fallback = append(fallback, ns)
}
}
if len(nodes) < bucketCount {
if len(res) < bucketCount {
// Fallback to using minimum allowed backup factor (1).
nodes = append(nodes, fallback...)
if len(nodes) < bucketCount {
res = append(res, fallback...)
if len(res) < bucketCount {
return nil, fmt.Errorf("%w: '%s'", ErrNotEnoughNodes, s.Name())
}
}
if len(c.pivot) != 0 {
weights := make([]float64, len(nodes))
for i := range nodes {
weights[i] = GetBucketWeight(nodes[i], c.aggregator(), c.weightFunc)
weights := make([]float64, len(res))
for i := range res {
weights[i] = GetBucketWeight(res[i], c.aggregator(), c.weightFunc)
}
hrw.SortSliceByWeightValue(nodes, weights, c.pivotHash)
hrw.SortSliceByWeightValue(res, weights, c.pivotHash)
}
if s.Attribute() == "" {
nodes, fallback = nodes[:bucketCount], nodes[bucketCount:]
res, fallback = res[:bucketCount], res[bucketCount:]
for i := range fallback {
index := i % bucketCount
if len(nodes[index]) >= maxNodesInBucket {
if len(res[index]) >= maxNodesInBucket {
break
}
nodes[index] = append(nodes[index], fallback[i]...)
res[index] = append(res[index], fallback[i]...)
}
}
return nodes[:bucketCount], nil
return res[:bucketCount], nil
}
type nodeAttrPair struct {
attr string
nodes Nodes
nodes nodes
}
// getSelectionBase returns nodes grouped by selector attribute.
@ -128,25 +128,25 @@ func (c *context) getSelectionBase(subnetID *subnetid.ID, s *Selector) []nodeAtt
f := c.Filters[s.Filter()]
isMain := s.Filter() == MainFilterName
result := []nodeAttrPair{}
nodeMap := map[string]Nodes{}
nodeMap := map[string][]NodeInfo{}
attr := s.Attribute()
for i := range c.Netmap.Nodes {
for i := range c.Netmap.nodes {
var sid subnetid.ID
if subnetID != nil {
sid = *subnetID
}
// TODO(fyrchik): make `BelongsToSubnet` to accept pointer
if !BelongsToSubnet(c.Netmap.Nodes[i].NodeInfo, sid) {
if !BelongsToSubnet(&c.Netmap.nodes[i], sid) {
continue
}
if isMain || c.match(f, &c.Netmap.Nodes[i]) {
if isMain || c.match(f, c.Netmap.nodes[i]) {
if attr == "" {
// Default attribute is transparent identifier which is different for every node.
result = append(result, nodeAttrPair{attr: "", nodes: Nodes{c.Netmap.Nodes[i]}})
result = append(result, nodeAttrPair{attr: "", nodes: nodes{c.Netmap.nodes[i]}})
} else {
v := c.Netmap.Nodes[i].Attribute(attr)
nodeMap[v] = append(nodeMap[v], c.Netmap.Nodes[i])
v := c.Netmap.nodes[i].attribute(attr)
nodeMap[v] = append(nodeMap[v], c.Netmap.nodes[i])
}
}
}
@ -159,7 +159,7 @@ func (c *context) getSelectionBase(subnetID *subnetid.ID, s *Selector) []nodeAtt
if len(c.pivot) != 0 {
for i := range result {
hrw.SortSliceByWeightValue(result[i].nodes, result[i].nodes.Weights(c.weightFunc), c.pivotHash)
hrw.SortSliceByWeightValue(result[i].nodes, result[i].nodes.weights(c.weightFunc), c.pivotHash)
}
}

View file

@ -16,74 +16,76 @@ import (
func BenchmarkHRWSort(b *testing.B) {
const netmapSize = 1000
nodes := make([]Nodes, netmapSize)
vectors := make([]nodes, netmapSize)
weights := make([]float64, netmapSize)
for i := range nodes {
nodes[i] = Nodes{{
ID: rand.Uint64(),
Index: i,
Capacity: 100,
Price: 1,
AttrMap: nil,
}}
for i := range vectors {
key := make([]byte, 33)
rand.Read(key)
var node NodeInfo
node.setPrice(1)
node.setCapacity(100)
node.SetPublicKey(key)
vectors[i] = nodes{node}
weights[i] = float64(rand.Uint32()%10) / 10.0
}
pivot := rand.Uint64()
b.Run("sort by index, no weight", func(b *testing.B) {
realNodes := make([]Nodes, netmapSize)
realNodes := make([]nodes, netmapSize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
copy(realNodes, nodes)
copy(realNodes, vectors)
b.StartTimer()
hrw.SortSliceByIndex(realNodes, pivot)
}
})
b.Run("sort by value, no weight", func(b *testing.B) {
realNodes := make([]Nodes, netmapSize)
realNodes := make([]nodes, netmapSize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
copy(realNodes, nodes)
copy(realNodes, vectors)
b.StartTimer()
hrw.SortSliceByValue(realNodes, pivot)
}
})
b.Run("only sort by index", func(b *testing.B) {
realNodes := make([]Nodes, netmapSize)
realNodes := make([]nodes, netmapSize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
copy(realNodes, nodes)
copy(realNodes, vectors)
b.StartTimer()
hrw.SortSliceByWeightIndex(realNodes, weights, pivot)
}
})
b.Run("sort by value", func(b *testing.B) {
realNodes := make([]Nodes, netmapSize)
realNodes := make([]nodes, netmapSize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
copy(realNodes, nodes)
copy(realNodes, vectors)
b.StartTimer()
hrw.SortSliceByWeightValue(realNodes, weights, pivot)
}
})
b.Run("sort by ID, then by index (deterministic)", func(b *testing.B) {
realNodes := make([]Nodes, netmapSize)
realNodes := make([]nodes, netmapSize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
copy(realNodes, nodes)
copy(realNodes, vectors)
b.StartTimer()
sort.Slice(nodes, func(i, j int) bool {
return nodes[i][0].ID < nodes[j][0].ID
sort.Slice(vectors, func(i, j int) bool {
return vectors[i][0].less(vectors[j][0])
})
hrw.SortSliceByWeightIndex(realNodes, weights, pivot)
}
@ -123,8 +125,8 @@ func BenchmarkPolicyHRWType(b *testing.B) {
nodes[i].SetPublicKey(pub)
}
nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(b, err)
var nm Netmap
nm.SetNodes(nodes)
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -150,8 +152,8 @@ func TestPlacementPolicy_DeterministicOrder(t *testing.T) {
newFilter("loc2", "Location", "Shanghai", OpNE),
})
nodes := make([]NodeInfo, netmapSize)
for i := range nodes {
nodeList := make([]NodeInfo, netmapSize)
for i := range nodeList {
var loc string
switch i % 20 {
case 0:
@ -162,20 +164,27 @@ func TestPlacementPolicy_DeterministicOrder(t *testing.T) {
// Having the same price and capacity ensures equal weights for all nodes.
// This way placement is more dependent on the initial order.
nodes[i] = nodeInfoFromAttributes("Location", loc, "Price", "1", "Capacity", "10")
nodeList[i] = nodeInfoFromAttributes("Location", loc, "Price", "1", "Capacity", "10")
pub := make([]byte, 33)
pub[0] = byte(i)
nodes[i].SetPublicKey(pub)
nodeList[i].SetPublicKey(pub)
}
nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err)
getIndices := func(t *testing.T) (int, int) {
var nm Netmap
nm.SetNodes(nodeList)
getIndices := func(t *testing.T) (uint64, uint64) {
v, err := nm.GetContainerNodes(p, []byte{1})
require.NoError(t, err)
ns := v.Flatten()
nss := make([]nodes, len(v))
for i := range v {
nss[i] = v[i]
}
ns := flattenNodes(nss)
require.Equal(t, 2, len(ns))
return ns[0].Index, ns[1].Index
return ns[0].Hash(), ns[1].Hash()
}
a, b := getIndices(t)
@ -212,9 +221,9 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
nodeInfoFromAttributes("Country", "Russia", "Rating", "9", "City", "SPB"),
}
nm, err := NewNetmap(NodesFromInfo(nodes))
require.NoError(t, err)
c := newContext(nm)
var nm Netmap
nm.SetNodes(nodes)
c := newContext(&nm)
c.setCBF(p.ContainerBackupFactor())
require.NoError(t, c.processFilters(p))
require.NoError(t, c.processSelectors(p))
@ -229,7 +238,7 @@ func TestPlacementPolicy_ProcessSelectors(t *testing.T) {
for _, res := range sel {
require.Equal(t, nodesInBucket, len(res), targ)
for j := range res {
require.True(t, c.applyFilter(s.Filter(), &res[j]), targ)
require.True(t, c.applyFilter(s.Filter(), res[j]), targ)
}
}
}

View file

@ -31,7 +31,11 @@ func (i *NodeInfo) changeSubnet(id subnetid.ID, isMember bool) {
info.SetID(&idv2)
info.SetEntryFlag(isMember)
netmap.WriteSubnetInfo((*netmap.NodeInfo)(i), info)
if i.m == nil {
i.m = new(netmap.NodeInfo)
}
netmap.WriteSubnetInfo(i.m, info)
}
// ErrRemoveSubnet is returned when a node needs to leave the subnet.
@ -48,7 +52,7 @@ var ErrRemoveSubnet = netmap.ErrRemoveSubnet
func (i *NodeInfo) IterateSubnets(f func(subnetid.ID) error) error {
var id subnetid.ID
return netmap.IterateSubnets((*netmap.NodeInfo)(i), func(idv2 refs.SubnetID) error {
return netmap.IterateSubnets(i.m, func(idv2 refs.SubnetID) error {
err := id.ReadFromV2(idv2)
if err != nil {
return fmt.Errorf("invalid subnet: %w", err)