node: Validate RPC limiter configuration #1658
3 changed files with 58 additions and 28 deletions
|
@ -29,7 +29,6 @@ import (
|
||||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||||
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||||
rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
|
|
||||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
@ -721,7 +720,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
|
|
||||||
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
|
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
|
||||||
|
|
||||||
c.cfgGRPC = initCfgGRPC(appCfg)
|
c.cfgGRPC = initCfgGRPC()
|
||||||
|
|
||||||
c.cfgMorph = cfgMorph{
|
c.cfgMorph = cfgMorph{
|
||||||
proxyScriptHash: contractsconfig.Proxy(appCfg),
|
proxyScriptHash: contractsconfig.Proxy(appCfg),
|
||||||
|
@ -852,21 +851,12 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func initCfgGRPC(appCfg *config.Config) (cfg cfgGRPC) {
|
func initCfgGRPC() (cfg cfgGRPC) {
|
||||||
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
||||||
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
|
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
|
||||||
|
|
||||||
var limits []limiting.KeyLimit
|
|
||||||
for _, l := range rpcconfig.Limits(appCfg) {
|
|
||||||
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
|
|
||||||
}
|
|
||||||
|
|
||||||
limiter, err := limiting.NewSemaphoreLimiter(limits)
|
|
||||||
fatalOnErr(err)
|
|
||||||
|
|
||||||
cfg.maxChunkSize = maxChunkSize
|
cfg.maxChunkSize = maxChunkSize
|
||||||
cfg.maxAddrAmount = maxAddrAmount
|
cfg.maxAddrAmount = maxAddrAmount
|
||||||
cfg.limiter.Store(limiter)
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1405,26 +1395,11 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
|
||||||
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
||||||
}
|
}
|
||||||
|
|
||||||
components = append(components, dCmp{"rpc_limiter", c.reloadLimits})
|
components = append(components, dCmp{"rpc_limiter", func() error { return initRPCLimiter(c) }})
|
||||||
|
|
||||||
return components
|
return components
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) reloadLimits() error {
|
|
||||||
var limits []limiting.KeyLimit
|
|
||||||
for _, l := range rpcconfig.Limits(c.appCfg) {
|
|
||||||
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
|
|
||||||
}
|
|
||||||
|
|
||||||
limiter, err := limiting.NewSemaphoreLimiter(limits)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c.cfgGRPC.limiter.Store(limiter)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cfg) reloadPools() error {
|
func (c *cfg) reloadPools() error {
|
||||||
newSize := replicatorconfig.PoolSize(c.appCfg)
|
newSize := replicatorconfig.PoolSize(c.appCfg)
|
||||||
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
||||||
|
|
|
@ -4,10 +4,12 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
|
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
|
||||||
|
rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
@ -231,3 +233,54 @@ func stopGRPC(ctx context.Context, name string, s *grpc.Server, l *logger.Logger
|
||||||
|
|
||||||
l.Info(ctx, logs.FrostFSNodeGRPCServerStoppedSuccessfully)
|
l.Info(ctx, logs.FrostFSNodeGRPCServerStoppedSuccessfully)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func initRPCLimiter(c *cfg) error {
|
||||||
|
var limits []limiting.KeyLimit
|
||||||
|
for _, l := range rpcconfig.Limits(c.appCfg) {
|
||||||
|
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validateRPCLimits(c, limits); err != nil {
|
||||||
|
return fmt.Errorf("validate RPC limits: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
limiter, err := limiting.NewSemaphoreLimiter(limits)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create RPC limiter: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.cfgGRPC.limiter.Store(limiter)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateRPCLimits(c *cfg, limits []limiting.KeyLimit) error {
|
||||||
|
|||||||
|
availableMethods := getAvailableMethods(c.cfgGRPC.servers)
|
||||||
|
for _, limit := range limits {
|
||||||
|
for _, method := range limit.Keys {
|
||||||
|
if _, ok := availableMethods[method]; !ok {
|
||||||
|
return fmt.Errorf("set limit on an unknown method %q", method)
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`on unknown` -> `on an unknown`
a-savchuk
commented
Fixed Fixed
|
|||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAvailableMethods(servers []grpcServer) map[string]struct{} {
|
||||||
|
res := make(map[string]struct{})
|
||||||
|
for _, server := range servers {
|
||||||
|
for _, method := range getMethodsForServer(server.Server) {
|
||||||
|
res[method] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMethodsForServer(server *grpc.Server) []string {
|
||||||
|
var res []string
|
||||||
|
for service, info := range server.GetServiceInfo() {
|
||||||
|
for _, method := range info.Methods {
|
||||||
|
res = append(res, fmt.Sprintf("/%s/%s", service, method.Name))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
|
@ -117,6 +117,8 @@ func initApp(ctx context.Context, c *cfg) {
|
||||||
initAndLog(ctx, c, "apemanager", initAPEManagerService)
|
initAndLog(ctx, c, "apemanager", initAPEManagerService)
|
||||||
initAndLog(ctx, c, "control", func(c *cfg) { initControlService(ctx, c) })
|
initAndLog(ctx, c, "control", func(c *cfg) { initControlService(ctx, c) })
|
||||||
|
|
||||||
|
initAndLog(ctx, c, "RPC limiter", func(c *cfg) { fatalOnErr(initRPCLimiter(c)) })
|
||||||
|
|
||||||
initAndLog(ctx, c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) })
|
initAndLog(ctx, c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue
A controversial point.
What if I want to disable the tree or control service? In this case, I definitely have to edit the configuration file for the limits.
Does "disable" mean setting the limit to 0? What's the problem then? Could you please give more detail?
As I remember, we also agreed that we don't apply limiting for control service
Disabling tree service is a fine point.
But in that case, editing the configuration is probably warranted.
It is not an operation one would do frequently.