[#835] grpc: Try to reconnect if endpoint listen failed

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-11-30 20:51:23 +03:00
parent bdd43f6211
commit f4877e7b42
10 changed files with 243 additions and 85 deletions

View file

@ -2,12 +2,14 @@ package main
import ( import (
"context" "context"
"net"
accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc" accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc" accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc"
accountingService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting" accountingService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting"
accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph" accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph"
"google.golang.org/grpc"
) )
func initAccountingService(ctx context.Context, c *cfg) { func initAccountingService(ctx context.Context, c *cfg) {
@ -28,7 +30,7 @@ func initAccountingService(ctx context.Context, c *cfg) {
), ),
) )
for _, srv := range c.cfgGRPC.servers { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
accountingGRPC.RegisterAccountingServiceServer(srv, server) accountingGRPC.RegisterAccountingServiceServer(s, server)
} })
} }

View file

@ -460,18 +460,80 @@ func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
return nil return nil
} }
// grpcServer bundles a gRPC server with the listener and the endpoint string
// it is tracked under in cfgGRPC.
type grpcServer struct {
// Listener is the listener passed to handlers together with Server.
Listener net.Listener
// Server is the gRPC server instance; it is stopped by cfgGRPC.dropConnection.
Server *grpc.Server
// Endpoint is the key used by cfgGRPC.dropConnection to find this entry.
Endpoint string
}
type cfgGRPC struct { type cfgGRPC struct {
listeners []net.Listener // guard protects connections and handlers
guard sync.RWMutex
servers []*grpc.Server // servers must be protected with guard
servers []grpcServer
endpoints []string // handlers must be protected with guard
handlers []func(e string, l net.Listener, s *grpc.Server)
maxChunkSize uint64 maxChunkSize uint64
maxAddrAmount uint64 maxAddrAmount uint64
} }
// append records the server s, its listener l and endpoint e in the list of
// tracked servers. Already saved handlers are NOT invoked for the new entry.
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
	entry := grpcServer{
		Endpoint: e,
		Listener: l,
		Server:   s,
	}

	c.guard.Lock()
	defer c.guard.Unlock()

	c.servers = append(c.servers, entry)
}
// appendAndHandle records the server s, its listener l and endpoint e in the
// list of tracked servers and then runs every saved handler on the new entry.
//
// Handlers are called while guard is held, in the order they were saved.
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
	entry := grpcServer{
		Endpoint: e,
		Listener: l,
		Server:   s,
	}

	c.guard.Lock()
	defer c.guard.Unlock()

	c.servers = append(c.servers, entry)

	saved := c.handlers
	for i := range saved {
		saved[i](e, l, s)
	}
}
// performAndSave runs handler on every currently tracked server and then saves
// it to the handlers list.
//
// handler is called while guard is held.
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
	c.guard.Lock()
	defer c.guard.Unlock()

	tracked := c.servers
	for i := range tracked {
		entry := tracked[i]
		handler(entry.Endpoint, entry.Listener, entry.Server)
	}

	c.handlers = append(c.handlers, handler)
}
// dropConnection stops the first tracked gRPC server whose Endpoint equals
// endpoint and removes it from the list of tracked servers.
//
// It is a no-op if no tracked server has the given endpoint.
func (c *cfgGRPC) dropConnection(endpoint string) {
	c.guard.Lock()
	defer c.guard.Unlock()

	pos := -1
	for idx := range c.servers {
		if c.servers[idx].Endpoint == endpoint {
			pos = idx
			break
		}
	}
	if pos < 0 {
		return
	}

	c.servers[pos].Server.Stop() // closes listener

	// Shift the tail left and clear the vacated last slot. Without clearing,
	// the backing array would keep a stale copy of the last element, so a
	// stopped server and its listener would stay reachable for the GC.
	last := len(c.servers) - 1
	copy(c.servers[pos:], c.servers[pos+1:])
	c.servers[last] = grpcServer{}
	c.servers = c.servers[:last]
}
type cfgMorph struct { type cfgMorph struct {
client *client.Client client *client.Client
@ -1232,7 +1294,7 @@ func (c *cfg) shutdown() {
} }
c.ctxCancel() c.ctxCancel()
c.done <- struct{}{} close(c.done)
for i := range c.closers { for i := range c.closers {
c.closers[len(c.closers)-1-i].fn() c.closers[len(c.closers)-1-i].fn()
} }

View file

@ -3,6 +3,7 @@ package grpcconfig
import ( import (
"errors" "errors"
"strconv" "strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
) )
@ -109,3 +110,17 @@ func IterateEndpoints(c *config.Config, f func(*Config)) {
panic("no gRPC server configured") panic("no gRPC server configured")
} }
} }
// DefaultReconnectInterval is the fallback value returned by ReconnectTimeout.
const DefaultReconnectInterval = time.Minute
// ReconnectTimeout reads the "reconnect_interval" parameter from the "grpc"
// config section.
//
// DefaultReconnectInterval is returned instead when the value read is not
// positive (i.e. the parameter is not defined or invalid).
func ReconnectTimeout(c *config.Config) time.Duration {
	interval := config.DurationSafe(c.Sub("grpc"), "reconnect_interval")
	if interval <= 0 {
		return DefaultReconnectInterval
	}

	return interval
}

View file

@ -3,6 +3,7 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"net"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -16,6 +17,7 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
) )
func initContainerService(_ context.Context, c *cfg) { func initContainerService(_ context.Context, c *cfg) {
@ -37,9 +39,9 @@ func initContainerService(_ context.Context, c *cfg) {
), ),
) )
for _, srv := range c.cfgGRPC.servers { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
containerGRPC.RegisterContainerServiceServer(srv, server) containerGRPC.RegisterContainerServiceServer(s, server)
} })
} }
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) { func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {

View file

@ -3,7 +3,6 @@ package main
import ( import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
"net" "net"
"time" "time"
@ -20,8 +19,98 @@ import (
const maxRecvMsgSize = 256 << 20 const maxRecvMsgSize = 256 << 20
func initGRPC(c *cfg) { func initGRPC(c *cfg) {
var endpointsToReconnect []string
var successCount int var successCount int
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) { grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
serverOpts, ok := getGrpcServerOpts(c, sc)
if !ok {
return
}
lis, err := net.Listen("tcp", sc.Endpoint())
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
endpointsToReconnect = append(endpointsToReconnect, sc.Endpoint())
return
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.append(sc.Endpoint(), lis, srv)
successCount++
})
if successCount == 0 {
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
}
for _, endpoint := range endpointsToReconnect {
scheduleReconnect(endpoint, c)
}
}
// scheduleReconnect starts a background goroutine that periodically tries to
// listen on endpoint again and to start a new gRPC server for it.
//
// The retry period is taken from grpcconfig.ReconnectTimeout. The goroutine is
// accounted in c.wg and exits when one of the following happens:
//   - endpoint is no longer found among the configured gRPC endpoints;
//   - the endpoint was listened successfully (the new server is handed to
//     c.cfgGRPC.appendAndHandle, which runs all saved handlers on it);
//   - a receive from c.done succeeds.
func scheduleReconnect(endpoint string, c *cfg) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		timeout := grpcconfig.ReconnectTimeout(c.appCfg)
		t := time.NewTicker(timeout)
		// The ticker must be stopped explicitly on every exit path,
		// otherwise its resources are not released.
		defer t.Stop()

		for {
			select {
			case <-t.C:
				c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
				var success, found bool
				grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
					if sc.Endpoint() != endpoint {
						return
					}
					found = true
					serverOpts, ok := getGrpcServerOpts(c, sc)
					if !ok {
						return
					}
					lis, err := net.Listen("tcp", sc.Endpoint())
					if err != nil {
						c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
						c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
						return
					}
					c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())

					srv := grpc.NewServer(serverOpts...)

					c.onShutdown(func() {
						stopGRPC("FrostFS Public API", srv, c.log)
					})

					c.cfgGRPC.appendAndHandle(sc.Endpoint(), lis, srv)
					success = true
				})
				if !found {
					// The endpoint was removed from the config: nothing to retry.
					c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
					return
				}

				if success {
					c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
					return
				}
				// Include the endpoint like the other messages of this function do,
				// so that failures of different endpoints can be told apart.
				c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed,
					zap.String("endpoint", endpoint),
					zap.Duration("next_try_in", timeout))
			case <-c.done:
				return
			}
		}
	}()
}
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
serverOpts := []grpc.ServerOption{ serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxRecvMsgSize), grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.ChainUnaryInterceptor( grpc.ChainUnaryInterceptor(
@ -40,7 +129,7 @@ func initGRPC(c *cfg) {
cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile()) cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
if err != nil { if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err)) c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
return return nil, false
} }
var cipherSuites []uint16 var cipherSuites []uint16
@ -67,44 +156,17 @@ func initGRPC(c *cfg) {
serverOpts = append(serverOpts, grpc.Creds(creds)) serverOpts = append(serverOpts, grpc.Creds(creds))
} }
lis, err := net.Listen("tcp", sc.Endpoint()) return serverOpts, true
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
return
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis)
c.cfgGRPC.endpoints = append(c.cfgGRPC.endpoints, sc.Endpoint())
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv)
successCount++
})
if successCount == 0 {
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
}
} }
func serveGRPC(c *cfg) { func serveGRPC(c *cfg) {
for i := range c.cfgGRPC.servers { c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) {
c.wg.Add(1) c.wg.Add(1)
srv := c.cfgGRPC.servers[i]
lis := c.cfgGRPC.listeners[i]
endpoint := c.cfgGRPC.endpoints[i]
go func() { go func() {
defer func() { defer func() {
c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint, c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint,
zap.Stringer("endpoint", lis.Addr()), zap.Stringer("endpoint", l.Addr()),
) )
c.wg.Done() c.wg.Done()
@ -112,15 +174,17 @@ func serveGRPC(c *cfg) {
c.log.Info(logs.FrostFSNodeStartListeningEndpoint, c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
zap.String("service", "gRPC"), zap.String("service", "gRPC"),
zap.Stringer("endpoint", lis.Addr()), zap.Stringer("endpoint", l.Addr()),
) )
if err := srv.Serve(lis); err != nil { if err := s.Serve(l); err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint) c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e)
fmt.Println("gRPC server error", err) c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err))
c.cfgGRPC.dropConnection(e)
scheduleReconnect(e, c)
} }
}() }()
} })
} }
func stopGRPC(name string, s *grpc.Server, l *logger.Logger) { func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net"
"sync/atomic" "sync/atomic"
netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc"
@ -21,6 +22,7 @@ import (
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
) )
// primary solution of local network state dump. // primary solution of local network state dump.
@ -162,9 +164,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
), ),
) )
for _, srv := range c.cfgGRPC.servers { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
netmapGRPC.RegisterNetmapServiceServer(srv, server) netmapGRPC.RegisterNetmapServiceServer(s, server)
} })
addNewEpochNotificationHandlers(c) addNewEpochNotificationHandlers(c)
} }

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc" objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"
@ -43,6 +44,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
) )
type objectSvc struct { type objectSvc struct {
@ -204,9 +206,9 @@ func initObjectService(c *cfg) {
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg)) signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
server := objectTransportGRPC.New(c.shared.metricsSvc) server := objectTransportGRPC.New(c.shared.metricsSvc)
for _, srv := range c.cfgGRPC.servers { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
objectGRPC.RegisterObjectServiceServer(srv, server) objectGRPC.RegisterObjectServiceServer(s, server)
} })
} }
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) { func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"net"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@ -16,6 +17,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/persistent" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/persistent"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/temporary" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/temporary"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"google.golang.org/grpc"
) )
type sessionStorage interface { type sessionStorage interface {
@ -57,7 +59,7 @@ func initSessionService(c *cfg) {
), ),
) )
for _, srv := range c.cfgGRPC.servers { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
sessionGRPC.RegisterSessionServiceServer(srv, server) sessionGRPC.RegisterSessionServiceServer(s, server)
} })
} }

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"net"
"time" "time"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree" treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
@ -15,6 +16,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
) )
type cnrSource struct { type cnrSource struct {
@ -63,9 +65,9 @@ func initTreeService(c *cfg) {
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()), tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
tree.WithMetrics(c.metricsCollector.TreeService())) tree.WithMetrics(c.metricsCollector.TreeService()))
for _, srv := range c.cfgGRPC.servers { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
tree.RegisterTreeServiceServer(srv, c.treeService) tree.RegisterTreeServiceServer(s, c.treeService)
} })
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
c.treeService.Start(ctx) c.treeService.Start(ctx)

View file

@ -423,6 +423,11 @@ const (
FrostFSNodeStoppingGRPCServer = "stopping gRPC server..." FrostFSNodeStoppingGRPCServer = "stopping gRPC server..."
FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop" FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop"
FrostFSNodeGRPCServerStoppedSuccessfully = "gRPC server stopped successfully" FrostFSNodeGRPCServerStoppedSuccessfully = "gRPC server stopped successfully"
FrostFSNodeGRPCServerError = "gRPC server error"
FrostFSNodeGRPCReconnecting = "reconnecting gRPC server..."
FrostFSNodeGRPCReconnectedSuccessfully = "gRPC server reconnected successfully"
FrostFSNodeGRPCServerConfigNotFound = "gRPC server config not found"
FrostFSNodeGRPCReconnectFailed = "failed to reconnect gRPC server"
FrostFSNodeWaitingForAllProcessesToStop = "waiting for all processes to stop" FrostFSNodeWaitingForAllProcessesToStop = "waiting for all processes to stop"
FrostFSNodeStartedLocalNodesMaintenance = "started local node's maintenance" FrostFSNodeStartedLocalNodesMaintenance = "started local node's maintenance"
FrostFSNodeStoppedLocalNodesMaintenance = "stopped local node's maintenance" FrostFSNodeStoppedLocalNodesMaintenance = "stopped local node's maintenance"