forked from TrueCloudLab/frostfs-node
[#1689] Remove deprecated NodeInfo.IterateNetworkEndpoints()
Change-Id: Ic78f18aed11fab34ee3147ceea657296b89fe60c Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
56d09a9957
commit
bf06c4fb4b
11 changed files with 83 additions and 82 deletions
|
@ -62,9 +62,9 @@ func prettyPrintNodeInfo(cmd *cobra.Command, i netmap.NodeInfo) {
|
||||||
|
|
||||||
cmd.Println("state:", stateWord)
|
cmd.Println("state:", stateWord)
|
||||||
|
|
||||||
netmap.IterateNetworkEndpoints(i, func(s string) {
|
for s := range i.NetworkEndpoints() {
|
||||||
cmd.Println("address:", s)
|
cmd.Println("address:", s)
|
||||||
})
|
}
|
||||||
|
|
||||||
i.IterateAttributes(func(key, value string) {
|
i.IterateAttributes(func(key, value string) {
|
||||||
cmd.Printf("attribute: %s=%s\n", key, value)
|
cmd.Printf("attribute: %s=%s\n", key, value)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||||
|
@ -460,17 +461,11 @@ func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.N
|
||||||
var cli *client.Client
|
var cli *client.Client
|
||||||
var addresses []string
|
var addresses []string
|
||||||
if preferInternal, _ := cmd.Flags().GetBool(preferInternalAddressesFlag); preferInternal {
|
if preferInternal, _ := cmd.Flags().GetBool(preferInternalAddressesFlag); preferInternal {
|
||||||
candidate.IterateNetworkEndpoints(func(s string) bool {
|
addresses = slices.AppendSeq(addresses, candidate.NetworkEndpoints())
|
||||||
addresses = append(addresses, s)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
addresses = append(addresses, candidate.ExternalAddresses()...)
|
addresses = append(addresses, candidate.ExternalAddresses()...)
|
||||||
} else {
|
} else {
|
||||||
addresses = append(addresses, candidate.ExternalAddresses()...)
|
addresses = append(addresses, candidate.ExternalAddresses()...)
|
||||||
candidate.IterateNetworkEndpoints(func(s string) bool {
|
addresses = slices.AppendSeq(addresses, candidate.NetworkEndpoints())
|
||||||
addresses = append(addresses, s)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var lastErr error
|
var lastErr error
|
||||||
|
|
|
@ -235,16 +235,8 @@ func (s *lruNetmapSource) updateCandidates(ctx context.Context, d time.Duration)
|
||||||
slices.Compare(n1.ExternalAddresses(), n2.ExternalAddresses()) != 0 {
|
slices.Compare(n1.ExternalAddresses(), n2.ExternalAddresses()) != 0 {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
var ne1 []string
|
ne1 := slices.Collect(n1.NetworkEndpoints())
|
||||||
n1.IterateNetworkEndpoints(func(s string) bool {
|
ne2 := slices.Collect(n2.NetworkEndpoints())
|
||||||
ne1 = append(ne1, s)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
var ne2 []string
|
|
||||||
n2.IterateNetworkEndpoints(func(s string) bool {
|
|
||||||
ne2 = append(ne2, s)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
return slices.Compare(ne1, ne2)
|
return slices.Compare(ne1, ne2)
|
||||||
})
|
})
|
||||||
if ret != 0 {
|
if ret != 0 {
|
||||||
|
@ -364,15 +356,9 @@ func getNetMapNodesToUpdate(nm *netmapSDK.NetMap, candidates []netmapSDK.NodeInf
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeEndpoints := make([]string, 0, nm.Nodes()[i].NumberOfNetworkEndpoints())
|
nodeEndpoints := make([]string, 0, nm.Nodes()[i].NumberOfNetworkEndpoints())
|
||||||
nm.Nodes()[i].IterateNetworkEndpoints(func(s string) bool {
|
nodeEndpoints = slices.AppendSeq(nodeEndpoints, nm.Nodes()[i].NetworkEndpoints())
|
||||||
nodeEndpoints = append(nodeEndpoints, s)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
candidateEndpoints := make([]string, 0, cnd.NumberOfNetworkEndpoints())
|
candidateEndpoints := make([]string, 0, cnd.NumberOfNetworkEndpoints())
|
||||||
cnd.IterateNetworkEndpoints(func(s string) bool {
|
candidateEndpoints = slices.AppendSeq(candidateEndpoints, cnd.NetworkEndpoints())
|
||||||
candidateEndpoints = append(candidateEndpoints, s)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if slices.Compare(nodeEndpoints, candidateEndpoints) != 0 {
|
if slices.Compare(nodeEndpoints, candidateEndpoints) != 0 {
|
||||||
update = true
|
update = true
|
||||||
tmp.endpoints = candidateEndpoints
|
tmp.endpoints = candidateEndpoints
|
||||||
|
|
|
@ -124,7 +124,11 @@ func nodeKeyFromNetmap(c *cfg) []byte {
|
||||||
func (c *cfg) iterateNetworkAddresses(f func(string) bool) {
|
func (c *cfg) iterateNetworkAddresses(f func(string) bool) {
|
||||||
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
||||||
if ok {
|
if ok {
|
||||||
ni.IterateNetworkEndpoints(f)
|
for s := range ni.NetworkEndpoints() {
|
||||||
|
if f(s) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,9 +27,9 @@ func PrettyPrintNodeInfo(cmd *cobra.Command, node netmap.NodeInfo,
|
||||||
|
|
||||||
cmd.Printf("%sNode %d: %s %s ", indent, index+1, hex.EncodeToString(node.PublicKey()), strState)
|
cmd.Printf("%sNode %d: %s %s ", indent, index+1, hex.EncodeToString(node.PublicKey()), strState)
|
||||||
|
|
||||||
netmap.IterateNetworkEndpoints(node, func(endpoint string) {
|
for endpoint := range node.NetworkEndpoints() {
|
||||||
cmd.Printf("%s ", endpoint)
|
cmd.Printf("%s ", endpoint)
|
||||||
})
|
}
|
||||||
cmd.Println()
|
cmd.Println()
|
||||||
|
|
||||||
if !short {
|
if !short {
|
||||||
|
|
|
@ -17,7 +17,11 @@ func (x Node) PublicKey() []byte {
|
||||||
// IterateAddresses iterates over all announced network addresses
|
// IterateAddresses iterates over all announced network addresses
|
||||||
// and passes them into f. Handler MUST NOT be nil.
|
// and passes them into f. Handler MUST NOT be nil.
|
||||||
func (x Node) IterateAddresses(f func(string) bool) {
|
func (x Node) IterateAddresses(f func(string) bool) {
|
||||||
(netmap.NodeInfo)(x).IterateNetworkEndpoints(f)
|
for s := range (netmap.NodeInfo)(x).NetworkEndpoints() {
|
||||||
|
if f(s) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NumberOfAddresses returns number of announced network addresses.
|
// NumberOfAddresses returns number of announced network addresses.
|
||||||
|
|
|
@ -35,7 +35,11 @@ var (
|
||||||
type NodeEndpointsIterator netmap.NodeInfo
|
type NodeEndpointsIterator netmap.NodeInfo
|
||||||
|
|
||||||
func (x NodeEndpointsIterator) IterateAddresses(f func(string) bool) {
|
func (x NodeEndpointsIterator) IterateAddresses(f func(string) bool) {
|
||||||
(netmap.NodeInfo)(x).IterateNetworkEndpoints(f)
|
for s := range (netmap.NodeInfo)(x).NetworkEndpoints() {
|
||||||
|
if f(s) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x NodeEndpointsIterator) NumberOfAddresses() int {
|
func (x NodeEndpointsIterator) NumberOfAddresses() int {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package replicator
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"slices"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
@ -42,11 +43,7 @@ func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
var endpoints []string
|
endpoints := slices.Collect(node.NetworkEndpoints())
|
||||||
node.IterateNetworkEndpoints(func(s string) bool {
|
|
||||||
endpoints = append(endpoints, s)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
p.log.Error(ctx, logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
p.log.Error(ctx, logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
||||||
zap.Stringer("object", task.Addr),
|
zap.Stringer("object", task.Addr),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
|
|
|
@ -41,7 +41,26 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo
|
||||||
var called bool
|
var called bool
|
||||||
for _, n := range cntNodes {
|
for _, n := range cntNodes {
|
||||||
var stop bool
|
var stop bool
|
||||||
n.IterateNetworkEndpoints(func(endpoint string) bool {
|
for endpoint := range n.NetworkEndpoints() {
|
||||||
|
stop = s.execOnClient(ctx, endpoint, func(c TreeServiceClient) bool {
|
||||||
|
called = true
|
||||||
|
return f(c)
|
||||||
|
})
|
||||||
|
if called {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if stop {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !called {
|
||||||
|
return errNoSuitableNode
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) execOnClient(ctx context.Context, endpoint string, f func(TreeServiceClient) bool) bool {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.IterateNetworkEndpoints",
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.IterateNetworkEndpoints",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("endpoint", endpoint),
|
attribute.String("endpoint", endpoint),
|
||||||
|
@ -54,17 +73,5 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log.Debug(ctx, logs.TreeRedirectingTreeServiceQuery, zap.String("endpoint", endpoint))
|
s.log.Debug(ctx, logs.TreeRedirectingTreeServiceQuery, zap.String("endpoint", endpoint))
|
||||||
|
return f(c)
|
||||||
called = true
|
|
||||||
stop = f(c)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if stop {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !called {
|
|
||||||
return errNoSuitableNode
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,29 +89,13 @@ func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req
|
||||||
var lastErr error
|
var lastErr error
|
||||||
var lastAddr string
|
var lastAddr string
|
||||||
|
|
||||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
for addr := range n.NetworkEndpoints() {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
|
|
||||||
attribute.String("address", addr),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
lastAddr = addr
|
lastAddr = addr
|
||||||
|
lastErr = s.apply(ctx, n, addr, req)
|
||||||
c, err := s.cache.get(ctx, addr)
|
if lastErr == nil {
|
||||||
if err != nil {
|
break
|
||||||
lastErr = fmt.Errorf("can't create client: %w", err)
|
}
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
|
|
||||||
_, lastErr = c.Apply(ctx, req)
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
return lastErr == nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if lastErr != nil {
|
if lastErr != nil {
|
||||||
if errors.Is(lastErr, errRecentlyFailed) {
|
if errors.Is(lastErr, errRecentlyFailed) {
|
||||||
|
@ -130,6 +114,26 @@ func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) apply(ctx context.Context, n netmapSDK.NodeInfo, addr string, req *ApplyRequest) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
|
||||||
|
attribute.String("address", addr),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
c, err := s.cache.get(ctx, addr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't create client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
|
||||||
|
_, err = c.Apply(ctx, req)
|
||||||
|
cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) replicateLoop(ctx context.Context) {
|
func (s *Service) replicateLoop(ctx context.Context) {
|
||||||
for range s.replicatorWorkerCount {
|
for range s.replicatorWorkerCount {
|
||||||
go s.replicationWorker(ctx)
|
go s.replicationWorker(ctx)
|
||||||
|
|
|
@ -297,27 +297,27 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
for i, n := range nodes {
|
for i, n := range nodes {
|
||||||
errGroup.Go(func() error {
|
errGroup.Go(func() error {
|
||||||
var nodeSynced bool
|
var nodeSynced bool
|
||||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
for addr := range n.NetworkEndpoints() {
|
||||||
var a network.Address
|
var a network.Address
|
||||||
if err := a.FromString(addr); err != nil {
|
if err := a.FromString(addr); err != nil {
|
||||||
s.log.Warn(ctx, logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
|
s.log.Warn(ctx, logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
|
||||||
return false
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := createConnection(a, grpc.WithContextDialer(s.ds.GrpcContextDialer()))
|
cc, err := createConnection(a, grpc.WithContextDialer(s.ds.GrpcContextDialer()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn(ctx, logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
|
s.log.Warn(ctx, logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
|
||||||
return false
|
continue
|
||||||
}
|
}
|
||||||
defer cc.Close()
|
|
||||||
|
|
||||||
err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
|
err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn(ctx, logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
|
s.log.Warn(ctx, logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
|
||||||
}
|
}
|
||||||
nodeSynced = err == nil
|
nodeSynced = err == nil
|
||||||
return true
|
_ = cc.Close()
|
||||||
})
|
break
|
||||||
|
}
|
||||||
close(nodeOperationStreams[i])
|
close(nodeOperationStreams[i])
|
||||||
if !nodeSynced {
|
if !nodeSynced {
|
||||||
allNodesSynced.Store(false)
|
allNodesSynced.Store(false)
|
||||||
|
|
Loading…
Add table
Reference in a new issue