From f4877e7b42d84364de1d03f07d9ef199c615af15 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 30 Nov 2023 20:51:23 +0300 Subject: [PATCH] [#835] grpc: Try to reconnect if endpoint listen failed Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/accounting.go | 8 +- cmd/frostfs-node/config.go | 78 +++++++++-- cmd/frostfs-node/config/grpc/config.go | 15 ++ cmd/frostfs-node/container.go | 8 +- cmd/frostfs-node/grpc.go | 182 +++++++++++++++++-------- cmd/frostfs-node/netmap.go | 8 +- cmd/frostfs-node/object.go | 8 +- cmd/frostfs-node/session.go | 8 +- cmd/frostfs-node/tree.go | 8 +- internal/logs/logs.go | 5 + 10 files changed, 243 insertions(+), 85 deletions(-) diff --git a/cmd/frostfs-node/accounting.go b/cmd/frostfs-node/accounting.go index d04f34ff1..ec737f8a0 100644 --- a/cmd/frostfs-node/accounting.go +++ b/cmd/frostfs-node/accounting.go @@ -2,12 +2,14 @@ package main import ( "context" + "net" accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc" accountingService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting" accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph" + "google.golang.org/grpc" ) func initAccountingService(ctx context.Context, c *cfg) { @@ -28,7 +30,7 @@ func initAccountingService(ctx context.Context, c *cfg) { ), ) - for _, srv := range c.cfgGRPC.servers { - accountingGRPC.RegisterAccountingServiceServer(srv, server) - } + c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { + accountingGRPC.RegisterAccountingServiceServer(s, server) + }) } diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 27c028bae..e887c2e6c 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -460,18 +460,80 @@ func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error { return nil } +type grpcServer struct { + Listener net.Listener + Server *grpc.Server + Endpoint string +} + type cfgGRPC struct { - listeners []net.Listener - - servers []*grpc.Server - - endpoints []string - - maxChunkSize uint64 + // guard protects connections and handlers + guard sync.RWMutex + // servers must be protected with guard + servers []grpcServer + // handlers must be protected with guard + handlers []func(e string, l net.Listener, s *grpc.Server) + maxChunkSize uint64 maxAddrAmount uint64 } +func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) { + c.guard.Lock() + defer c.guard.Unlock() + + c.servers = append(c.servers, grpcServer{ + Listener: l, + Server: s, + Endpoint: e, + }) +} + +func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) { + c.guard.Lock() + defer c.guard.Unlock() + + c.servers = append(c.servers, grpcServer{ + Listener: l, + Server: s, + Endpoint: e, + }) + + for _, h := range c.handlers { + h(e, l, s) + } +} + +func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) { + c.guard.Lock() + defer c.guard.Unlock() + + for _, conn := range c.servers { + handler(conn.Endpoint, conn.Listener, conn.Server) + } + + c.handlers = append(c.handlers, handler) +} + +func (c *cfgGRPC) dropConnection(endpoint string) { + c.guard.Lock() + defer c.guard.Unlock() + + pos := -1 + for idx, srv := range c.servers { + if srv.Endpoint == endpoint { + pos = idx + break + } + } + if pos < 0 { + return + } + + c.servers[pos].Server.Stop() // closes listener + c.servers = append(c.servers[0:pos], c.servers[pos+1:]...) +} + type cfgMorph struct { client *client.Client @@ -1232,7 +1294,7 @@ func (c *cfg) shutdown() { } c.ctxCancel() - c.done <- struct{}{} + close(c.done) for i := range c.closers { c.closers[len(c.closers)-1-i].fn() } diff --git a/cmd/frostfs-node/config/grpc/config.go b/cmd/frostfs-node/config/grpc/config.go index c25d2e717..37dd76426 100644 --- a/cmd/frostfs-node/config/grpc/config.go +++ b/cmd/frostfs-node/config/grpc/config.go @@ -3,6 +3,7 @@ package grpcconfig import ( "errors" "strconv" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" ) @@ -109,3 +110,17 @@ func IterateEndpoints(c *config.Config, f func(*Config)) { panic("no gRPC server configured") } } + +const DefaultReconnectInterval = time.Minute + +// ReconnectTimeout returns the value of "reconnect_interval" gRPC config parameter. +// +// Returns DefaultReconnectInterval if value is not defined or invalid. +func ReconnectTimeout(c *config.Config) time.Duration { + grpcConf := c.Sub("grpc") + ri := config.DurationSafe(grpcConf, "reconnect_interval") + if ri > 0 { + return ri + } + return DefaultReconnectInterval +} diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 5477947d7..4fc7f5649 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "net" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -16,6 +17,7 @@ import ( cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "go.uber.org/zap" + "google.golang.org/grpc" ) func initContainerService(_ context.Context, c *cfg) { @@ -37,9 +39,9 @@ func initContainerService(_ context.Context, c *cfg) { ), ) - for _, srv := range c.cfgGRPC.servers { - containerGRPC.RegisterContainerServiceServer(srv, server) - } + c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { + containerGRPC.RegisterContainerServiceServer(s, server) + }) } func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) { diff --git a/cmd/frostfs-node/grpc.go b/cmd/frostfs-node/grpc.go index 74df7a18f..cc78440bf 100644 --- a/cmd/frostfs-node/grpc.go +++ b/cmd/frostfs-node/grpc.go @@ -3,7 +3,6 @@ package main import ( "crypto/tls" "errors" - "fmt" "net" "time" @@ -20,91 +19,154 @@ import ( const maxRecvMsgSize = 256 << 20 func initGRPC(c *cfg) { + var endpointsToReconnect []string var successCount int grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) { - serverOpts := []grpc.ServerOption{ - grpc.MaxRecvMsgSize(maxRecvMsgSize), - grpc.ChainUnaryInterceptor( - metrics.NewUnaryServerInterceptor(), - tracing.NewUnaryServerInterceptor(), - ), - grpc.ChainStreamInterceptor( - metrics.NewStreamServerInterceptor(), - tracing.NewStreamServerInterceptor(), - ), - } - - tlsCfg := sc.TLS() - - if tlsCfg != nil { - cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile()) - if err != nil { - c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err)) - return - } - - var cipherSuites []uint16 - if !tlsCfg.UseInsecureCrypto() { - // This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS - // excluding: - // 1. TLS 1.3 suites need not be specified here. - // 2. Suites that use DH key exchange are not implemented by stdlib. - cipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256, - } - } - creds := credentials.NewTLS(&tls.Config{ - MinVersion: tls.VersionTLS12, - CipherSuites: cipherSuites, - Certificates: []tls.Certificate{cert}, - }) - - serverOpts = append(serverOpts, grpc.Creds(creds)) + serverOpts, ok := getGrpcServerOpts(c, sc) + if !ok { + return } lis, err := net.Listen("tcp", sc.Endpoint()) if err != nil { c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint()) c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err)) + endpointsToReconnect = append(endpointsToReconnect, sc.Endpoint()) return } c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint()) - c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) - c.cfgGRPC.endpoints = append(c.cfgGRPC.endpoints, sc.Endpoint()) - srv := grpc.NewServer(serverOpts...) c.onShutdown(func() { stopGRPC("FrostFS Public API", srv, c.log) }) - c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) + c.cfgGRPC.append(sc.Endpoint(), lis, srv) successCount++ }) if successCount == 0 { fatalOnErr(errors.New("could not listen to any gRPC endpoints")) } + + for _, endpoint := range endpointsToReconnect { + scheduleReconnect(endpoint, c) + } +} + +func scheduleReconnect(endpoint string, c *cfg) { + c.wg.Add(1) + go func() { + defer c.wg.Done() + + timeout := grpcconfig.ReconnectTimeout(c.appCfg) + t := time.NewTicker(timeout) + for { + select { + case <-t.C: + c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint)) + var success, found bool + grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) { + if sc.Endpoint() != endpoint { + return + } + found = true + serverOpts, ok := getGrpcServerOpts(c, sc) + if !ok { + return + } + lis, err := net.Listen("tcp", sc.Endpoint()) + if err != nil { + c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint()) + c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err)) + return + } + c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint()) + + srv := grpc.NewServer(serverOpts...) + + c.onShutdown(func() { + stopGRPC("FrostFS Public API", srv, c.log) + }) + + c.cfgGRPC.appendAndHandle(sc.Endpoint(), lis, srv) + success = true + }) + if !found { + c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint)) + return + } + + if success { + c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint)) + return + } + c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", timeout)) + case <-c.done: + return + } + } + }() +} + +func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) { + serverOpts := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(maxRecvMsgSize), + grpc.ChainUnaryInterceptor( + metrics.NewUnaryServerInterceptor(), + tracing.NewUnaryServerInterceptor(), + ), + grpc.ChainStreamInterceptor( + metrics.NewStreamServerInterceptor(), + tracing.NewStreamServerInterceptor(), + ), + } + + tlsCfg := sc.TLS() + + if tlsCfg != nil { + cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile()) + if err != nil { + c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err)) + return nil, false + } + + var cipherSuites []uint16 + if !tlsCfg.UseInsecureCrypto() { + // This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS + // excluding: + // 1. TLS 1.3 suites need not be specified here. + // 2. Suites that use DH key exchange are not implemented by stdlib. + cipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256, + } + } + creds := credentials.NewTLS(&tls.Config{ + MinVersion: tls.VersionTLS12, + CipherSuites: cipherSuites, + Certificates: []tls.Certificate{cert}, + }) + + serverOpts = append(serverOpts, grpc.Creds(creds)) + } + + return serverOpts, true } func serveGRPC(c *cfg) { - for i := range c.cfgGRPC.servers { + c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) { c.wg.Add(1) - srv := c.cfgGRPC.servers[i] - lis := c.cfgGRPC.listeners[i] - endpoint := c.cfgGRPC.endpoints[i] - go func() { defer func() { c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint, - zap.Stringer("endpoint", lis.Addr()), + zap.Stringer("endpoint", l.Addr()), ) c.wg.Done() @@ -112,15 +174,17 @@ func serveGRPC(c *cfg) { c.log.Info(logs.FrostFSNodeStartListeningEndpoint, zap.String("service", "gRPC"), - zap.Stringer("endpoint", lis.Addr()), + zap.Stringer("endpoint", l.Addr()), ) - if err := srv.Serve(lis); err != nil { - c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint) - fmt.Println("gRPC server error", err) + if err := s.Serve(l); err != nil { + c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e) + c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err)) + c.cfgGRPC.dropConnection(e) + scheduleReconnect(e, c) } }() - } + }) } func stopGRPC(name string, s *grpc.Server, l *logger.Logger) { diff --git a/cmd/frostfs-node/netmap.go b/cmd/frostfs-node/netmap.go index 347121e56..b21e842c5 100644 --- a/cmd/frostfs-node/netmap.go +++ b/cmd/frostfs-node/netmap.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "net" "sync/atomic" netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" @@ -21,6 +22,7 @@ import ( netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "go.uber.org/zap" + "google.golang.org/grpc" ) // primary solution of local network state dump. @@ -162,9 +164,9 @@ func initNetmapService(ctx context.Context, c *cfg) { ), ) - for _, srv := range c.cfgGRPC.servers { - netmapGRPC.RegisterNetmapServiceServer(srv, server) - } + c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { + netmapGRPC.RegisterNetmapServiceServer(s, server) + }) addNewEpochNotificationHandlers(c) } diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 59827179b..e84696e6b 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc" @@ -43,6 +44,7 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "go.uber.org/zap" + "google.golang.org/grpc" ) type objectSvc struct { @@ -204,9 +206,9 @@ func initObjectService(c *cfg) { signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg)) server := objectTransportGRPC.New(c.shared.metricsSvc) - for _, srv := range c.cfgGRPC.servers { - objectGRPC.RegisterObjectServiceServer(srv, server) - } + c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { + objectGRPC.RegisterObjectServiceServer(s, server) + }) } func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) { diff --git a/cmd/frostfs-node/session.go b/cmd/frostfs-node/session.go index f9c1811a1..ee21ec230 100644 --- a/cmd/frostfs-node/session.go +++ b/cmd/frostfs-node/session.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net" "time" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" @@ -16,6 +17,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/persistent" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/temporary" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "google.golang.org/grpc" ) type sessionStorage interface { @@ -57,7 +59,7 @@ func initSessionService(c *cfg) { ), ) - for _, srv := range c.cfgGRPC.servers { - sessionGRPC.RegisterSessionServiceServer(srv, server) - } + c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { + sessionGRPC.RegisterSessionServiceServer(s, server) + }) } diff --git a/cmd/frostfs-node/tree.go b/cmd/frostfs-node/tree.go index f7c0f2a36..dced05bc2 100644 --- a/cmd/frostfs-node/tree.go +++ b/cmd/frostfs-node/tree.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "net" "time" treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree" @@ -15,6 +16,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" + "google.golang.org/grpc" ) type cnrSource struct { @@ -63,9 +65,9 @@ func initTreeService(c *cfg) { tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()), tree.WithMetrics(c.metricsCollector.TreeService())) - for _, srv := range c.cfgGRPC.servers { - tree.RegisterTreeServiceServer(srv, c.treeService) - } + c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { + tree.RegisterTreeServiceServer(s, c.treeService) + }) c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { c.treeService.Start(ctx) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 1775b18cd..c3f4fdc7f 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -423,6 +423,11 @@ const ( FrostFSNodeStoppingGRPCServer = "stopping gRPC server..." FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop" FrostFSNodeGRPCServerStoppedSuccessfully = "gRPC server stopped successfully" + FrostFSNodeGRPCServerError = "gRPC server error" + FrostFSNodeGRPCReconnecting = "reconnecting gRPC server..." + FrostFSNodeGRPCReconnectedSuccessfully = "gRPC server reconnected successfully" + FrostFSNodeGRPCServerConfigNotFound = "gRPC server config not found" + FrostFSNodeGRPCReconnectFailed = "failed to reconnect gRPC server" FrostFSNodeWaitingForAllProcessesToStop = "waiting for all processes to stop" FrostFSNodeStartedLocalNodesMaintenance = "started local node's maintenance" FrostFSNodeStoppedLocalNodesMaintenance = "stopped local node's maintenance"