[#223] placement: Fix local and single-success placement traversal

In previous implementation placement traverser processed incorrectly with
local placement build. Also entity incorrectly traversed the placement
vectors for fixed number read operations until success. The erroneous
behavior was due to the use of a vector number of successes instead of
a scalar number in these scenarios.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-12-01 17:10:12 +03:00 committed by Alex Vanin
parent f6e5644b85
commit 5470d94416
2 changed files with 73 additions and 36 deletions

View file

@ -35,7 +35,9 @@ type Traverser struct {
}
type cfg struct {
rem int
trackCopies bool
flatSuccess *uint32
addr *object.Address
@ -52,7 +54,8 @@ var errNilPolicy = errors.New("placement policy is nil")
func defaultCfg() *cfg {
return &cfg{
addr: object.NewAddress(),
trackCopies: true,
addr: object.NewAddress(),
}
}
@ -77,17 +80,21 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
return nil, errors.Wrap(err, "could not build placement")
}
rs := cfg.policy.Replicas()
rem := make([]int, 0, len(rs))
var rem []int
if cfg.flatSuccess != nil {
ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)}
} else {
rs := cfg.policy.Replicas()
rem = make([]int, 0, len(rs))
for i := range rs {
cnt := cfg.rem
if cnt == 0 {
cnt = int(rs[i].Count())
for i := range rs {
if cfg.trackCopies {
rem = append(rem, int(rs[i].Count()))
} else {
rem = append(rem, -1)
}
}
rem = append(rem, cnt)
}
return &Traverser{
@ -97,6 +104,20 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
}, nil
}
func flatNodes(ns []netmap.Nodes) []netmap.Nodes {
sz := 0
for i := range ns {
sz += len(ns[i])
}
flat := make(netmap.Nodes, 0, sz)
for i := range ns {
flat = append(flat, ns[i]...)
}
return []netmap.Nodes{flat}
}
// Next returns next unprocessed address of the object placement.
//
// Returns nil if no nodes left or traversal operation succeeded.
@ -193,13 +214,13 @@ func ForObject(id *object.ID) Option {
}
}
// SuccessAfter is a success number setting option.
// SuccessAfter is a flat success number setting option.
//
// Option has no effect if the number is not positive.
func SuccessAfter(v int) Option {
func SuccessAfter(v uint32) Option {
return func(c *cfg) {
if v > 0 {
c.rem = v
c.flatSuccess = &v
}
}
}
@ -207,6 +228,6 @@ func SuccessAfter(v int) Option {
// WithoutSuccessTracking disables success tracking in traversal.
func WithoutSuccessTracking() Option {
return func(c *cfg) {
c.rem = -1
c.trackCopies = false
}
}

View file

@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/stretchr/testify/require"
)
@ -114,34 +115,21 @@ func TestTraverserObjectScenarios(t *testing.T) {
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{vectors: nodesCopy}),
UseBuilder(&testBuilder{
vectors: nodesCopy,
}),
SuccessAfter(1),
)
require.NoError(t, err)
fn := func(curVector int) {
for i := 0; i < selectors[curVector]; i++ {
addrs := tr.Next()
require.Len(t, addrs, 1)
require.Equal(t, nodes[curVector][i].Address(), addrs[0].String())
}
require.Empty(t, tr.Next())
require.False(t, tr.Success())
tr.SubmitSuccess()
for i := 0; i < len(nodes[0]); i++ {
require.NotNil(t, tr.Next())
}
for i := range selectors {
fn(i)
n, err := network.AddressFromString(nodes[1][0].Address())
require.NoError(t, err)
if i < len(selectors)-1 {
require.False(t, tr.Success())
} else {
require.True(t, tr.Success())
}
}
require.Equal(t, []*network.Address{n}, tr.Next())
})
t.Run("put scenario", func(t *testing.T) {
@ -186,4 +174,32 @@ func TestTraverserObjectScenarios(t *testing.T) {
}
}
})
t.Run("local operation scenario", func(t *testing.T) {
selectors := []int{2, 3}
replicas := []int{1, 2}
nodes, cnr := testPlacement(t, selectors, replicas)
tr, err := NewTraverser(
ForContainer(cnr),
UseBuilder(&testBuilder{
vectors: []netmap.Nodes{{nodes[1][1]}}, // single node (local)
}),
SuccessAfter(1),
)
require.NoError(t, err)
require.NotEmpty(t, tr.Next())
require.False(t, tr.Success())
// add 1 OK
tr.SubmitSuccess()
// nothing more to do
require.Empty(t, tr.Next())
// common success
require.True(t, tr.Success())
})
}