package main

import (
	"crypto/tls"
	"errors"
	"net"
	"time"

	grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
	tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// maxRecvMsgSize is the limit (256 MiB) passed to grpc.MaxRecvMsgSize for
// every gRPC server created in this file.
const maxRecvMsgSize = 256 << 20

// initGRPC creates a listener and a gRPC server for every endpoint found in
// the application config.
//
// Endpoints whose server options cannot be built (see getGrpcServerOpts) are
// skipped. Endpoints that cannot be listened on are marked unhealthy in the
// metrics and queued for periodic reconnect attempts. If not a single
// endpoint could be set up, the error is passed to fatalOnErr.
func initGRPC(c *cfg) {
	var endpointsToReconnect []string
	var successCount int

	grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
		serverOpts, ok := getGrpcServerOpts(c, sc)
		if !ok {
			return
		}

		lis, err := net.Listen("tcp", sc.Endpoint())
		if err != nil {
			c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
			c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
			endpointsToReconnect = append(endpointsToReconnect, sc.Endpoint())
			return
		}
		c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())

		srv := grpc.NewServer(serverOpts...)

		c.onShutdown(func() {
			stopGRPC("FrostFS Public API", srv, c.log)
		})

		c.cfgGRPC.append(sc.Endpoint(), lis, srv)
		successCount++
	})

	if successCount == 0 {
		fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
	}

	// The timeout must be set before any reconnect goroutine is started,
	// because scheduleReconnect reads it to create its ticker.
	c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)

	for _, endpoint := range endpointsToReconnect {
		scheduleReconnect(endpoint, c)
	}
}

// scheduleReconnect starts a goroutine, tracked by c.wg, that calls
// tryReconnect for the endpoint once per c.cfgGRPC.reconnectTimeout.
//
// The goroutine exits when tryReconnect returns true or when a receive from
// c.done succeeds (presumably the channel is closed on shutdown; confirm
// against the cfg definition).
func scheduleReconnect(endpoint string, c *cfg) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
		// Release the ticker on every exit path; without this it keeps
		// firing after the goroutine has returned.
		defer t.Stop()

		for {
			select {
			case <-t.C:
				if tryReconnect(endpoint, c) {
					return
				}
			case <-c.done:
				return
			}
		}
	}()
}

// tryReconnect makes one attempt to listen on the endpoint and start a gRPC
// server for it.
//
// It returns true when the caller should stop retrying: either the server
// was started, or no usable server options were found for the endpoint in
// the current config. It returns false when listening failed and another
// attempt should be made later.
func tryReconnect(endpoint string, c *cfg) bool {
	c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))

	serverOpts, found := getGRPCEndpointOpts(endpoint, c)
	if !found {
		c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
		return true
	}

	lis, err := net.Listen("tcp", endpoint)
	if err != nil {
		c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
		c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
		c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
		return false
	}
	c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)

	srv := grpc.NewServer(serverOpts...)

	c.onShutdown(func() {
		stopGRPC("FrostFS Public API", srv, c.log)
	})

	c.cfgGRPC.appendAndHandle(endpoint, lis, srv)

	c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
	return true
}

// getGRPCEndpointOpts looks up the config entry whose endpoint equals the
// given one and builds server options for it.
//
// The lookup runs between c.LockAppConfigShared() and the unlock function it
// returns. found is true only when a matching entry exists and its options
// were built successfully; a matching entry whose options fail to build is
// skipped and the search continues with the remaining entries.
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
	unlock := c.LockAppConfigShared()
	defer unlock()

	grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
		if found {
			return
		}
		if sc.Endpoint() != endpoint {
			return
		}

		var ok bool
		result, ok = getGrpcServerOpts(c, sc)
		if !ok {
			return
		}
		found = true
	})

	return result, found
}

// getGrpcServerOpts builds the gRPC server options for one endpoint config:
// the receive size limit, the metrics and tracing interceptors, and, when
// the endpoint has a TLS section, transport credentials.
//
// It returns (nil, false) if the certificate/key pair cannot be loaded; the
// error is logged here and not returned.
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
	serverOpts := []grpc.ServerOption{
		grpc.MaxRecvMsgSize(maxRecvMsgSize),
		grpc.ChainUnaryInterceptor(
			metrics.NewUnaryServerInterceptor(),
			tracing.NewUnaryServerInterceptor(),
		),
		grpc.ChainStreamInterceptor(
			metrics.NewStreamServerInterceptor(),
			tracing.NewStreamServerInterceptor(),
		),
	}

	tlsCfg := sc.TLS()

	if tlsCfg != nil {
		cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
		if err != nil {
			c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
			return nil, false
		}

		// A nil list leaves the suite selection to crypto/tls defaults; it
		// stays nil only when insecure crypto is explicitly allowed.
		var cipherSuites []uint16
		if !tlsCfg.UseInsecureCrypto() {
			// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
			// excluding:
			// 1. TLS 1.3 suites need not be specified here.
			// 2. Suites that use DH key exchange are not implemented by stdlib.
			cipherSuites = []uint16{
				tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
				tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
				tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
				tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
				tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
				tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
			}
		}
		creds := credentials.NewTLS(&tls.Config{
			MinVersion:   tls.VersionTLS12,
			CipherSuites: cipherSuites,
			Certificates: []tls.Certificate{cert},
		})

		serverOpts = append(serverOpts, grpc.Creds(creds))
	}

	return serverOpts, true
}

// serveGRPC starts a serving goroutine, tracked by c.wg, for each
// (endpoint, listener, server) triple handed to the callback by
// c.cfgGRPC.performAndSave.
//
// When Serve returns a non-nil error the endpoint is marked unhealthy, the
// connection is dropped from c.cfgGRPC and a reconnect loop is scheduled.
func serveGRPC(c *cfg) {
	c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) {
		c.wg.Add(1)

		go func() {
			defer func() {
				c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint,
					zap.Stringer("endpoint", l.Addr()),
				)

				c.wg.Done()
			}()

			c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
				zap.String("service", "gRPC"),
				zap.Stringer("endpoint", l.Addr()),
			)

			if err := s.Serve(l); err != nil {
				c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e)
				c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err))
				c.cfgGRPC.dropConnection(e)
				scheduleReconnect(e, c)
			}
		}()
	})
}

// stopGRPC stops the server gracefully, falling back to a hard Stop if the
// graceful shutdown has not finished within one minute.
//
// name is attached to every log record emitted here under the "name" key.
func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {
	l = &logger.Logger{Logger: l.With(zap.String("name", name))}

	l.Info(logs.FrostFSNodeStoppingGRPCServer)

	// GracefulStop() may freeze forever, see #1270
	done := make(chan struct{})
	go func() {
		s.GracefulStop()
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(1 * time.Minute):
		l.Info(logs.FrostFSNodeGRPCCannotShutdownGracefullyForcingStop)
		s.Stop()
	}

	l.Info(logs.FrostFSNodeGRPCServerStoppedSuccessfully)
}