Reconnect gRPC servers #836
2 changed files with 79 additions and 41 deletions
|
@ -426,11 +426,26 @@ type dynamicConfiguration struct {
|
|||
metrics *httpComponent
|
||||
}
|
||||
|
||||
type appConfigGuard struct {
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
func (g *appConfigGuard) LockAppConfigShared() func() {
|
||||
g.mtx.RLock()
|
||||
return func() { g.mtx.RUnlock() }
|
||||
}
|
||||
|
||||
func (g *appConfigGuard) LockAppConfigExclusive() func() {
|
||||
g.mtx.Lock()
|
||||
return func() { g.mtx.Unlock() }
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
applicationConfiguration
|
||||
internals
|
||||
shared
|
||||
dynamicConfiguration
|
||||
appConfigGuard
|
||||
|
||||
// configuration of the internal
|
||||
// services
|
||||
|
@ -474,8 +489,9 @@ type cfgGRPC struct {
|
|||
// handlers must be protected with guard
|
||||
handlers []func(e string, l net.Listener, s *grpc.Server)
|
||||
|
||||
maxChunkSize uint64
|
||||
maxAddrAmount uint64
|
||||
maxChunkSize uint64
|
||||
maxAddrAmount uint64
|
||||
reconnectTimeout time.Duration
|
||||
}
|
||||
|
||||
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
||||
|
@ -1204,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
}
|
||||
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
|
||||
|
||||
err := c.readConfig(c.appCfg)
|
||||
err := c.reloadAppConfig()
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
|
||||
return
|
||||
|
@ -1271,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||
}
|
||||
|
||||
func (c *cfg) reloadAppConfig() error {
|
||||
unlock := c.LockAppConfigExclusive()
|
||||
defer unlock()
|
||||
|
||||
return c.readConfig(c.appCfg)
|
||||
}
|
||||
|
||||
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||
var tssPrm tsourse.TombstoneSourcePrm
|
||||
tssPrm.SetGetService(c.cfgObject.getSvc)
|
||||
|
|
|
@ -49,6 +49,7 @@ func initGRPC(c *cfg) {
|
|||
if successCount == 0 {
|
||||
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
|
||||
}
|
||||
c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
|
||||
|
||||
for _, endpoint := range endpointsToReconnect {
|
||||
scheduleReconnect(endpoint, c)
|
||||
|
@ -60,49 +61,13 @@ func scheduleReconnect(endpoint string, c *cfg) {
|
|||
go func() {
|
||||
defer c.wg.Done()
|
||||
|
||||
timeout := grpcconfig.ReconnectTimeout(c.appCfg)
|
||||
t := time.NewTicker(timeout)
|
||||
t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
|
||||
var success, found bool
|
||||
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
|
||||
if sc.Endpoint() != endpoint {
|
||||
return
|
||||
}
|
||||
found = true
|
||||
serverOpts, ok := getGrpcServerOpts(c, sc)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
lis, err := net.Listen("tcp", sc.Endpoint())
|
||||
if err != nil {
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
|
||||
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
|
||||
return
|
||||
}
|
||||
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
|
||||
|
||||
srv := grpc.NewServer(serverOpts...)
|
||||
|
||||
c.onShutdown(func() {
|
||||
stopGRPC("FrostFS Public API", srv, c.log)
|
||||
})
|
||||
|
||||
c.cfgGRPC.appendAndHandle(sc.Endpoint(), lis, srv)
|
||||
success = true
|
||||
})
|
||||
if !found {
|
||||
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
|
||||
if tryReconnect(endpoint, c) {
|
||||
return
|
||||
}
|
||||
|
||||
if success {
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
|
||||
return
|
||||
}
|
||||
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", timeout))
|
||||
case <-c.done:
|
||||
return
|
||||
}
|
||||
|
@ -110,6 +75,56 @@ func scheduleReconnect(endpoint string, c *cfg) {
|
|||
}()
|
||||
}
|
||||
|
||||
func tryReconnect(endpoint string, c *cfg) bool {
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
|
||||
|
||||
serverOpts, found := getGRPCEndpointOpts(endpoint, c)
|
||||
if !found {
|
||||
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
|
||||
return true
|
||||
}
|
||||
|
||||
lis, err := net.Listen("tcp", endpoint)
|
||||
if err != nil {
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
|
||||
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
|
||||
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
|
||||
return false
|
||||
}
|
||||
c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
|
||||
|
||||
srv := grpc.NewServer(serverOpts...)
|
||||
|
||||
c.onShutdown(func() {
|
||||
stopGRPC("FrostFS Public API", srv, c.log)
|
||||
})
|
||||
|
||||
c.cfgGRPC.appendAndHandle(endpoint, lis, srv)
|
||||
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
|
||||
return true
|
||||
}
|
||||
|
||||
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
|
||||
unlock := c.LockAppConfigShared()
|
||||
defer unlock()
|
||||
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
|
||||
if found {
|
||||
return
|
||||
}
|
||||
if sc.Endpoint() != endpoint {
|
||||
return
|
||||
}
|
||||
var ok bool
|
||||
result, ok = getGrpcServerOpts(c, sc)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
found = true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
|
||||
serverOpts := []grpc.ServerOption{
|
||||
grpc.MaxRecvMsgSize(maxRecvMsgSize),
|
||||
|
|
Loading…
Reference in a new issue