[] placement: Overload result of Traverser.Next method

In previous implementation `placement.Traverser.Next` method returned slice
of `network.AddressGroup` elements. There is a need to process keys of
storage nodes besides network addresses for intra-container communication.

Wrap `network.AddressGroup` in a new type `placement.Node` that summarizes
the storage node information required for communication. Return slice of
`Node` instances from `Traverser.Next` method. Fix compilation breaks in
dependent packages.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-09-06 14:35:06 +03:00 committed by Alex Vanin
parent 3c848b2cad
commit fe90456dcc
6 changed files with 29 additions and 17 deletions
pkg/services

View file

@ -78,7 +78,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
// TODO: consider parallel execution // TODO: consider parallel execution
// TODO: consider optimization: if status == SPLIT we can continue until // TODO: consider optimization: if status == SPLIT we can continue until
// we reach the best result - split info with linking object ID. // we reach the best result - split info with linking object ID.
if exec.processNode(ctx, addrs[i]) { if exec.processNode(ctx, addrs[i].Addresses()) {
exec.log.Debug("completing the operation") exec.log.Debug("completing the operation")
return true return true
} }

View file

@ -25,7 +25,7 @@ type distributedTarget struct {
nodeTargetInitializer func(network.AddressGroup) transformer.ObjectTarget nodeTargetInitializer func(network.AddressGroup) transformer.ObjectTarget
relay func(network.AddressGroup) error relay func(placement.Node) error
fmt *object.FormatValidator fmt *object.FormatValidator
@ -68,15 +68,15 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
return t.iteratePlacement(t.sendObject) return t.iteratePlacement(t.sendObject)
} }
func (t *distributedTarget) sendObject(addr network.AddressGroup) error { func (t *distributedTarget) sendObject(node placement.Node) error {
if t.relay != nil { if t.relay != nil {
err := t.relay(addr) err := t.relay(node)
if err == nil || !errors.Is(err, errLocalAddress) { if err == nil || !errors.Is(err, errLocalAddress) {
return err return err
} }
} }
target := t.nodeTargetInitializer(addr) target := t.nodeTargetInitializer(node.Addresses())
if err := target.WriteHeader(t.obj); err != nil { if err := target.WriteHeader(t.obj); err != nil {
return fmt.Errorf("could not write header: %w", err) return fmt.Errorf("could not write header: %w", err)
@ -86,7 +86,7 @@ func (t *distributedTarget) sendObject(addr network.AddressGroup) error {
return nil return nil
} }
func (t *distributedTarget) iteratePlacement(f func(network.AddressGroup) error) (*transformer.AccessIdentifiers, error) { func (t *distributedTarget) iteratePlacement(f func(placement.Node) error) (*transformer.AccessIdentifiers, error) {
traverser, err := placement.NewTraverser( traverser, err := placement.NewTraverser(
append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., append(t.traverseOpts, placement.ForObject(t.obj.ID()))...,
) )
@ -111,7 +111,7 @@ loop:
defer wg.Done() defer wg.Done()
if err := f(addr); err != nil { if err := f(addr); err != nil {
svcutil.LogServiceError(t.log, "PUT", addr, err) svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
return return
} }

View file

@ -147,9 +147,11 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
var errLocalAddress = errors.New("can't relay to local address") var errLocalAddress = errors.New("can't relay to local address")
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
var relay func(network.AddressGroup) error var relay func(placement.Node) error
if p.relay != nil { if p.relay != nil {
relay = func(addr network.AddressGroup) error { relay = func(node placement.Node) error {
addr := node.Addresses()
if network.IsLocalAddress(p.localAddrSrc, addr) { if network.IsLocalAddress(p.localAddrSrc, addr) {
return errLocalAddress return errLocalAddress
} }

View file

@ -76,7 +76,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
} }
// TODO: consider parallel execution // TODO: consider parallel execution
exec.processNode(ctx, addrs[i]) exec.processNode(ctx, addrs[i].Addresses())
} }
} }

View file

@ -119,10 +119,20 @@ func flatNodes(ns []netmap.Nodes) []netmap.Nodes {
return []netmap.Nodes{flat} return []netmap.Nodes{flat}
} }
// Node is a descriptor of storage node with information required for intra-container communication.
type Node struct {
addresses network.AddressGroup
}
// Addresses returns group of network addresses.
func (x Node) Addresses() network.AddressGroup {
return x.addresses
}
// Next returns next unprocessed address of the object placement. // Next returns next unprocessed address of the object placement.
// //
// Returns nil if no nodes left or traversal operation succeeded. // Returns nil if no nodes left or traversal operation succeeded.
func (t *Traverser) Next() []network.AddressGroup { func (t *Traverser) Next() []Node {
t.mtx.Lock() t.mtx.Lock()
defer t.mtx.Unlock() defer t.mtx.Unlock()
@ -139,10 +149,10 @@ func (t *Traverser) Next() []network.AddressGroup {
count = len(t.vectors[0]) count = len(t.vectors[0])
} }
addrs := make([]network.AddressGroup, count) nodes := make([]Node, count)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
err := addrs[i].FromIterator(t.vectors[0][i]) err := nodes[i].addresses.FromIterator(t.vectors[0][i])
if err != nil { if err != nil {
// TODO: log error // TODO: log error
return nil return nil
@ -151,7 +161,7 @@ func (t *Traverser) Next() []network.AddressGroup {
t.vectors[0] = t.vectors[0][count:] t.vectors[0] = t.vectors[0][count:]
return addrs return nodes
} }
func (t *Traverser) skipEmptyVectors() { func (t *Traverser) skipEmptyVectors() {

View file

@ -95,7 +95,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
require.Len(t, addrs, len(nodes[i])) require.Len(t, addrs, len(nodes[i]))
for j, n := range nodes[i] { for j, n := range nodes[i] {
assertSameAddress(t, n.NodeInfo, addrs[j]) assertSameAddress(t, n.NodeInfo, addrs[j].Addresses())
} }
} }
@ -129,7 +129,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
err = n.FromIterator(nodes[1][0]) err = n.FromIterator(nodes[1][0])
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []network.AddressGroup{n}, tr.Next()) require.Equal(t, []Node{{addresses: n}}, tr.Next())
}) })
t.Run("put scenario", func(t *testing.T) { t.Run("put scenario", func(t *testing.T) {
@ -152,7 +152,7 @@ func TestTraverserObjectScenarios(t *testing.T) {
require.Len(t, addrs, replicas[curVector]) require.Len(t, addrs, replicas[curVector])
for j := range addrs { for j := range addrs {
assertSameAddress(t, nodes[curVector][i+j].NodeInfo, addrs[j]) assertSameAddress(t, nodes[curVector][i+j].NodeInfo, addrs[j].Addresses())
} }
} }