diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go index 94fb47b73..fa17f2ca5 100644 --- a/pkg/services/object_manager/placement/traverser.go +++ b/pkg/services/object_manager/placement/traverser.go @@ -29,11 +29,9 @@ type Option func(*cfg) type Traverser struct { mtx *sync.RWMutex - rem int - - nextI, nextJ int - vectors []netmap.Nodes + + rem []int } type cfg struct { @@ -46,11 +44,14 @@ type cfg struct { builder Builder } +const invalidOptsMsg = "invalid traverser options" + var errNilBuilder = errors.New("placement builder is nil") +var errNilPolicy = errors.New("placement policy is nil") + func defaultCfg() *cfg { return &cfg{ - rem: 1, addr: object.NewAddress(), } } @@ -66,7 +67,9 @@ func NewTraverser(opts ...Option) (*Traverser, error) { } if cfg.builder == nil { - return nil, errors.Wrap(errNilBuilder, "incomplete traverser options") + return nil, errors.Wrap(errNilBuilder, invalidOptsMsg) + } else if cfg.policy == nil { + return nil, errors.Wrap(errNilPolicy, invalidOptsMsg) } ns, err := cfg.builder.BuildPlacement(cfg.addr, cfg.policy) @@ -74,9 +77,22 @@ func NewTraverser(opts ...Option) (*Traverser, error) { return nil, errors.Wrap(err, "could not build placement") } + ss := cfg.policy.GetSelectors() + rem := make([]int, 0, len(ss)) + + for i := range ss { + cnt := cfg.rem + + if cnt == 0 { + cnt = int(ss[i].GetCount()) + } + + rem = append(rem, cnt) + } + return &Traverser{ mtx: new(sync.RWMutex), - rem: cfg.rem, + rem: rem, vectors: ns, }, nil } @@ -84,41 +100,73 @@ func NewTraverser(opts ...Option) (*Traverser, error) { // Next returns next unprocessed address of the object placement. // // Returns nil if no nodes left or traversal operation succeeded. -func (t *Traverser) Next() *network.Address { +func (t *Traverser) Next() []*network.Address { t.mtx.Lock() defer t.mtx.Unlock() - if t.rem == 0 || t.nextI == len(t.vectors) { + t.skipEmptyVectors() + + if len(t.vectors) == 0 { + return nil + } else if len(t.vectors[0]) < t.rem[0] { return nil } - addr, err := network.AddressFromString(t.vectors[t.nextI][t.nextJ].NetworkAddress()) - if err != nil { - // TODO: log error + count := t.rem[0] + if count < 0 { + count = len(t.vectors[0]) } - if t.nextJ++; t.nextJ == len(t.vectors[t.nextI]) { - t.nextJ = 0 - t.nextI++ + addrs := make([]*network.Address, 0, count) + + for i := 0; i < count; i++ { + addr, err := network.AddressFromString(t.vectors[0][i].NetworkAddress()) + if err != nil { + // TODO: log error + return nil + } + + addrs = append(addrs, addr) } - return addr + t.vectors[0] = t.vectors[0][count:] + + return addrs +} + +func (t *Traverser) skipEmptyVectors() { + for i := 0; i < len(t.vectors); i++ { // don't use range, slice changes in body + if len(t.vectors[i]) == 0 && t.rem[i] <= 0 || t.rem[0] == 0 { + t.vectors = append(t.vectors[:i], t.vectors[i+1:]...) + t.rem = append(t.rem[:i], t.rem[i+1:]...) + i-- + } else { + break + } + } } // SubmitSuccess writes single succeeded node operation. func (t *Traverser) SubmitSuccess() { t.mtx.Lock() - t.rem-- + if len(t.rem) > 0 { + t.rem[0]-- + } t.mtx.Unlock() } // Success returns true if traversal operation succeeded. func (t *Traverser) Success() bool { t.mtx.RLock() - s := t.rem <= 0 - t.mtx.RUnlock() + defer t.mtx.RUnlock() - return s + for i := range t.rem { + if t.rem[i] > 0 { + return false + } + } + + return true } // UseBuilder is a placement builder setting option. @@ -146,12 +194,6 @@ func UseNetworkMap(nm *netmap.Netmap) Option { func ForContainer(cnr *container.Container) Option { return func(c *cfg) { c.policy = cnr.GetPlacementPolicy() - - c.rem = 0 - for _, r := range c.policy.GetReplicas() { - c.rem += int(r.GetCount()) - } - c.addr.SetContainerID(container.CalculateID(cnr)) } } diff --git a/pkg/services/object_manager/placement/traverser_test.go b/pkg/services/object_manager/placement/traverser_test.go index 291ec0702..eb7322a4b 100644 --- a/pkg/services/object_manager/placement/traverser_test.go +++ b/pkg/services/object_manager/placement/traverser_test.go @@ -4,6 +4,7 @@ import ( "strconv" "testing" + "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/object" netmapV2 "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" @@ -35,86 +36,156 @@ func flattenVectors(vs []netmap.Nodes) netmap.Nodes { return v } -func testPlacement(t *testing.T, sz []int) []netmap.Nodes { - res := make([]netmap.Nodes, 0, len(sz)) +func copyVectors(v []netmap.Nodes) []netmap.Nodes { + vc := make([]netmap.Nodes, 0, len(v)) + + for i := range v { + ns := make(netmap.Nodes, len(v[i])) + copy(ns, v[i]) + + vc = append(vc, ns) + } + + return vc +} + +func testPlacement(t *testing.T, rs, ss []int) ([]netmap.Nodes, *container.Container) { + nodes := make([]netmap.Nodes, 0, len(rs)) + selectors := make([]*netmap.Selector, 0, len(rs)) num := uint32(0) - for i := range sz { - ns := make([]netmapV2.NodeInfo, 0, sz[i]) + for i := range rs { + ns := make([]netmapV2.NodeInfo, 0, rs[i]) - for j := 0; j < sz[i]; j++ { + for j := 0; j < rs[i]; j++ { ns = append(ns, testNode(num)) num++ } - res = append(res, netmap.NodesFromV2(ns)) + nodes = append(nodes, netmap.NodesFromV2(ns)) + + s := new(netmap.Selector) + s.SetCount(uint32(ss[i])) + + selectors = append(selectors, s) } - return res + policy := new(netmap.PlacementPolicy) + policy.SetSelectors(selectors) + + return nodes, container.New(container.WithPolicy(policy)) } func TestTraverserObjectScenarios(t *testing.T) { t.Run("search scenario", func(t *testing.T) { - nodes := testPlacement(t, []int{2, 3}) + replicas := []int{2, 3} + selectors := []int{1, 2} - allNodes := flattenVectors(nodes) + nodes, cnr := testPlacement(t, replicas, selectors) + + nodesCopy := copyVectors(nodes) tr, err := NewTraverser( - UseBuilder(&testBuilder{vectors: nodes}), + ForContainer(cnr), + UseBuilder(&testBuilder{vectors: nodesCopy}), WithoutSuccessTracking(), ) require.NoError(t, err) - require.True(t, tr.Success()) + for i := range replicas { + addrs := tr.Next() - for i := range allNodes { - require.Equal(t, allNodes[i].NetworkAddress(), tr.Next().String()) + require.Len(t, addrs, len(nodes[i])) + + for j, n := range nodes[i] { + require.Equal(t, n.NetworkAddress(), addrs[j].String()) + } } - require.Nil(t, tr.Next()) + require.Empty(t, tr.Next()) require.True(t, tr.Success()) }) t.Run("read scenario", func(t *testing.T) { - nodes := testPlacement(t, []int{5, 3, 4}) + replicas := []int{5, 3} + selectors := []int{2, 2} - allNodes := flattenVectors(nodes) + nodes, cnr := testPlacement(t, replicas, selectors) + + nodesCopy := copyVectors(nodes) tr, err := NewTraverser( - UseBuilder(&testBuilder{vectors: nodes}), + ForContainer(cnr), + UseBuilder(&testBuilder{vectors: nodesCopy}), + SuccessAfter(1), ) require.NoError(t, err) - for i := range allNodes[:len(allNodes)-3] { - require.Equal(t, allNodes[i].NetworkAddress(), tr.Next().String()) - } + fn := func(curVector int) { + for i := 0; i < replicas[curVector]; i++ { + addrs := tr.Next() + require.Len(t, addrs, 1) - require.False(t, tr.Success()) + require.Equal(t, nodes[curVector][i].NetworkAddress(), addrs[0].String()) + } - tr.SubmitSuccess() - - require.True(t, tr.Success()) - - require.Nil(t, tr.Next()) - }) - - t.Run("put scenario", func(t *testing.T) { - nodes := testPlacement(t, []int{3, 3, 3}) - sucCount := 3 - - tr, err := NewTraverser( - UseBuilder(&testBuilder{vectors: nodes}), - SuccessAfter(sucCount), - ) - require.NoError(t, err) - - for i := 0; i < sucCount; i++ { - require.NotNil(t, tr.Next()) + require.Empty(t, tr.Next()) require.False(t, tr.Success()) + tr.SubmitSuccess() } - require.Nil(t, tr.Next()) - require.True(t, tr.Success()) + for i := range replicas { + fn(i) + + if i < len(replicas)-1 { + require.False(t, tr.Success()) + } else { + require.True(t, tr.Success()) + } + } + }) + + t.Run("put scenario", func(t *testing.T) { + replicas := []int{5, 3} + selectors := []int{2, 2} + + nodes, cnr := testPlacement(t, replicas, selectors) + + nodesCopy := copyVectors(nodes) + + tr, err := NewTraverser( + ForContainer(cnr), + UseBuilder(&testBuilder{vectors: nodesCopy}), + ) + require.NoError(t, err) + + fn := func(curVector int) { + for i := 0; i+selectors[curVector] < replicas[curVector]; i += selectors[curVector] { + addrs := tr.Next() + require.Len(t, addrs, selectors[curVector]) + + for j := range addrs { + require.Equal(t, nodes[curVector][i+j].NetworkAddress(), addrs[j].String()) + } + } + + require.Empty(t, tr.Next()) + require.False(t, tr.Success()) + + for i := 0; i < selectors[curVector]; i++ { + tr.SubmitSuccess() + } + } + + for i := range replicas { + fn(i) + + if i < len(replicas)-1 { + require.False(t, tr.Success()) + } else { + require.True(t, tr.Success()) + } + } }) }