Reconnect gRPC servers #836

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:feat/grpc_init_lazy into master 2024-09-04 19:51:04 +00:00
2 changed files with 79 additions and 41 deletions

@@ -426,11 +426,26 @@ type dynamicConfiguration struct {
 	metrics *httpComponent
 }
 
+type appConfigGuard struct {
+	mtx sync.RWMutex
+}
+
+func (g *appConfigGuard) LockAppConfigShared() func() {
+	g.mtx.RLock()
+	return func() { g.mtx.RUnlock() }
+}
+
+func (g *appConfigGuard) LockAppConfigExclusive() func() {
+	g.mtx.Lock()
+	return func() { g.mtx.Unlock() }
+}
+
 type cfg struct {
 	applicationConfiguration
 	internals
 	shared
 	dynamicConfiguration
+	appConfigGuard
 
 	// configuration of the internal
 	// services

@@ -474,8 +489,9 @@ type cfgGRPC struct {
 	// handlers must be protected with guard
 	handlers []func(e string, l net.Listener, s *grpc.Server)
 
 	maxChunkSize uint64
 	maxAddrAmount uint64
+	reconnectTimeout time.Duration
 }
 
 func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {

@@ -1204,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
 	}
 	defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
 
-	err := c.readConfig(c.appCfg)
+	err := c.reloadAppConfig()
 	if err != nil {
 		c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
 		return

@@ -1271,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
 	c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
 }
 
+func (c *cfg) reloadAppConfig() error {
+	unlock := c.LockAppConfigExclusive()
+	defer unlock()
+
+	return c.readConfig(c.appCfg)
+}
+
 func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
 	var tssPrm tsourse.TombstoneSourcePrm
 	tssPrm.SetGetService(c.cfgObject.getSvc)

@@ -49,6 +49,7 @@ func initGRPC(c *cfg) {
 	if successCount == 0 {
 		fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
 	}
+	c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
 
 	for _, endpoint := range endpointsToReconnect {
fyrchik marked this conversation as resolved

Why have you decided to use separate goroutines to handle each reconnection?

Each address tries to reconnect independently so that waiting for one address does not affect the speed of reconnecting to the others.
 		scheduleReconnect(endpoint, c)
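
The resolved thread above captures the design: one retry loop per endpoint, so waiting on one endpoint never delays the others. Below is a condensed, self-contained sketch of that pattern; `ScheduleReconnects`, `tryReconnect`, and `done` are illustrative stand-ins, not the PR's exact signatures.

```go
package reconnect

import "time"

// ScheduleReconnects starts one goroutine per endpoint. Each goroutine
// retries on its own ticker and exits either when tryReconnect reports
// that no further retries are needed or when done is closed.
func ScheduleReconnects(
	endpoints []string,
	interval time.Duration,
	tryReconnect func(endpoint string) bool,
	done <-chan struct{},
) {
	for _, endpoint := range endpoints {
		endpoint := endpoint // capture the loop variable for the goroutine
		go func() {
			t := time.NewTicker(interval)
			defer t.Stop()
			for {
				select {
				case <-t.C:
					if tryReconnect(endpoint) {
						return // reconnected, or the endpoint was dropped from config
					}
				case <-done:
					return // application is shutting down
				}
			}
		}()
	}
}
```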
@@ -60,49 +61,13 @@ func scheduleReconnect(endpoint string, c *cfg) {
 	go func() {
 		defer c.wg.Done()
 
-		timeout := grpcconfig.ReconnectTimeout(c.appCfg)
-		t := time.NewTicker(timeout)
+		t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
 		for {
 			select {
 			case <-t.C:
-				c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
-				var success, found bool
-				grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
-					if sc.Endpoint() != endpoint {
-						return
-					}
-					found = true
-					serverOpts, ok := getGrpcServerOpts(c, sc)
-					if !ok {
-						return
-					}
-					lis, err := net.Listen("tcp", sc.Endpoint())
-					if err != nil {
-						c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
-						c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
-						return
-					}
-					c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
-					srv := grpc.NewServer(serverOpts...)
-					c.onShutdown(func() {
-						stopGRPC("FrostFS Public API", srv, c.log)
-					})
-					c.cfgGRPC.appendAndHandle(sc.Endpoint(), lis, srv)
-					success = true
-				})
-				if !found {
-					c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
+				if tryReconnect(endpoint, c) {
 					return
 				}

Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?

Also, this can be executed concurrently with SIGHUP; how can we easily see that no data race occurs?

Well, I used ostrich tactics in this case: as I see it, SIGHUP handling is not thread-safe in general right now.

> Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?

If there is no current endpoint after SIGHUP, the reconnect goroutine will return. Or did I misunderstand the question?

> as I see it, SIGHUP handling is not thread-safe in general right now

What do you mean? Do we already have places like this (non-thread-safe)? E.g. for the engine it is thread-safe.
```go
func initApp(ctx context.Context, c *cfg) {
	c.wg.Add(1)
	go func() {
		c.signalWatcher(ctx) // <-- here the app starts to accept signals, so the config may be updated concurrently with the initAndLog calls below
		c.wg.Done()
	}()

	initAndLog(c, ...)
	initAndLog(c, ...)
	initAndLog(c, ...)
	initAndLog(c, ...)
```

That's true (and it is a bug), but `initApp` is called only during startup, so the probability of receiving SIGHUP there is much lower.
With gRPC servers constantly relistening, we introduce the race under normal operation.

If it looks hard to add now, we can do it in #855 (https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/855), but IMO this is a high-priority task.

Ok, fixed

We execute `reloadConfig` only in the control.HealthStatus_READY state; in other cases reloading is skipped. The status becomes READY only after all `initAndLog` calls have completed. Also, we have a special HealthStatus_RECONFIGURING status. So if we need to reload gRPC, the first step is to stop the connections and wait for all background tasks to complete. The mutex here looks redundant: this functionality is already implemented on top of `internals.healthStatus *atomic.Int32`.

Thanks, my statement about SIGHUP thread safety was incorrect.

But the situation where the node recreates a gRPC server is not RECONFIGURING: the node is healthy. Also, a gRPC reconnect should not wait for a full SIGHUP reload.

The health status `CompareAndSwap` strategy allows either SIGHUP or one gRPC reconnect to run. It is too strict.

The mutex locks only the appConfig reload and allows many goroutines to read it.
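
To make the locking trade-off above concrete, here is a minimal self-contained sketch of how an RWMutex guard serves the two paths; `app`, `readEndpoints`, and `reloadConfig` below are illustrative stand-ins, not the PR's exact types. Reconnect goroutines read the config under the shared lock, while the SIGHUP handler swaps it under the exclusive lock only for the duration of the re-read.

```go
package config

import "sync"

// guard mirrors the appConfigGuard idea from the diff above: many readers
// (gRPC reconnect goroutines) may hold the shared lock concurrently, while
// a SIGHUP-triggered reload takes the exclusive lock.
type guard struct {
	mtx sync.RWMutex
}

func (g *guard) lockShared() func()    { g.mtx.RLock(); return g.mtx.RUnlock }
func (g *guard) lockExclusive() func() { g.mtx.Lock(); return g.mtx.Unlock }

type app struct {
	guard
	endpoints []string // stand-in for the real application configuration
}

// readEndpoints is what a reconnect goroutine would do: read the config
// under the shared lock, without blocking other readers.
func (a *app) readEndpoints() []string {
	unlock := a.lockShared()
	defer unlock()
	return append([]string(nil), a.endpoints...)
}

// reloadConfig is what the SIGHUP handler would do: replace the config
// under the exclusive lock, blocking readers only while it runs.
func (a *app) reloadConfig(newEndpoints []string) {
	unlock := a.lockExclusive()
	defer unlock()
	a.endpoints = newEndpoints
}
```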
-				if success {
-					c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
-					return
-				}
-				c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", timeout))
 			case <-c.done:
 				return
 			}

@@ -110,6 +75,56 @@ func scheduleReconnect(endpoint string, c *cfg) {
 	}()
 }
 
+func tryReconnect(endpoint string, c *cfg) bool {
+	c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
+
+	serverOpts, found := getGRPCEndpointOpts(endpoint, c)
+	if !found {
+		c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
+		return true
+	}
+
+	lis, err := net.Listen("tcp", endpoint)
+	if err != nil {
+		c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
+		c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
+		c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
+		return false
+	}
+	c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
+
+	srv := grpc.NewServer(serverOpts...)
+	c.onShutdown(func() {
+		stopGRPC("FrostFS Public API", srv, c.log)
+	})
+	c.cfgGRPC.appendAndHandle(endpoint, lis, srv)
+
+	c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
+	return true
+}
+
+func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
+	unlock := c.LockAppConfigShared()
+	defer unlock()
+
+	grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
+		if found {
+			return
+		}
+		if sc.Endpoint() != endpoint {
+			return
+		}
+		var ok bool
+		result, ok = getGrpcServerOpts(c, sc)
+		if !ok {
+			return
+		}
+		found = true
+	})
+	return
+}
+
 func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
 	serverOpts := []grpc.ServerOption{
 		grpc.MaxRecvMsgSize(maxRecvMsgSize),