From 61da7dca24162448689c71a14a116c3d7b401b7b Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 11 Dec 2023 18:32:02 +0300 Subject: [PATCH] [#835] node: Fix appCfg concurrent access Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 29 ++++++++++-- cmd/frostfs-node/grpc.go | 91 ++++++++++++++++++++++---------------- 2 files changed, 79 insertions(+), 41 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index e887c2e6c..e6c30376f 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -426,11 +426,26 @@ type dynamicConfiguration struct { metrics *httpComponent } +type appConfigGuard struct { + mtx sync.RWMutex +} + +func (g *appConfigGuard) LockAppConfigShared() func() { + g.mtx.RLock() + return func() { g.mtx.RUnlock() } +} + +func (g *appConfigGuard) LockAppConfigExclusive() func() { + g.mtx.Lock() + return func() { g.mtx.Unlock() } +} + type cfg struct { applicationConfiguration internals shared dynamicConfiguration + appConfigGuard // configuration of the internal // services @@ -474,8 +489,9 @@ type cfgGRPC struct { // handlers must be protected with guard handlers []func(e string, l net.Listener, s *grpc.Server) - maxChunkSize uint64 - maxAddrAmount uint64 + maxChunkSize uint64 + maxAddrAmount uint64 + reconnectTimeout time.Duration } func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) { @@ -1204,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) { } defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY) - err := c.readConfig(c.appCfg) + err := c.reloadAppConfig() if err != nil { c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err)) return @@ -1271,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) { c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) } +func (c *cfg) reloadAppConfig() error { + unlock := c.LockAppConfigExclusive() + defer unlock() + + return c.readConfig(c.appCfg) +} + func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker { var tssPrm tsourse.TombstoneSourcePrm tssPrm.SetGetService(c.cfgObject.getSvc) diff --git a/cmd/frostfs-node/grpc.go b/cmd/frostfs-node/grpc.go index cc78440bf..3a38b2cca 100644 --- a/cmd/frostfs-node/grpc.go +++ b/cmd/frostfs-node/grpc.go @@ -49,6 +49,7 @@ func initGRPC(c *cfg) { if successCount == 0 { fatalOnErr(errors.New("could not listen to any gRPC endpoints")) } + c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg) for _, endpoint := range endpointsToReconnect { scheduleReconnect(endpoint, c) @@ -60,49 +61,13 @@ func scheduleReconnect(endpoint string, c *cfg) { go func() { defer c.wg.Done() - timeout := grpcconfig.ReconnectTimeout(c.appCfg) - t := time.NewTicker(timeout) + t := time.NewTicker(c.cfgGRPC.reconnectTimeout) for { select { case <-t.C: - c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint)) - var success, found bool - grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) { - if sc.Endpoint() != endpoint { - return - } - found = true - serverOpts, ok := getGrpcServerOpts(c, sc) - if !ok { - return - } - lis, err := net.Listen("tcp", sc.Endpoint()) - if err != nil { - c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint()) - c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err)) - return - } - c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint()) - - srv := grpc.NewServer(serverOpts...) - - c.onShutdown(func() { - stopGRPC("FrostFS Public API", srv, c.log) - }) - - c.cfgGRPC.appendAndHandle(sc.Endpoint(), lis, srv) - success = true - }) - if !found { - c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint)) + if tryReconnect(endpoint, c) { return } - - if success { - c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint)) - return - } - c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", timeout)) case <-c.done: return } @@ -110,6 +75,56 @@ func scheduleReconnect(endpoint string, c *cfg) { }() } +func tryReconnect(endpoint string, c *cfg) bool { + c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint)) + + serverOpts, found := getGRPCEndpointOpts(endpoint, c) + if !found { + c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint)) + return true + } + + lis, err := net.Listen("tcp", endpoint) + if err != nil { + c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint) + c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err)) + c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout)) + return false + } + c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint) + + srv := grpc.NewServer(serverOpts...) + + c.onShutdown(func() { + stopGRPC("FrostFS Public API", srv, c.log) + }) + + c.cfgGRPC.appendAndHandle(endpoint, lis, srv) + + c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint)) + return true +} + +func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) { + unlock := c.LockAppConfigShared() + defer unlock() + grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) { + if found { + return + } + if sc.Endpoint() != endpoint { + return + } + var ok bool + result, ok = getGrpcServerOpts(c, sc) + if !ok { + return + } + found = true + }) + return +} + func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) { serverOpts := []grpc.ServerOption{ grpc.MaxRecvMsgSize(maxRecvMsgSize), -- 2.45.2