Reconnect gRPC servers #836
|
@ -426,11 +426,26 @@ type dynamicConfiguration struct {
|
|||
metrics *httpComponent
|
||||
}
|
||||
|
||||
// appConfigGuard serializes access to the application configuration:
// readers (e.g. gRPC reconnect lookups) take the shared lock, while
// SIGHUP-driven config reloads take the exclusive lock.
// The zero value is ready to use.
type appConfigGuard struct {
	mtx sync.RWMutex
}

// LockAppConfigShared acquires the shared (read) lock and returns the
// function that releases it; callers typically defer the returned func.
func (g *appConfigGuard) LockAppConfigShared() func() {
	g.mtx.RLock()
	return g.mtx.RUnlock
}

// LockAppConfigExclusive acquires the exclusive (write) lock and returns
// the function that releases it.
func (g *appConfigGuard) LockAppConfigExclusive() func() {
	g.mtx.Lock()
	return g.mtx.Unlock
}
|
||||
|
||||
type cfg struct {
|
||||
applicationConfiguration
|
||||
internals
|
||||
shared
|
||||
dynamicConfiguration
|
||||
appConfigGuard
|
||||
|
||||
// configuration of the internal
|
||||
// services
|
||||
|
@ -476,6 +491,7 @@ type cfgGRPC struct {
|
|||
|
||||
maxChunkSize uint64
|
||||
maxAddrAmount uint64
|
||||
reconnectTimeout time.Duration
|
||||
}
|
||||
|
||||
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
||||
|
@ -1204,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
}
|
||||
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
|
||||
|
||||
err := c.readConfig(c.appCfg)
|
||||
err := c.reloadAppConfig()
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
|
||||
return
|
||||
|
@ -1271,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||
}
|
||||
|
||||
func (c *cfg) reloadAppConfig() error {
|
||||
unlock := c.LockAppConfigExclusive()
|
||||
defer unlock()
|
||||
|
||||
return c.readConfig(c.appCfg)
|
||||
}
|
||||
|
||||
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||
var tssPrm tsourse.TombstoneSourcePrm
|
||||
tssPrm.SetGetService(c.cfgObject.getSvc)
|
||||
|
|
|
@ -49,6 +49,7 @@ func initGRPC(c *cfg) {
|
|||
if successCount == 0 {
|
||||
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
|
||||
}
|
||||
c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
|
||||
|
||||
for _, endpoint := range endpointsToReconnect {
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
scheduleReconnect(endpoint, c)
|
||||
|
@ -60,29 +61,37 @@ func scheduleReconnect(endpoint string, c *cfg) {
|
|||
go func() {
|
||||
defer c.wg.Done()
|
||||
|
||||
timeout := grpcconfig.ReconnectTimeout(c.appCfg)
|
||||
t := time.NewTicker(timeout)
|
||||
t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
if tryReconnect(endpoint, c) {
|
||||
return
|
||||
}
|
||||
fyrchik
commented
Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?
fyrchik
commented
Also, this can be executed concurrently with SIGHUP — how can we easily see that no data race occurs?
dstepanov-yadro
commented
Well, I used ostrich tactics in this case: as far as I can see, SIGHUP handling is not thread-safe in general right now.
dstepanov-yadro
commented
> Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?

If there is no current endpoint after SIGHUP, then the reconnect goroutine will return. Or did I misunderstand the question?
fyrchik
commented
> as I see SIGHUP handling now is not threadsafe in general

What do you mean? Do we already have places like this (non-thread-safe)? E.g. for the engine it is thread-safe.
dstepanov-yadro
commented
```
func initApp(ctx context.Context, c *cfg) {
c.wg.Add(1)
go func() {
c.signalWatcher(ctx) <-- here app starts to accept signals and it is possible that config will be updated concurrently with initAndLog calls
c.wg.Done()
}()
initAndLog(c, ...)
initAndLog(c, ...)
initAndLog(c, ...)
initAndLog(c, ...)
```
fyrchik
commented
That's true (and it is a bug), but `initApp` is called only during startup, so there is much less probability of receiving SIGHUP there.
With grpc servers constantly relistening we introduce the race under normal operation.
fyrchik
commented
If it looks hard to add now, we can do it in https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/855, but IMO this is a high-priority task.
dstepanov-yadro
commented
Ok, fixed.
acid-ant
commented
We execute `reloadConfig` only in the `control.HealthStatus_READY` state; in other cases we skip reloading. The status becomes READY only after all `initAndLog` calls have completed. Also, we have the special status `HealthStatus_RECONFIGURING`. So if we need to reload gRPC, the first step is to stop the connection and wait for completion of all background tasks. The mutex here looks redundant: this functionality is already implemented on top of `internals.healthStatus *atomic.Int32`.
dstepanov-yadro
commented
Thanks, my statement about SIGHUP thread-safety is incorrect.
But the situation when the node recreates a gRPC server is not RECONFIGURING: the node is healthy. Also, a gRPC reconnect should not have to wait for a full SIGHUP cycle.
The health-status `CompareAndSwap` strategy allows running either SIGHUP or one gRPC reconnect at a time — that is too strict.
The mutex locks only the appConfig reload and allows many goroutines to read the config concurrently.
|
||||
case <-c.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func tryReconnect(endpoint string, c *cfg) bool {
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
|
||||
var success, found bool
|
||||
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
|
||||
if sc.Endpoint() != endpoint {
|
||||
return
|
||||
|
||||
serverOpts, found := getGRPCEndpointOpts(endpoint, c)
|
||||
if !found {
|
||||
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
|
||||
return true
|
||||
}
|
||||
found = true
|
||||
serverOpts, ok := getGrpcServerOpts(c, sc)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
lis, err := net.Listen("tcp", sc.Endpoint())
|
||||
|
||||
lis, err := net.Listen("tcp", endpoint)
|
||||
if err != nil {
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
|
||||
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
|
||||
return
|
||||
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
|
||||
return false
|
||||
}
|
||||
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
|
||||
c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
|
||||
|
||||
srv := grpc.NewServer(serverOpts...)
|
||||
|
||||
|
@ -90,24 +99,30 @@ func scheduleReconnect(endpoint string, c *cfg) {
|
|||
stopGRPC("FrostFS Public API", srv, c.log)
|
||||
})
|
||||
|
||||
c.cfgGRPC.appendAndHandle(sc.Endpoint(), lis, srv)
|
||||
success = true
|
||||
})
|
||||
if !found {
|
||||
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
|
||||
return
|
||||
}
|
||||
c.cfgGRPC.appendAndHandle(endpoint, lis, srv)
|
||||
|
||||
if success {
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
|
||||
return true
|
||||
}
|
||||
|
||||
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
|
||||
unlock := c.LockAppConfigShared()
|
||||
defer unlock()
|
||||
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
|
||||
if found {
|
||||
return
|
||||
}
|
||||
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", timeout))
|
||||
case <-c.done:
|
||||
if sc.Endpoint() != endpoint {
|
||||
return
|
||||
}
|
||||
var ok bool
|
||||
result, ok = getGrpcServerOpts(c, sc)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}()
|
||||
found = true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
|
||||
|
|
Why have you decided to use separate goroutines to handle each reconnection?
Each address tries to reconnect independently so that waiting for one address does not affect the speed of reconnecting to the others.