From f7c685f682bab1dee802669a3eacffe485dd571d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 18 Sep 2020 18:14:26 +0300 Subject: [PATCH] [#31] placement: Fix incorrect selectors processing In previous implementation traverser worked like all counts of all selectors are equal to counts of corresponding replicas. Make traverser to take into account select count of all replicas. Signed-off-by: Leonard Lyubich --- .../object_manager/placement/traverser.go | 94 ++++++++--- .../placement/traverser_test.go | 155 +++++++++++++----- 2 files changed, 181 insertions(+), 68 deletions(-) diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go index 94fb47b738..fa17f2ca50 100644 --- a/pkg/services/object_manager/placement/traverser.go +++ b/pkg/services/object_manager/placement/traverser.go @@ -29,11 +29,9 @@ type Option func(*cfg) type Traverser struct { mtx *sync.RWMutex - rem int - - nextI, nextJ int - vectors []netmap.Nodes + + rem []int } type cfg struct { @@ -46,11 +44,14 @@ type cfg struct { builder Builder } +const invalidOptsMsg = "invalid traverser options" + var errNilBuilder = errors.New("placement builder is nil") +var errNilPolicy = errors.New("placement policy is nil") + func defaultCfg() *cfg { return &cfg{ - rem: 1, addr: object.NewAddress(), } } @@ -66,7 +67,9 @@ func NewTraverser(opts ...Option) (*Traverser, error) { } if cfg.builder == nil { - return nil, errors.Wrap(errNilBuilder, "incomplete traverser options") + return nil, errors.Wrap(errNilBuilder, invalidOptsMsg) + } else if cfg.policy == nil { + return nil, errors.Wrap(errNilPolicy, invalidOptsMsg) } ns, err := cfg.builder.BuildPlacement(cfg.addr, cfg.policy) @@ -74,9 +77,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) { return nil, errors.Wrap(err, "could not build placement") } + ss := cfg.policy.GetSelectors() + rem := make([]int, 0, len(ss)) + + for i := range ss { + cnt := cfg.rem + + if cnt == 0 { + cnt = int(ss[i].GetCount()) + } + + rem = append(rem, cnt) + } + return &Traverser{ mtx: new(sync.RWMutex), - rem: cfg.rem, + rem: rem, vectors: ns, }, nil } @@ -84,41 +100,73 @@ func NewTraverser(opts ...Option) (*Traverser, error) { // Next returns next unprocessed address of the object placement. // // Returns nil if no nodes left or traversal operation succeeded. -func (t *Traverser) Next() *network.Address { +func (t *Traverser) Next() []*network.Address { t.mtx.Lock() defer t.mtx.Unlock() - if t.rem == 0 || t.nextI == len(t.vectors) { + t.skipEmptyVectors() + + if len(t.vectors) == 0 { + return nil + } else if len(t.vectors[0]) < t.rem[0] { return nil } - addr, err := network.AddressFromString(t.vectors[t.nextI][t.nextJ].NetworkAddress()) - if err != nil { - // TODO: log error + count := t.rem[0] + if count < 0 { + count = len(t.vectors[0]) } - if t.nextJ++; t.nextJ == len(t.vectors[t.nextI]) { - t.nextJ = 0 - t.nextI++ + addrs := make([]*network.Address, 0, count) + + for i := 0; i < count; i++ { + addr, err := network.AddressFromString(t.vectors[0][i].NetworkAddress()) + if err != nil { + // TODO: log error + return nil + } + + addrs = append(addrs, addr) } - return addr + t.vectors[0] = t.vectors[0][count:] + + return addrs +} + +func (t *Traverser) skipEmptyVectors() { + for i := 0; i < len(t.vectors); i++ { // don't use range, slice changes in body + if len(t.vectors[i]) == 0 && t.rem[i] <= 0 || t.rem[0] == 0 { + t.vectors = append(t.vectors[:i], t.vectors[i+1:]...) + t.rem = append(t.rem[:i], t.rem[i+1:]...) + i-- + } else { + break + } + } } // SubmitSuccess writes single succeeded node operation. func (t *Traverser) SubmitSuccess() { t.mtx.Lock() - t.rem-- + if len(t.rem) > 0 { + t.rem[0]-- + } t.mtx.Unlock() } // Success returns true if traversal operation succeeded. func (t *Traverser) Success() bool { t.mtx.RLock() - s := t.rem <= 0 - t.mtx.RUnlock() + defer t.mtx.RUnlock() - return s + for i := range t.rem { + if t.rem[i] > 0 { + return false + } + } + + return true } // UseBuilder is a placement builder setting option. @@ -146,12 +194,6 @@ func UseNetworkMap(nm *netmap.Netmap) Option { func ForContainer(cnr *container.Container) Option { return func(c *cfg) { c.policy = cnr.GetPlacementPolicy() - - c.rem = 0 - for _, r := range c.policy.GetReplicas() { - c.rem += int(r.GetCount()) - } - c.addr.SetContainerID(container.CalculateID(cnr)) } } diff --git a/pkg/services/object_manager/placement/traverser_test.go b/pkg/services/object_manager/placement/traverser_test.go index 291ec0702d..eb7322a4b5 100644 --- a/pkg/services/object_manager/placement/traverser_test.go +++ b/pkg/services/object_manager/placement/traverser_test.go @@ -4,6 +4,7 @@ import ( "strconv" "testing" + "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/object" netmapV2 "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" @@ -35,86 +36,156 @@ func flattenVectors(vs []netmap.Nodes) netmap.Nodes { return v } -func testPlacement(t *testing.T, sz []int) []netmap.Nodes { - res := make([]netmap.Nodes, 0, len(sz)) +func copyVectors(v []netmap.Nodes) []netmap.Nodes { + vc := make([]netmap.Nodes, 0, len(v)) + + for i := range v { + ns := make(netmap.Nodes, len(v[i])) + copy(ns, v[i]) + + vc = append(vc, ns) + } + + return vc +} + +func testPlacement(t *testing.T, rs, ss []int) ([]netmap.Nodes, *container.Container) { + nodes := make([]netmap.Nodes, 0, len(rs)) + selectors := make([]*netmap.Selector, 0, len(rs)) num := uint32(0) - for i := range sz { - ns := make([]netmapV2.NodeInfo, 0, sz[i]) + for i := range rs { + ns := make([]netmapV2.NodeInfo, 0, rs[i]) - for j := 0; j < sz[i]; j++ { + for j := 0; j < rs[i]; j++ { ns = append(ns, testNode(num)) num++ } - res = append(res, netmap.NodesFromV2(ns)) + nodes = append(nodes, netmap.NodesFromV2(ns)) + + s := new(netmap.Selector) + s.SetCount(uint32(ss[i])) + + selectors = append(selectors, s) } - return res + policy := new(netmap.PlacementPolicy) + policy.SetSelectors(selectors) + + return nodes, container.New(container.WithPolicy(policy)) } func TestTraverserObjectScenarios(t *testing.T) { t.Run("search scenario", func(t *testing.T) { - nodes := testPlacement(t, []int{2, 3}) + replicas := []int{2, 3} + selectors := []int{1, 2} - allNodes := flattenVectors(nodes) + nodes, cnr := testPlacement(t, replicas, selectors) + + nodesCopy := copyVectors(nodes) tr, err := NewTraverser( - UseBuilder(&testBuilder{vectors: nodes}), + ForContainer(cnr), + UseBuilder(&testBuilder{vectors: nodesCopy}), WithoutSuccessTracking(), ) require.NoError(t, err) - require.True(t, tr.Success()) + for i := range replicas { + addrs := tr.Next() - for i := range allNodes { - require.Equal(t, allNodes[i].NetworkAddress(), tr.Next().String()) + require.Len(t, addrs, len(nodes[i])) + + for j, n := range nodes[i] { + require.Equal(t, n.NetworkAddress(), addrs[j].String()) + } } - require.Nil(t, tr.Next()) + require.Empty(t, tr.Next()) require.True(t, tr.Success()) }) t.Run("read scenario", func(t *testing.T) { - nodes := testPlacement(t, []int{5, 3, 4}) + replicas := []int{5, 3} + selectors := []int{2, 2} - allNodes := flattenVectors(nodes) + nodes, cnr := testPlacement(t, replicas, selectors) + + nodesCopy := copyVectors(nodes) tr, err := NewTraverser( - UseBuilder(&testBuilder{vectors: nodes}), + ForContainer(cnr), + UseBuilder(&testBuilder{vectors: nodesCopy}), + SuccessAfter(1), ) require.NoError(t, err) - for i := range allNodes[:len(allNodes)-3] { - require.Equal(t, allNodes[i].NetworkAddress(), tr.Next().String()) - } + fn := func(curVector int) { + for i := 0; i < replicas[curVector]; i++ { + addrs := tr.Next() + require.Len(t, addrs, 1) - require.False(t, tr.Success()) + require.Equal(t, nodes[curVector][i].NetworkAddress(), addrs[0].String()) + } - tr.SubmitSuccess() - - require.True(t, tr.Success()) - - require.Nil(t, tr.Next()) - }) - - t.Run("put scenario", func(t *testing.T) { - nodes := testPlacement(t, []int{3, 3, 3}) - sucCount := 3 - - tr, err := NewTraverser( - UseBuilder(&testBuilder{vectors: nodes}), - SuccessAfter(sucCount), - ) - require.NoError(t, err) - - for i := 0; i < sucCount; i++ { - require.NotNil(t, tr.Next()) + require.Empty(t, tr.Next()) require.False(t, tr.Success()) + tr.SubmitSuccess() } - require.Nil(t, tr.Next()) - require.True(t, tr.Success()) + for i := range replicas { + fn(i) + + if i < len(replicas)-1 { + require.False(t, tr.Success()) + } else { + require.True(t, tr.Success()) + } + } + }) + + t.Run("put scenario", func(t *testing.T) { + replicas := []int{5, 3} + selectors := []int{2, 2} + + nodes, cnr := testPlacement(t, replicas, selectors) + + nodesCopy := copyVectors(nodes) + + tr, err := NewTraverser( + ForContainer(cnr), + UseBuilder(&testBuilder{vectors: nodesCopy}), + ) + require.NoError(t, err) + + fn := func(curVector int) { + for i := 0; i+selectors[curVector] < replicas[curVector]; i += selectors[curVector] { + addrs := tr.Next() + require.Len(t, addrs, selectors[curVector]) + + for j := range addrs { + require.Equal(t, nodes[curVector][i+j].NetworkAddress(), addrs[j].String()) + } + } + + require.Empty(t, tr.Next()) + require.False(t, tr.Success()) + + for i := 0; i < selectors[curVector]; i++ { + tr.SubmitSuccess() + } + } + + for i := range replicas { + fn(i) + + if i < len(replicas)-1 { + require.False(t, tr.Success()) + } else { + require.True(t, tr.Success()) + } + } }) }