Reconnect gRPC servers #836

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/grpc_init_lazy into master 2024-09-04 19:51:04 +00:00
10 changed files with 280 additions and 84 deletions

View file

@ -2,12 +2,14 @@ package main
import (
"context"
"net"
accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc"
accountingService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting"
accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph"
"google.golang.org/grpc"
)
func initAccountingService(ctx context.Context, c *cfg) {
@ -28,7 +30,7 @@ func initAccountingService(ctx context.Context, c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
accountingGRPC.RegisterAccountingServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
accountingGRPC.RegisterAccountingServiceServer(s, server)
})
}

View file

@ -426,11 +426,26 @@ type dynamicConfiguration struct {
metrics *httpComponent
}
type appConfigGuard struct {
mtx sync.RWMutex
}
func (g *appConfigGuard) LockAppConfigShared() func() {
g.mtx.RLock()
return func() { g.mtx.RUnlock() }
}
func (g *appConfigGuard) LockAppConfigExclusive() func() {
g.mtx.Lock()
return func() { g.mtx.Unlock() }
}
type cfg struct {
applicationConfiguration
internals
shared
dynamicConfiguration
appConfigGuard
// configuration of the internal
// services
@ -460,16 +475,79 @@ func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
return nil
}
type grpcServer struct {
Listener net.Listener
Server *grpc.Server
Endpoint string
}
type cfgGRPC struct {
listeners []net.Listener
// guard protects connections and handlers
guard sync.RWMutex
// servers must be protected with guard
servers []grpcServer
// handlers must be protected with guard
handlers []func(e string, l net.Listener, s *grpc.Server)
servers []*grpc.Server
maxChunkSize uint64
maxAddrAmount uint64
reconnectTimeout time.Duration
}
endpoints []string
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
maxChunkSize uint64
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
}
maxAddrAmount uint64
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
for _, h := range c.handlers {
h(e, l, s)
}
}
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
c.guard.Lock()
defer c.guard.Unlock()
for _, conn := range c.servers {
handler(conn.Endpoint, conn.Listener, conn.Server)
}
c.handlers = append(c.handlers, handler)
}
func (c *cfgGRPC) dropConnection(endpoint string) {
c.guard.Lock()
defer c.guard.Unlock()
pos := -1
for idx, srv := range c.servers {
if srv.Endpoint == endpoint {
pos = idx
break
}
}
if pos < 0 {
return
}
c.servers[pos].Server.Stop() // closes listener
c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
}
type cfgMorph struct {
@ -1142,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
}
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
err := c.readConfig(c.appCfg)
err := c.reloadAppConfig()
if err != nil {
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
return
@ -1209,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
func (c *cfg) reloadAppConfig() error {
unlock := c.LockAppConfigExclusive()
defer unlock()
return c.readConfig(c.appCfg)
}
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
@ -1232,7 +1317,7 @@ func (c *cfg) shutdown() {
}
c.ctxCancel()
c.done <- struct{}{}
close(c.done)
for i := range c.closers {
c.closers[len(c.closers)-1-i].fn()
}

View file

@ -3,6 +3,7 @@ package grpcconfig
import (
"errors"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
)
@ -109,3 +110,17 @@ func IterateEndpoints(c *config.Config, f func(*Config)) {
panic("no gRPC server configured")
}
}
const DefaultReconnectInterval = time.Minute
// ReconnectTimeout returns the value of "reconnect_interval" gRPC config parameter.
//
// Returns DefaultReconnectInterval if value is not defined or invalid.
func ReconnectTimeout(c *config.Config) time.Duration {
grpcConf := c.Sub("grpc")
ri := config.DurationSafe(grpcConf, "reconnect_interval")
if ri > 0 {
return ri
}
return DefaultReconnectInterval
}

View file

@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"net"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -16,6 +17,7 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)
func initContainerService(_ context.Context, c *cfg) {
@ -37,9 +39,9 @@ func initContainerService(_ context.Context, c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
containerGRPC.RegisterContainerServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
containerGRPC.RegisterContainerServiceServer(s, server)
})
}
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {

View file

@ -3,7 +3,6 @@ package main
import (
"crypto/tls"
"errors"
"fmt"
"net"
"time"
@ -20,91 +19,169 @@ import (
const maxRecvMsgSize = 256 << 20
func initGRPC(c *cfg) {
var endpointsToReconnect []string
var successCount int
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.ChainUnaryInterceptor(
metrics.NewUnaryServerInterceptor(),
tracing.NewUnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
metrics.NewStreamServerInterceptor(),
tracing.NewStreamServerInterceptor(),
),
}
tlsCfg := sc.TLS()
if tlsCfg != nil {
cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
return
}
var cipherSuites []uint16
if !tlsCfg.UseInsecureCrypto() {
// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
// excluding:
// 1. TLS 1.3 suites need not be specified here.
// 2. Suites that use DH key exchange are not implemented by stdlib.
cipherSuites = []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
}
}
creds := credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: cipherSuites,
Certificates: []tls.Certificate{cert},
})
serverOpts = append(serverOpts, grpc.Creds(creds))
serverOpts, ok := getGrpcServerOpts(c, sc)
if !ok {
return
}
lis, err := net.Listen("tcp", sc.Endpoint())
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
endpointsToReconnect = append(endpointsToReconnect, sc.Endpoint())
return
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis)
c.cfgGRPC.endpoints = append(c.cfgGRPC.endpoints, sc.Endpoint())
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv)
c.cfgGRPC.append(sc.Endpoint(), lis, srv)
successCount++
})
if successCount == 0 {
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
}
c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
for _, endpoint := range endpointsToReconnect {
scheduleReconnect(endpoint, c)
}
}
func scheduleReconnect(endpoint string, c *cfg) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
for {
select {
case <-t.C:
if tryReconnect(endpoint, c) {
return
}
case <-c.done:
return
}
}
}()
}
func tryReconnect(endpoint string, c *cfg) bool {
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
serverOpts, found := getGRPCEndpointOpts(endpoint, c)
if !found {
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
return true
}
lis, err := net.Listen("tcp", endpoint)
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
return false
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.appendAndHandle(endpoint, lis, srv)
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
return true
}
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
unlock := c.LockAppConfigShared()
defer unlock()
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
if found {
return
}
if sc.Endpoint() != endpoint {
return
}
var ok bool
result, ok = getGrpcServerOpts(c, sc)
if !ok {
return
}
found = true
})
return
}
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.ChainUnaryInterceptor(
metrics.NewUnaryServerInterceptor(),
tracing.NewUnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
metrics.NewStreamServerInterceptor(),
tracing.NewStreamServerInterceptor(),
),
}
tlsCfg := sc.TLS()
if tlsCfg != nil {
cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
return nil, false
}
var cipherSuites []uint16
if !tlsCfg.UseInsecureCrypto() {
// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
// excluding:
// 1. TLS 1.3 suites need not be specified here.
// 2. Suites that use DH key exchange are not implemented by stdlib.
cipherSuites = []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
}
}
creds := credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: cipherSuites,
Certificates: []tls.Certificate{cert},
})
serverOpts = append(serverOpts, grpc.Creds(creds))
}
return serverOpts, true
}
func serveGRPC(c *cfg) {
for i := range c.cfgGRPC.servers {
c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) {
c.wg.Add(1)
srv := c.cfgGRPC.servers[i]
lis := c.cfgGRPC.listeners[i]
endpoint := c.cfgGRPC.endpoints[i]
go func() {
defer func() {
c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint,
zap.Stringer("endpoint", lis.Addr()),
zap.Stringer("endpoint", l.Addr()),
)
c.wg.Done()
@ -112,15 +189,17 @@ func serveGRPC(c *cfg) {
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
zap.String("service", "gRPC"),
zap.Stringer("endpoint", lis.Addr()),
zap.Stringer("endpoint", l.Addr()),
)
if err := srv.Serve(lis); err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
fmt.Println("gRPC server error", err)
if err := s.Serve(l); err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e)
c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err))
c.cfgGRPC.dropConnection(e)
scheduleReconnect(e, c)
}
}()
}
})
}
func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {

View file

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"net"
"sync/atomic"
netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc"
@ -21,6 +22,7 @@ import (
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// primary solution of local network state dump.
@ -162,9 +164,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
netmapGRPC.RegisterNetmapServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
netmapGRPC.RegisterNetmapServiceServer(s, server)
})
addNewEpochNotificationHandlers(c)
}

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"
@ -43,6 +44,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type objectSvc struct {
@ -204,9 +206,9 @@ func initObjectService(c *cfg) {
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
server := objectTransportGRPC.New(c.shared.metricsSvc)
for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
objectGRPC.RegisterObjectServiceServer(s, server)
})
}
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {

View file

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@ -16,6 +17,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/persistent"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/temporary"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"google.golang.org/grpc"
)
type sessionStorage interface {
@ -57,7 +59,7 @@ func initSessionService(c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
sessionGRPC.RegisterSessionServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
sessionGRPC.RegisterSessionServiceServer(s, server)
})
}

View file

@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"net"
"time"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
@ -15,6 +16,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type cnrSource struct {
@ -63,9 +65,9 @@ func initTreeService(c *cfg) {
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
tree.WithMetrics(c.metricsCollector.TreeService()))
for _, srv := range c.cfgGRPC.servers {
tree.RegisterTreeServiceServer(srv, c.treeService)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
tree.RegisterTreeServiceServer(s, c.treeService)
})
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
c.treeService.Start(ctx)

View file

@ -423,6 +423,11 @@ const (
FrostFSNodeStoppingGRPCServer = "stopping gRPC server..."
FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop"
FrostFSNodeGRPCServerStoppedSuccessfully = "gRPC server stopped successfully"
FrostFSNodeGRPCServerError = "gRPC server error"
FrostFSNodeGRPCReconnecting = "reconnecting gRPC server..."
FrostFSNodeGRPCReconnectedSuccessfully = "gRPC server reconnected successfully"
FrostFSNodeGRPCServerConfigNotFound = "gRPC server config not found"
FrostFSNodeGRPCReconnectFailed = "failed to reconnect gRPC server"
FrostFSNodeWaitingForAllProcessesToStop = "waiting for all processes to stop"
FrostFSNodeStartedLocalNodesMaintenance = "started local node's maintenance"
FrostFSNodeStoppedLocalNodesMaintenance = "stopped local node's maintenance"