[#1639] node: Support active RPC limiting
All checks were successful
Vulncheck / Vulncheck (push) Successful in 1m10s
Pre-commit hooks / Pre-commit (push) Successful in 1m34s
Build / Build Components (push) Successful in 2m15s
Tests and linters / Tests with -race (push) Successful in 2m32s
Tests and linters / Run gofumpt (push) Successful in 2m33s
Tests and linters / gopls check (push) Successful in 2m37s
Tests and linters / Tests (push) Successful in 2m49s
Tests and linters / Lint (push) Successful in 2m51s
Tests and linters / Staticcheck (push) Successful in 2m57s
OCI image / Build container images (push) Successful in 5m12s
All checks were successful
Vulncheck / Vulncheck (push) Successful in 1m10s
Pre-commit hooks / Pre-commit (push) Successful in 1m34s
Build / Build Components (push) Successful in 2m15s
Tests and linters / Tests with -race (push) Successful in 2m32s
Tests and linters / Run gofumpt (push) Successful in 2m33s
Tests and linters / gopls check (push) Successful in 2m37s
Tests and linters / Tests (push) Successful in 2m49s
Tests and linters / Lint (push) Successful in 2m51s
Tests and linters / Staticcheck (push) Successful in 2m57s
OCI image / Build container images (push) Successful in 5m12s
- Allow configuration of active RPC limits for method groups - Apply RPC limiting for all services except the control service Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
5590886599
commit
dae0949f6e
11 changed files with 217 additions and 5 deletions
|
@ -29,6 +29,7 @@ import (
|
|||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
||||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||
rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
|
||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
|
@ -69,6 +70,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -528,6 +530,8 @@ type cfgGRPC struct {
|
|||
maxChunkSize uint64
|
||||
maxAddrAmount uint64
|
||||
reconnectTimeout time.Duration
|
||||
|
||||
limiter atomic.Pointer[limiting.SemaphoreLimiter]
|
||||
}
|
||||
|
||||
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
||||
|
@ -717,7 +721,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
|||
|
||||
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
|
||||
|
||||
c.cfgGRPC = initCfgGRPC()
|
||||
c.cfgGRPC = initCfgGRPC(appCfg)
|
||||
|
||||
c.cfgMorph = cfgMorph{
|
||||
proxyScriptHash: contractsconfig.Proxy(appCfg),
|
||||
|
@ -848,14 +852,23 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
|
|||
}
|
||||
}
|
||||
|
||||
func initCfgGRPC() cfgGRPC {
|
||||
func initCfgGRPC(appCfg *config.Config) (cfg cfgGRPC) {
|
||||
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
||||
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
|
||||
|
||||
return cfgGRPC{
|
||||
maxChunkSize: maxChunkSize,
|
||||
maxAddrAmount: maxAddrAmount,
|
||||
var limits []limiting.KeyLimit
|
||||
for _, l := range rpcconfig.Limits(appCfg) {
|
||||
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
|
||||
}
|
||||
|
||||
limiter, err := limiting.NewSemaphoreLimiter(limits)
|
||||
fatalOnErr(err)
|
||||
|
||||
cfg.maxChunkSize = maxChunkSize
|
||||
cfg.maxAddrAmount = maxAddrAmount
|
||||
cfg.limiter.Store(limiter)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func initCfgObject(appCfg *config.Config) cfgObject {
|
||||
|
@ -1392,9 +1405,26 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
|
|||
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
||||
}
|
||||
|
||||
components = append(components, dCmp{"rpc_limiter", c.reloadLimits})
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
func (c *cfg) reloadLimits() error {
|
||||
var limits []limiting.KeyLimit
|
||||
for _, l := range rpcconfig.Limits(c.appCfg) {
|
||||
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
|
||||
}
|
||||
|
||||
limiter, err := limiting.NewSemaphoreLimiter(limits)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.cfgGRPC.limiter.Store(limiter)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cfg) reloadPools() error {
|
||||
newSize := replicatorconfig.PoolSize(c.appCfg)
|
||||
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
||||
|
|
43
cmd/frostfs-node/config/rpc/config.go
Normal file
43
cmd/frostfs-node/config/rpc/config.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package rpcconfig
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
)
|
||||
|
||||
const (
|
||||
subsection = "rpc"
|
||||
limitsSubsection = "limits"
|
||||
)
|
||||
|
||||
type LimitConfig struct {
|
||||
Methods []string
|
||||
MaxOps int64
|
||||
}
|
||||
|
||||
// Limits returns the "limits" config from "rpc" section.
|
||||
func Limits(c *config.Config) []LimitConfig {
|
||||
c = c.Sub(subsection).Sub(limitsSubsection)
|
||||
|
||||
var limits []LimitConfig
|
||||
|
||||
for i := uint64(0); ; i++ {
|
||||
si := strconv.FormatUint(i, 10)
|
||||
sc := c.Sub(si)
|
||||
|
||||
methods := config.StringSliceSafe(sc, "methods")
|
||||
if len(methods) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
maxOps := config.IntSafe(sc, "max_ops")
|
||||
if maxOps == 0 {
|
||||
panic("no max operations for method group")
|
||||
}
|
||||
|
||||
limits = append(limits, LimitConfig{methods, maxOps})
|
||||
}
|
||||
|
||||
return limits
|
||||
}
|
53
cmd/frostfs-node/config/rpc/config_test.go
Normal file
53
cmd/frostfs-node/config/rpc/config_test.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package rpcconfig
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRPCSection(t *testing.T) {
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
require.Empty(t, Limits(configtest.EmptyConfig()))
|
||||
})
|
||||
|
||||
t.Run("correct config", func(t *testing.T) {
|
||||
const path = "../../../../config/example/node"
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
limits := Limits(c)
|
||||
require.Len(t, limits, 2)
|
||||
|
||||
limit0 := limits[0]
|
||||
limit1 := limits[1]
|
||||
|
||||
require.ElementsMatch(t, limit0.Methods, []string{"/neo.fs.v2.object.ObjectService/PutSingle", "/neo.fs.v2.object.ObjectService/Put"})
|
||||
require.Equal(t, limit0.MaxOps, int64(1000))
|
||||
|
||||
require.ElementsMatch(t, limit1.Methods, []string{"/neo.fs.v2.object.ObjectService/Get"})
|
||||
require.Equal(t, limit1.MaxOps, int64(10000))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("no max operations", func(t *testing.T) {
|
||||
const path = "testdata/node"
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
require.Panics(t, func() { _ = Limits(c) })
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
})
|
||||
}
|
3
cmd/frostfs-node/config/rpc/testdata/node.env
vendored
Normal file
3
cmd/frostfs-node/config/rpc/testdata/node.env
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
|
||||
FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
|
||||
FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
|
18
cmd/frostfs-node/config/rpc/testdata/node.json
vendored
Normal file
18
cmd/frostfs-node/config/rpc/testdata/node.json
vendored
Normal file
|
@ -0,0 +1,18 @@
|
|||
{
|
||||
"rpc": {
|
||||
"limits": [
|
||||
{
|
||||
"methods": [
|
||||
"/neo.fs.v2.object.ObjectService/PutSingle",
|
||||
"/neo.fs.v2.object.ObjectService/Put"
|
||||
]
|
||||
},
|
||||
{
|
||||
"methods": [
|
||||
"/neo.fs.v2.object.ObjectService/Get"
|
||||
],
|
||||
"max_ops": 10000
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
8
cmd/frostfs-node/config/rpc/testdata/node.yaml
vendored
Normal file
8
cmd/frostfs-node/config/rpc/testdata/node.yaml
vendored
Normal file
|
@ -0,0 +1,8 @@
|
|||
rpc:
|
||||
limits:
|
||||
- methods:
|
||||
- /neo.fs.v2.object.ObjectService/PutSingle
|
||||
- /neo.fs.v2.object.ObjectService/Put
|
||||
- methods:
|
||||
- /neo.fs.v2.object.ObjectService/Get
|
||||
max_ops: 10000
|
|
@ -9,9 +9,11 @@ import (
|
|||
|
||||
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
|
||||
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
@ -134,11 +136,13 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
|
|||
qos.NewUnaryServerInterceptor(),
|
||||
metrics.NewUnaryServerInterceptor(),
|
||||
tracing.NewUnaryServerInterceptor(),
|
||||
qosInternal.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
qos.NewStreamServerInterceptor(),
|
||||
metrics.NewStreamServerInterceptor(),
|
||||
tracing.NewStreamServerInterceptor(),
|
||||
qosInternal.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),
|
||||
),
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue