forked from TrueCloudLab/frostfs-node
[#607] placement: Make traverser to return list of address groups
Make placement `Traverser.Next` method to return ``[]network.AddressGroup` in order to support multiple addresses of the storeage nodes. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
8ac3c62518
commit
b3dd9a3254
6 changed files with 15 additions and 17 deletions
|
@ -3,7 +3,6 @@ package getsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -79,7 +78,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
// TODO: consider parallel execution
|
// TODO: consider parallel execution
|
||||||
// TODO: consider optimization: if status == SPLIT we can continue until
|
// TODO: consider optimization: if status == SPLIT we can continue until
|
||||||
// we reach the best result - split info with linking object ID.
|
// we reach the best result - split info with linking object ID.
|
||||||
if exec.processNode(ctx, network.GroupFromAddress(addrs[i])) {
|
if exec.processNode(ctx, addrs[i]) {
|
||||||
exec.log.Debug("completing the operation")
|
exec.log.Debug("completing the operation")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,7 +110,7 @@ loop:
|
||||||
if err := t.workerPool.Submit(func() {
|
if err := t.workerPool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
if err := f(network.GroupFromAddress(addr)); err != nil {
|
if err := f(addr); err != nil {
|
||||||
svcutil.LogServiceError(t.log, "PUT", addr, err)
|
svcutil.LogServiceError(t.log, "PUT", addr, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package searchsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -77,7 +76,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: consider parallel execution
|
// TODO: consider parallel execution
|
||||||
exec.processNode(ctx, network.GroupFromAddress(addrs[i]))
|
exec.processNode(ctx, addrs[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,9 +7,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// LogServiceError writes debug error message of object service to provided logger.
|
// LogServiceError writes debug error message of object service to provided logger.
|
||||||
func LogServiceError(l *logger.Logger, req string, node network.Address, err error) {
|
func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, err error) {
|
||||||
l.Debug("object service error",
|
l.Debug("object service error",
|
||||||
zap.Stringer("node", node),
|
zap.String("node", network.StringifyGroup(node)),
|
||||||
zap.String("request", req),
|
zap.String("request", req),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
|
@ -122,7 +122,7 @@ func flatNodes(ns []netmap.Nodes) []netmap.Nodes {
|
||||||
// Next returns next unprocessed address of the object placement.
|
// Next returns next unprocessed address of the object placement.
|
||||||
//
|
//
|
||||||
// Returns nil if no nodes left or traversal operation succeeded.
|
// Returns nil if no nodes left or traversal operation succeeded.
|
||||||
func (t *Traverser) Next() []network.Address {
|
func (t *Traverser) Next() []network.AddressGroup {
|
||||||
t.mtx.Lock()
|
t.mtx.Lock()
|
||||||
defer t.mtx.Unlock()
|
defer t.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -139,10 +139,10 @@ func (t *Traverser) Next() []network.Address {
|
||||||
count = len(t.vectors[0])
|
count = len(t.vectors[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
addrs := make([]network.Address, count)
|
addrs := make([]network.AddressGroup, count)
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
err := addrs[i].FromString(t.vectors[0][i].Address())
|
err := addrs[i].FromIterator(t.vectors[0][i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: log error
|
// TODO: log error
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -65,12 +65,12 @@ func testPlacement(t *testing.T, ss, rs []int) ([]netmap.Nodes, *container.Conta
|
||||||
return nodes, container.New(container.WithPolicy(policy))
|
return nodes, container.New(container.WithPolicy(policy))
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertSameAddress(t *testing.T, ni *netmap.NodeInfo, addr network.Address) {
|
func assertSameAddress(t *testing.T, ni *netmap.NodeInfo, addr network.AddressGroup) {
|
||||||
var netAddr network.Address
|
var netAddr network.AddressGroup
|
||||||
|
|
||||||
err := netAddr.FromString(ni.Address())
|
err := netAddr.FromIterator(ni)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, netAddr.Equal(addr))
|
require.True(t, netAddr.Intersects(addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTraverserObjectScenarios(t *testing.T) {
|
func TestTraverserObjectScenarios(t *testing.T) {
|
||||||
|
@ -124,12 +124,12 @@ func TestTraverserObjectScenarios(t *testing.T) {
|
||||||
require.NotNil(t, tr.Next())
|
require.NotNil(t, tr.Next())
|
||||||
}
|
}
|
||||||
|
|
||||||
var n network.Address
|
var n network.AddressGroup
|
||||||
|
|
||||||
err = n.FromString(nodes[1][0].Address())
|
err = n.FromIterator(nodes[1][0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, []network.Address{n}, tr.Next())
|
require.Equal(t, []network.AddressGroup{n}, tr.Next())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put scenario", func(t *testing.T) {
|
t.Run("put scenario", func(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue