[#1658] node: Simplify RPC limiter initialization

- Move all initialization logic to one place
- Initialize the limiter after all RPC services are registered to be able
  to validate that configured limits match the methods registered earlier

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2025-02-28 15:07:24 +03:00
parent dae0949f6e
commit 92ab58984b
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639
3 changed files with 22 additions and 28 deletions

View file

@ -29,7 +29,6 @@ import (
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object" objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing" tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
@ -721,7 +720,7 @@ func initCfg(appCfg *config.Config) *cfg {
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly) c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
c.cfgGRPC = initCfgGRPC(appCfg) c.cfgGRPC = initCfgGRPC()
c.cfgMorph = cfgMorph{ c.cfgMorph = cfgMorph{
proxyScriptHash: contractsconfig.Proxy(appCfg), proxyScriptHash: contractsconfig.Proxy(appCfg),
@ -852,21 +851,12 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
} }
} }
func initCfgGRPC(appCfg *config.Config) (cfg cfgGRPC) { func initCfgGRPC() (cfg cfgGRPC) {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
var limits []limiting.KeyLimit
for _, l := range rpcconfig.Limits(appCfg) {
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
}
limiter, err := limiting.NewSemaphoreLimiter(limits)
fatalOnErr(err)
cfg.maxChunkSize = maxChunkSize cfg.maxChunkSize = maxChunkSize
cfg.maxAddrAmount = maxAddrAmount cfg.maxAddrAmount = maxAddrAmount
cfg.limiter.Store(limiter)
return return
} }
@ -1405,26 +1395,11 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }}) components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
} }
components = append(components, dCmp{"rpc_limiter", c.reloadLimits}) components = append(components, dCmp{"rpc_limiter", func() error { return initRPCLimiter(c) }})
return components return components
} }
func (c *cfg) reloadLimits() error {
var limits []limiting.KeyLimit
for _, l := range rpcconfig.Limits(c.appCfg) {
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
}
limiter, err := limiting.NewSemaphoreLimiter(limits)
if err != nil {
return err
}
c.cfgGRPC.limiter.Store(limiter)
return nil
}
func (c *cfg) reloadPools() error { func (c *cfg) reloadPools() error {
newSize := replicatorconfig.PoolSize(c.appCfg) newSize := replicatorconfig.PoolSize(c.appCfg)
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size") c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")

View file

@ -4,10 +4,12 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
"net" "net"
"time" "time"
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc" grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -231,3 +233,18 @@ func stopGRPC(ctx context.Context, name string, s *grpc.Server, l *logger.Logger
l.Info(ctx, logs.FrostFSNodeGRPCServerStoppedSuccessfully) l.Info(ctx, logs.FrostFSNodeGRPCServerStoppedSuccessfully)
} }
func initRPCLimiter(c *cfg) error {
var limits []limiting.KeyLimit
for _, l := range rpcconfig.Limits(c.appCfg) {
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
}
limiter, err := limiting.NewSemaphoreLimiter(limits)
if err != nil {
return fmt.Errorf("create RPC limiter: %w", err)
}
c.cfgGRPC.limiter.Store(limiter)
return nil
}

View file

@ -117,6 +117,8 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(ctx, c, "apemanager", initAPEManagerService) initAndLog(ctx, c, "apemanager", initAPEManagerService)
initAndLog(ctx, c, "control", func(c *cfg) { initControlService(ctx, c) }) initAndLog(ctx, c, "control", func(c *cfg) { initControlService(ctx, c) })
initAndLog(ctx, c, "RPC limiter", func(c *cfg) { fatalOnErr(initRPCLimiter(c)) })
initAndLog(ctx, c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) }) initAndLog(ctx, c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) })
} }