[#835] node: Fix appCfg concurrent access

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-12-11 18:32:02 +03:00
parent f4877e7b42
commit 61da7dca24
2 changed files with 79 additions and 41 deletions

View file

@ -426,11 +426,26 @@ type dynamicConfiguration struct {
metrics *httpComponent
}
type appConfigGuard struct {
mtx sync.RWMutex
}
func (g *appConfigGuard) LockAppConfigShared() func() {
g.mtx.RLock()
return func() { g.mtx.RUnlock() }
}
func (g *appConfigGuard) LockAppConfigExclusive() func() {
g.mtx.Lock()
return func() { g.mtx.Unlock() }
}
type cfg struct {
applicationConfiguration
internals
shared
dynamicConfiguration
appConfigGuard
// configuration of the internal
// services
@ -474,8 +489,9 @@ type cfgGRPC struct {
// handlers must be protected with guard
handlers []func(e string, l net.Listener, s *grpc.Server)
maxChunkSize uint64
maxAddrAmount uint64
maxChunkSize uint64
maxAddrAmount uint64
reconnectTimeout time.Duration
}
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
@ -1204,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
}
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
err := c.readConfig(c.appCfg)
err := c.reloadAppConfig()
if err != nil {
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
return
@ -1271,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
func (c *cfg) reloadAppConfig() error {
unlock := c.LockAppConfigExclusive()
defer unlock()
return c.readConfig(c.appCfg)
}
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)

View file

@ -49,6 +49,7 @@ func initGRPC(c *cfg) {
if successCount == 0 {
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
}
c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
for _, endpoint := range endpointsToReconnect {
scheduleReconnect(endpoint, c)
@ -60,49 +61,13 @@ func scheduleReconnect(endpoint string, c *cfg) {
go func() {
defer c.wg.Done()
timeout := grpcconfig.ReconnectTimeout(c.appCfg)
t := time.NewTicker(timeout)
t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
for {
select {
case <-t.C:
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
var success, found bool
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
if sc.Endpoint() != endpoint {
return
}
found = true
serverOpts, ok := getGrpcServerOpts(c, sc)
if !ok {
return
}
lis, err := net.Listen("tcp", sc.Endpoint())
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
return
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.appendAndHandle(sc.Endpoint(), lis, srv)
success = true
})
if !found {
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
if tryReconnect(endpoint, c) {
return
}
if success {
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
return
}
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", timeout))
case <-c.done:
return
}
@ -110,6 +75,56 @@ func scheduleReconnect(endpoint string, c *cfg) {
}()
}
func tryReconnect(endpoint string, c *cfg) bool {
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
serverOpts, found := getGRPCEndpointOpts(endpoint, c)
if !found {
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
return true
}
lis, err := net.Listen("tcp", endpoint)
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
return false
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.appendAndHandle(endpoint, lis, srv)
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
return true
}
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
unlock := c.LockAppConfigShared()
defer unlock()
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
if found {
return
}
if sc.Endpoint() != endpoint {
return
}
var ok bool
result, ok = getGrpcServerOpts(c, sc)
if !ok {
return
}
found = true
})
return
}
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxRecvMsgSize),