diff --git a/cmd/frostfs-cli/modules/netmap/nodeinfo.go b/cmd/frostfs-cli/modules/netmap/nodeinfo.go index ae4bb329a0..316d18d2b7 100644 --- a/cmd/frostfs-cli/modules/netmap/nodeinfo.go +++ b/cmd/frostfs-cli/modules/netmap/nodeinfo.go @@ -62,9 +62,9 @@ func prettyPrintNodeInfo(cmd *cobra.Command, i netmap.NodeInfo) { cmd.Println("state:", stateWord) - netmap.IterateNetworkEndpoints(i, func(s string) { + for s := range i.NetworkEndpoints() { cmd.Println("address:", s) - }) + } i.IterateAttributes(func(key, value string) { cmd.Printf("attribute: %s=%s\n", key, value) diff --git a/cmd/frostfs-cli/modules/object/nodes.go b/cmd/frostfs-cli/modules/object/nodes.go index 734b557a43..4762386519 100644 --- a/cmd/frostfs-cli/modules/object/nodes.go +++ b/cmd/frostfs-cli/modules/object/nodes.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "slices" "sync" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client" @@ -460,17 +461,11 @@ func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.N var cli *client.Client var addresses []string if preferInternal, _ := cmd.Flags().GetBool(preferInternalAddressesFlag); preferInternal { - candidate.IterateNetworkEndpoints(func(s string) bool { - addresses = append(addresses, s) - return false - }) + addresses = slices.AppendSeq(addresses, candidate.NetworkEndpoints()) addresses = append(addresses, candidate.ExternalAddresses()...) } else { addresses = append(addresses, candidate.ExternalAddresses()...) - candidate.IterateNetworkEndpoints(func(s string) bool { - addresses = append(addresses, s) - return false - }) + addresses = slices.AppendSeq(addresses, candidate.NetworkEndpoints()) } var lastErr error diff --git a/cmd/frostfs-node/cache.go b/cmd/frostfs-node/cache.go index 38cee5837b..e5df0a22d0 100644 --- a/cmd/frostfs-node/cache.go +++ b/cmd/frostfs-node/cache.go @@ -235,16 +235,8 @@ func (s *lruNetmapSource) updateCandidates(ctx context.Context, d time.Duration) slices.Compare(n1.ExternalAddresses(), n2.ExternalAddresses()) != 0 { return 1 } - var ne1 []string - n1.IterateNetworkEndpoints(func(s string) bool { - ne1 = append(ne1, s) - return false - }) - var ne2 []string - n2.IterateNetworkEndpoints(func(s string) bool { - ne2 = append(ne2, s) - return false - }) + ne1 := slices.Collect(n1.NetworkEndpoints()) + ne2 := slices.Collect(n2.NetworkEndpoints()) return slices.Compare(ne1, ne2) }) if ret != 0 { @@ -364,15 +356,9 @@ func getNetMapNodesToUpdate(nm *netmapSDK.NetMap, candidates []netmapSDK.NodeInf } nodeEndpoints := make([]string, 0, nm.Nodes()[i].NumberOfNetworkEndpoints()) - nm.Nodes()[i].IterateNetworkEndpoints(func(s string) bool { - nodeEndpoints = append(nodeEndpoints, s) - return false - }) + nodeEndpoints = slices.AppendSeq(nodeEndpoints, nm.Nodes()[i].NetworkEndpoints()) candidateEndpoints := make([]string, 0, cnd.NumberOfNetworkEndpoints()) - cnd.IterateNetworkEndpoints(func(s string) bool { - candidateEndpoints = append(candidateEndpoints, s) - return false - }) + candidateEndpoints = slices.AppendSeq(candidateEndpoints, cnd.NetworkEndpoints()) if slices.Compare(nodeEndpoints, candidateEndpoints) != 0 { update = true tmp.endpoints = candidateEndpoints diff --git a/cmd/frostfs-node/netmap.go b/cmd/frostfs-node/netmap.go index 0e90e77075..6d57edcce5 100644 --- a/cmd/frostfs-node/netmap.go +++ b/cmd/frostfs-node/netmap.go @@ -124,7 +124,11 @@ func nodeKeyFromNetmap(c *cfg) []byte { func (c *cfg) iterateNetworkAddresses(f func(string) bool) { ni, ok := c.cfgNetmap.state.getNodeInfo() if ok { - ni.IterateNetworkEndpoints(f) + for s := range ni.NetworkEndpoints() { + if f(s) { + return + } + } } } diff --git a/cmd/internal/common/netmap.go b/cmd/internal/common/netmap.go index f550552d21..334b662f82 100644 --- a/cmd/internal/common/netmap.go +++ b/cmd/internal/common/netmap.go @@ -27,9 +27,9 @@ func PrettyPrintNodeInfo(cmd *cobra.Command, node netmap.NodeInfo, cmd.Printf("%sNode %d: %s %s ", indent, index+1, hex.EncodeToString(node.PublicKey()), strState) - netmap.IterateNetworkEndpoints(node, func(endpoint string) { + for endpoint := range node.NetworkEndpoints() { cmd.Printf("%s ", endpoint) - }) + } cmd.Println() if !short { diff --git a/pkg/core/netmap/nodes.go b/pkg/core/netmap/nodes.go index b0c9e1f9e1..f01c07b194 100644 --- a/pkg/core/netmap/nodes.go +++ b/pkg/core/netmap/nodes.go @@ -17,7 +17,11 @@ func (x Node) PublicKey() []byte { // IterateAddresses iterates over all announced network addresses // and passes them into f. Handler MUST NOT be nil. func (x Node) IterateAddresses(f func(string) bool) { - (netmap.NodeInfo)(x).IterateNetworkEndpoints(f) + for s := range (netmap.NodeInfo)(x).NetworkEndpoints() { + if f(s) { + return + } + } } // NumberOfAddresses returns number of announced network addresses. diff --git a/pkg/network/validation.go b/pkg/network/validation.go index 92f6501193..73a3ef8d71 100644 --- a/pkg/network/validation.go +++ b/pkg/network/validation.go @@ -35,7 +35,11 @@ var ( type NodeEndpointsIterator netmap.NodeInfo func (x NodeEndpointsIterator) IterateAddresses(f func(string) bool) { - (netmap.NodeInfo)(x).IterateNetworkEndpoints(f) + for s := range (netmap.NodeInfo)(x).NetworkEndpoints() { + if f(s) { + return + } + } } func (x NodeEndpointsIterator) NumberOfAddresses() int { diff --git a/pkg/services/replicator/pull.go b/pkg/services/replicator/pull.go index bb38c72ad6..216fe4919c 100644 --- a/pkg/services/replicator/pull.go +++ b/pkg/services/replicator/pull.go @@ -3,6 +3,7 @@ package replicator import ( "context" "errors" + "slices" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" @@ -42,11 +43,7 @@ func (p *Replicator) HandlePullTask(ctx context.Context, task Task) { if err == nil { break } - var endpoints []string - node.IterateNetworkEndpoints(func(s string) bool { - endpoints = append(endpoints, s) - return false - }) + endpoints := slices.Collect(node.NetworkEndpoints()) p.log.Error(ctx, logs.ReplicatorCouldNotGetObjectFromRemoteStorage, zap.Stringer("object", task.Addr), zap.Error(err), diff --git a/pkg/services/tree/redirect.go b/pkg/services/tree/redirect.go index d92c749a82..3dcdc4fc79 100644 --- a/pkg/services/tree/redirect.go +++ b/pkg/services/tree/redirect.go @@ -41,24 +41,15 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo var called bool for _, n := range cntNodes { var stop bool - n.IterateNetworkEndpoints(func(endpoint string) bool { - ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.IterateNetworkEndpoints", - trace.WithAttributes( - attribute.String("endpoint", endpoint), - )) - defer span.End() - - c, err := s.cache.get(ctx, endpoint) - if err != nil { - return false + for endpoint := range n.NetworkEndpoints() { + stop = s.execOnClient(ctx, endpoint, func(c TreeServiceClient) bool { + called = true + return f(c) + }) + if called { + break } - - s.log.Debug(ctx, logs.TreeRedirectingTreeServiceQuery, zap.String("endpoint", endpoint)) - - called = true - stop = f(c) - return true - }) + } if stop { return nil } @@ -68,3 +59,19 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo } return nil } + +func (s *Service) execOnClient(ctx context.Context, endpoint string, f func(TreeServiceClient) bool) bool { + ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.IterateNetworkEndpoints", + trace.WithAttributes( + attribute.String("endpoint", endpoint), + )) + defer span.End() + + c, err := s.cache.get(ctx, endpoint) + if err != nil { + return false + } + + s.log.Debug(ctx, logs.TreeRedirectingTreeServiceQuery, zap.String("endpoint", endpoint)) + return f(c) +} diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 01a4ffde03..ee40884ebc 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -89,29 +89,13 @@ func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req var lastErr error var lastAddr string - n.IterateNetworkEndpoints(func(addr string) bool { - ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint", - trace.WithAttributes( - attribute.String("public_key", hex.EncodeToString(n.PublicKey())), - attribute.String("address", addr), - ), - ) - defer span.End() - + for addr := range n.NetworkEndpoints() { lastAddr = addr - - c, err := s.cache.get(ctx, addr) - if err != nil { - lastErr = fmt.Errorf("can't create client: %w", err) - return false + lastErr = s.apply(ctx, n, addr, req) + if lastErr == nil { + break } - - ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout) - _, lastErr = c.Apply(ctx, req) - cancel() - - return lastErr == nil - }) + } if lastErr != nil { if errors.Is(lastErr, errRecentlyFailed) { @@ -130,6 +114,26 @@ func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req return nil } +func (s *Service) apply(ctx context.Context, n netmapSDK.NodeInfo, addr string, req *ApplyRequest) error { + ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint", + trace.WithAttributes( + attribute.String("public_key", hex.EncodeToString(n.PublicKey())), + attribute.String("address", addr), + ), + ) + defer span.End() + + c, err := s.cache.get(ctx, addr) + if err != nil { + return fmt.Errorf("can't create client: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout) + _, err = c.Apply(ctx, req) + cancel() + return err +} + func (s *Service) replicateLoop(ctx context.Context) { for range s.replicatorWorkerCount { go s.replicationWorker(ctx) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index c3796fbd40..32297f9d70 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -297,27 +297,27 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, for i, n := range nodes { errGroup.Go(func() error { var nodeSynced bool - n.IterateNetworkEndpoints(func(addr string) bool { + for addr := range n.NetworkEndpoints() { var a network.Address if err := a.FromString(addr); err != nil { s.log.Warn(ctx, logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr)) - return false + continue } cc, err := createConnection(a, grpc.WithContextDialer(s.ds.GrpcContextDialer())) if err != nil { s.log.Warn(ctx, logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr)) - return false + continue } - defer cc.Close() err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i]) if err != nil { s.log.Warn(ctx, logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr)) } nodeSynced = err == nil - return true - }) + _ = cc.Close() + break + } close(nodeOperationStreams[i]) if !nodeSynced { allNodesSynced.Store(false)