Support active RPC limiting #1639
@@ -29,6 +29,7 @@ import (
 	nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
 	objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
 	replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
+	rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
 	tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"

@@ -69,6 +70,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
 	netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"

@@ -528,6 +530,8 @@ type cfgGRPC struct {
 	maxChunkSize     uint64
 	maxAddrAmount    uint64
 	reconnectTimeout time.Duration
+
+	limiter atomic.Pointer[limiting.SemaphoreLimiter]
 }

 func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {

@@ -664,10 +668,6 @@ type cfgAccessPolicyEngine struct {
 }

 type cfgObjectRoutines struct {
-	putRemote *ants.Pool
-
-	putLocal *ants.Pool
-
 	replication *ants.Pool
 }

@@ -721,7 +721,7 @@ func initCfg(appCfg *config.Config) *cfg {
 	c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)

-	c.cfgGRPC = initCfgGRPC()
+	c.cfgGRPC = initCfgGRPC(appCfg)

 	c.cfgMorph = cfgMorph{
 		proxyScriptHash: contractsconfig.Proxy(appCfg),

@@ -852,14 +852,23 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
 	}
 }

-func initCfgGRPC() cfgGRPC {
+func initCfgGRPC(appCfg *config.Config) (cfg cfgGRPC) {
 	maxChunkSize := uint64(maxMsgSize) * 3 / 4  // 25% to meta, 75% to payload
 	maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes

-	return cfgGRPC{
-		maxChunkSize:  maxChunkSize,
-		maxAddrAmount: maxAddrAmount,
+	var limits []limiting.KeyLimit
+	for _, l := range rpcconfig.Limits(appCfg) {
+		limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
 	}
+
+	limiter, err := limiting.NewSemaphoreLimiter(limits)
+	fatalOnErr(err)
+
+	cfg.maxChunkSize = maxChunkSize
+	cfg.maxAddrAmount = maxAddrAmount
+	cfg.limiter.Store(limiter)
+
+	return
 }

 func initCfgObject(appCfg *config.Config) cfgObject {

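To make the limiter's contract concrete, here is a minimal sketch of how the semaphore limiter built above behaves, based only on the `KeyLimit`/`NewSemaphoreLimiter`/`Acquire` usage visible in this PR (behavior for methods with no configured limit is not shown in the PR and is left out here):

```go
package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
)

func main() {
	// Both Put methods share one semaphore capped at 2 active calls.
	limiter, err := limiting.NewSemaphoreLimiter([]limiting.KeyLimit{{
		Keys:  []string{"/neo.fs.v2.object.ObjectService/Put", "/neo.fs.v2.object.ObjectService/PutSingle"},
		Limit: 2,
	}})
	if err != nil {
		panic(err) // e.g. a duplicate key across limit groups
	}

	// Acquire takes a slot for the given method; the returned func gives it back.
	release, ok := limiter.Acquire("/neo.fs.v2.object.ObjectService/Put")
	if !ok {
		fmt.Println("limit reached: the interceptors below answer with ResourceExhausted")
		return
	}
	defer release()

	fmt.Println("slot held while the RPC is being served")
}
```
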
@@ -1166,21 +1175,7 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) {
 func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
 	var err error

-	optNonBlocking := ants.WithNonblocking(true)
-
-	putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
-	pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
-	fatalOnErr(err)
-
-	putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
-	pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
-	fatalOnErr(err)
-
 	replicatorPoolSize := replicatorconfig.PoolSize(cfg)
-	if replicatorPoolSize <= 0 {
-		replicatorPoolSize = putRemoteCapacity
-	}

 	pool.replication, err = ants.NewPool(replicatorPoolSize)
 	fatalOnErr(err)

@@ -1410,17 +1405,28 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
 		components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
 	}

+	components = append(components, dCmp{"rpc_limiter", c.reloadLimits})
+
 	return components
 }

+func (c *cfg) reloadLimits() error {
+	var limits []limiting.KeyLimit
+	for _, l := range rpcconfig.Limits(c.appCfg) {
+		limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
+	}
+
+	limiter, err := limiting.NewSemaphoreLimiter(limits)
+	if err != nil {
+		return err
+	}
+
+	c.cfgGRPC.limiter.Store(limiter)
+	return nil
+}
+
 func (c *cfg) reloadPools() error {
-	newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
-	c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
-
-	newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
-	c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
-
-	newSize = replicatorconfig.PoolSize(c.appCfg)
+	newSize := replicatorconfig.PoolSize(c.appCfg)
 	c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")

 	return nil

@@ -21,10 +21,6 @@ const (
 	putSubsection = "put"
 	getSubsection = "get"
-
-	// PutPoolSizeDefault is a default value of routine pool size to
-	// process object.Put requests in object service.
-	PutPoolSizeDefault = 10
 )

 // Put returns structure that provides access to "put" subsection of

@@ -35,30 +31,6 @@ func Put(c *config.Config) PutConfig {
 	}
 }

-// PoolSizeRemote returns the value of "remote_pool_size" config parameter.
-//
-// Returns PutPoolSizeDefault if the value is not a positive number.
-func (g PutConfig) PoolSizeRemote() int {
-	v := config.Int(g.cfg, "remote_pool_size")
-	if v > 0 {
-		return int(v)
-	}
-
-	return PutPoolSizeDefault
-}
-
-// PoolSizeLocal returns the value of "local_pool_size" config parameter.
-//
-// Returns PutPoolSizeDefault if the value is not a positive number.
-func (g PutConfig) PoolSizeLocal() int {
-	v := config.Int(g.cfg, "local_pool_size")
-	if v > 0 {
-		return int(v)
-	}
-
-	return PutPoolSizeDefault
-}
-
 // SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `false` if is not defined.
 func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
 	return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")

@@ -13,8 +13,6 @@ func TestObjectSection(t *testing.T) {
 	t.Run("defaults", func(t *testing.T) {
 		empty := configtest.EmptyConfig()

-		require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
-		require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal())
 		require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
 		require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification())
 	})

@@ -22,8 +20,6 @@ func TestObjectSection(t *testing.T) {
 	const path = "../../../../config/example/node"

 	fileConfigTest := func(c *config.Config) {
-		require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
-		require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal())
 		require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
 		require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification())
 	}

@@ -11,6 +11,8 @@ const (
 	// PutTimeoutDefault is a default timeout of object put request in replicator.
 	PutTimeoutDefault = 5 * time.Second
+	// PoolSizeDefault is a default pool size for put request in replicator.
+	PoolSizeDefault = 10
 )

 // PutTimeout returns the value of "put_timeout" config parameter

@@ -28,6 +30,13 @@ func PutTimeout(c *config.Config) time.Duration {

 // PoolSize returns the value of "pool_size" config parameter
 // from "replicator" section.
+//
+// Returns PoolSizeDefault if the value is a non-positive integer.

[review thread — resolved]
aarifullin: `not positive` -> `non-positive` :)
a-savchuk: Fixed

 func PoolSize(c *config.Config) int {
-	return int(config.IntSafe(c.Sub(subsection), "pool_size"))
+	v := int(config.IntSafe(c.Sub(subsection), "pool_size"))
+	if v > 0 {
+		return v
+	}
+
+	return PoolSizeDefault
 }

@@ -15,7 +15,7 @@ func TestReplicatorSection(t *testing.T) {
 	empty := configtest.EmptyConfig()

 	require.Equal(t, replicatorconfig.PutTimeoutDefault, replicatorconfig.PutTimeout(empty))
-	require.Equal(t, 0, replicatorconfig.PoolSize(empty))
+	require.Equal(t, replicatorconfig.PoolSizeDefault, replicatorconfig.PoolSize(empty))
 })

 const path = "../../../../config/example/node"

cmd/frostfs-node/config/rpc/config.go (new file)
@@ -0,0 +1,43 @@
+package rpcconfig
+
+import (
+	"strconv"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
+)
+
+const (
+	subsection       = "rpc"
+	limitsSubsection = "limits"
+)
+
+type LimitConfig struct {
+	Methods []string
+	MaxOps  int64
+}
+
+// Limits returns the "limits" config from "rpc" section.
+func Limits(c *config.Config) []LimitConfig {
+	c = c.Sub(subsection).Sub(limitsSubsection)
+
+	var limits []LimitConfig
+
+	for i := uint64(0); ; i++ {

[review thread — resolved]
aarifullin: Optional change request: how about `for i := uint64(0); ; i++`, as we don't need this variable outside?
a-savchuk: Done

+		si := strconv.FormatUint(i, 10)
+		sc := c.Sub(si)
+
+		methods := config.StringSliceSafe(sc, "methods")
+		if len(methods) == 0 {
+			break
+		}
+
+		maxOps := config.IntSafe(sc, "max_ops")
+		if maxOps == 0 {
+			panic("no max operations for method group")
+		}
+
+		limits = append(limits, LimitConfig{methods, maxOps})
+	}
+
+	return limits
+}

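The termination rule here is worth noting: `Limits` scans numbered subsections `0`, `1`, `2`, ... and stops at the first index whose `methods` list is empty. A toy, self-contained model of that scan (a sketch with a stand-in map instead of the real config tree, not code from this PR):

```go
package main

import (
	"fmt"
	"strconv"
)

// Stand-in for the config tree: keys like "0.methods", "1.methods".
var flat = map[string][]string{
	"0.methods": {"/neo.fs.v2.object.ObjectService/PutSingle", "/neo.fs.v2.object.ObjectService/Put"},
	"1.methods": {"/neo.fs.v2.object.ObjectService/Get"},
	// no "2.methods": the scan stops there
}

func main() {
	for i := uint64(0); ; i++ {
		methods := flat[strconv.FormatUint(i, 10)+".methods"]
		if len(methods) == 0 {
			break // first gap terminates the scan, same as in Limits
		}
		fmt.Println(i, methods)
	}
}
```
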
cmd/frostfs-node/config/rpc/config_test.go (new file)
@@ -0,0 +1,53 @@
+package rpcconfig
+
+import (
+	"testing"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
+	configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRPCSection(t *testing.T) {
+	t.Run("defaults", func(t *testing.T) {
+		require.Empty(t, Limits(configtest.EmptyConfig()))
+	})
+
+	t.Run("correct config", func(t *testing.T) {
+		const path = "../../../../config/example/node"
+
+		fileConfigTest := func(c *config.Config) {
+			limits := Limits(c)
+			require.Len(t, limits, 2)
+
+			limit0 := limits[0]
+			limit1 := limits[1]
+
+			require.ElementsMatch(t, limit0.Methods, []string{"/neo.fs.v2.object.ObjectService/PutSingle", "/neo.fs.v2.object.ObjectService/Put"})
+			require.Equal(t, limit0.MaxOps, int64(1000))
+
+			require.ElementsMatch(t, limit1.Methods, []string{"/neo.fs.v2.object.ObjectService/Get"})
+			require.Equal(t, limit1.MaxOps, int64(10000))
+		}
+
+		configtest.ForEachFileType(path, fileConfigTest)
+
+		t.Run("ENV", func(t *testing.T) {
+			configtest.ForEnvFileType(t, path, fileConfigTest)
+		})
+	})
+
+	t.Run("no max operations", func(t *testing.T) {
+		const path = "testdata/node"
+
+		fileConfigTest := func(c *config.Config) {
+			require.Panics(t, func() { _ = Limits(c) })
+		}
+
+		configtest.ForEachFileType(path, fileConfigTest)
+
+		t.Run("ENV", func(t *testing.T) {
+			configtest.ForEnvFileType(t, path, fileConfigTest)
+		})
+	})
+}

cmd/frostfs-node/config/rpc/testdata/node.env (new file)
@@ -0,0 +1,3 @@
+FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
+FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
+FROSTFS_RPC_LIMITS_1_MAX_OPS=10000

cmd/frostfs-node/config/rpc/testdata/node.json (new file)
@@ -0,0 +1,18 @@
+{
+  "rpc": {
+    "limits": [
+      {
+        "methods": [
+          "/neo.fs.v2.object.ObjectService/PutSingle",
+          "/neo.fs.v2.object.ObjectService/Put"
+        ]
+      },
+      {
+        "methods": [
+          "/neo.fs.v2.object.ObjectService/Get"
+        ],
+        "max_ops": 10000
+      }
+    ]
+  }
+}

cmd/frostfs-node/config/rpc/testdata/node.yaml (new file)
@@ -0,0 +1,8 @@
+rpc:
+  limits:
+    - methods:
+        - /neo.fs.v2.object.ObjectService/PutSingle
+        - /neo.fs.v2.object.ObjectService/Put
+    - methods:
+        - /neo.fs.v2.object.ObjectService/Get
+      max_ops: 10000

(Note that in all three testdata files the first `limits` group deliberately omits `max_ops`; the "no max operations" test case above relies on this to trigger the panic.)

@@ -9,9 +9,11 @@ import (
 	grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
+	qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
 	tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
 	qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"

@@ -134,11 +136,13 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
 			qos.NewUnaryServerInterceptor(),
 			metrics.NewUnaryServerInterceptor(),
 			tracing.NewUnaryServerInterceptor(),
+			qosInternal.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),

[review thread — resolved]
fyrchik: We could use `c.cfgGRPC.limiter.Load` instead of `func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }`. Was your choice deliberate?
a-savchuk: `NewMaxActiveRPCLimiterUnaryServerInterceptor` receives the `Limiter` interface, but `c.cfgGRPC.limiter.Load` returns the `*SemaphoreLimiter`:
```
cannot use c.cfgGRPC.limiter.Load (value of type func() *limiting.SemaphoreLimiter) as func() limiting.Limiter value in argument to qosInternal.NewMaxActiveRPCLimiterStreamServerInterceptor
```

 		),
 		grpc.ChainStreamInterceptor(
 			qos.NewStreamServerInterceptor(),
 			metrics.NewStreamServerInterceptor(),
 			tracing.NewStreamServerInterceptor(),
+			qosInternal.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),
 		),
 	}

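The compile error quoted in the thread comes from Go function types not being covariant in their results. A minimal standalone repro, with simplified stand-in types rather than the real `limiting` package:

```go
package main

import "sync/atomic"

type Limiter interface {
	Acquire(key string) (release func(), ok bool)
}

type SemaphoreLimiter struct{}

func (s *SemaphoreLimiter) Acquire(key string) (func(), bool) { return func() {}, true }

func main() {
	var p atomic.Pointer[SemaphoreLimiter]
	p.Store(&SemaphoreLimiter{})

	// Does not compile: p.Load has type func() *SemaphoreLimiter, and Go will
	// not convert it to func() Limiter even though *SemaphoreLimiter
	// implements Limiter.
	// var get func() Limiter = p.Load

	// The wrapper closure performs the interface conversion on each call.
	get := func() Limiter { return p.Load() }
	_ = get
}
```
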
@@ -326,7 +326,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
 		c,
 		c.cfgNetmap.state,
 		irFetcher,
-		objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
 		objectwriter.WithLogger(c.log),
 		objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
 	)

@@ -87,12 +87,15 @@ FROSTFS_REPLICATOR_POOL_SIZE=10
 FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500

 # Object service section
-FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
-FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
 FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"

+FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
+FROSTFS_RPC_LIMITS_0_MAX_OPS=1000
+FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
+FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
+
 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100

@@ -134,14 +134,29 @@
       "tombstone_lifetime": 10
     },
     "put": {
-      "remote_pool_size": 100,
-      "local_pool_size": 200,
       "skip_session_token_issuer_verification": true
     },
     "get": {
       "priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
     }
   },
+  "rpc": {
+    "limits": [
+      {
+        "methods": [
+          "/neo.fs.v2.object.ObjectService/PutSingle",
+          "/neo.fs.v2.object.ObjectService/Put"
+        ],
+        "max_ops": 1000
+      },
+      {
+        "methods": [
+          "/neo.fs.v2.object.ObjectService/Get"
+        ],
+        "max_ops": 10000
+      }
+    ]
+  },
   "storage": {
     "shard_pool_size": 15,
     "shard_ro_error_threshold": 100,

@@ -117,14 +117,22 @@ object:
   delete:
     tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
   put:
-    remote_pool_size: 100 # number of async workers for remote PUT operations
-    local_pool_size: 200 # number of async workers for local PUT operations
     skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
   get:
     priority: # list of metrics of nodes for prioritization
       - $attribute:ClusterName
      - $attribute:UN-LOCODE

+rpc:
+  limits:
+    - methods:
+        - /neo.fs.v2.object.ObjectService/PutSingle
+        - /neo.fs.v2.object.ObjectService/Put
+      max_ops: 1000
+    - methods:
+        - /neo.fs.v2.object.ObjectService/Get
+      max_ops: 10000
+
 storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
   shard_pool_size: 15 # size of per-shard worker pools used for PUT operations

@@ -396,18 +396,16 @@ replicator:
   pool_size: 10
 ```

-| Parameter     | Type       | Default value                          | Description                                  |
-|---------------|------------|----------------------------------------|----------------------------------------------|
-| `put_timeout` | `duration` | `5s`                                   | Timeout for performing the `PUT` operation.  |
-| `pool_size`   | `int`      | Equal to `object.put.remote_pool_size` | Maximum amount of concurrent replications.   |
+| Parameter     | Type       | Default value | Description                                  |
+|---------------|------------|---------------|----------------------------------------------|
+| `put_timeout` | `duration` | `5s`          | Timeout for performing the `PUT` operation.  |
+| `pool_size`   | `int`      | `10`          | Maximum amount of concurrent replications.   |

 # `object` section
 Contains object-service related parameters.

 ```yaml
 object:
   put:
     remote_pool_size: 100
   get:
     priority:
       - $attribute:ClusterName

|
|||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------|
|
||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. |
|
||||
|
||||
|
||||
# `rpc` section
|
||||
Contains limits on the number of active RPC for specified method(s).
|
||||
|
||||
```yaml
|
||||
rpc:
|
||||
limits:
|
||||
- methods:
|
||||
- /neo.fs.v2.object.ObjectService/PutSingle
|
||||
- /neo.fs.v2.object.ObjectService/Put
|
||||
max_ops: 1000
|
||||
- methods:
|
||||
- /neo.fs.v2.object.ObjectService/Get
|
||||
max_ops: 10000
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|------------------|------------|---------------|--------------------------------------------------------------|
|
||||
| `limits.max_ops` | `int` | | Maximum number of active RPC allowed for the given method(s) |
|
||||
| `limits.methods` | `[]string` | | List of RPC methods sharing the given limit |
|
||||
|
||||
# `runtime` section
|
||||
Contains runtime parameters.
|
||||
|
||||
|
|
go.mod
@@ -8,7 +8,7 @@ require (
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
 	git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
 	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
-	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
+	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf
 	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972

go.sum
@@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=

@@ -125,7 +125,6 @@ const (
 	SearchCouldNotWriteObjectIdentifiers = "could not write object identifiers"
 	SearchLocalOperationFailed = "local operation failed"
 	UtilObjectServiceError = "object service error"
-	UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool"
 	V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
 	V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
 	ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"

@@ -3,7 +3,9 @@ package qos
 import (
 	"context"

+	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
 	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"google.golang.org/grpc"
 )

@@ -49,3 +51,36 @@ func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientIntercepto
 		return streamer(ctx, desc, cc, method, opts...)
 	}
 }
+
+func NewMaxActiveRPCLimiterUnaryServerInterceptor(getLimiter func() limiting.Limiter) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
+		if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() {
+			return handler(ctx, req)
+		}
+
+		release, ok := getLimiter().Acquire(info.FullMethod)
+		if !ok {
+			return nil, new(apistatus.ResourceExhausted)
+		}
+		defer release()
+
+		return handler(ctx, req)
+	}
+}
+
+//nolint:contextcheck (grpc.ServerStream manages the context itself)

[review thread — resolved]
fyrchik: `false positive` implies a bug in the linter. Here we have no bug in the linter, just a cumbersome gRPC API.
a-savchuk: Made the message more clear

+func NewMaxActiveRPCLimiterStreamServerInterceptor(getLimiter func() limiting.Limiter) grpc.StreamServerInterceptor {

[review comment]
fyrchik: Refs #1656.

+	return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() {
+			return handler(srv, ss)
+		}
+
+		release, ok := getLimiter().Acquire(info.FullMethod)
+		if !ok {
+			return new(apistatus.ResourceExhausted)
+		}
+		defer release()

[review thread — resolved]
dstepanov-yadro: The current behavior is as follows: the counter increases, the handler is called, the counter decreases, `ss grpc.ServerStream` will continue to work. You need to make a wrapper over `ss grpc.ServerStream` that will decrease the counter when the stream is closed.
a-savchuk: Are you sure? I always thought an interceptor's handler is called only once. I don't need to track each message, so I can reduce the counter on the handler exit, can't I?
dstepanov-yadro: Now I'm not sure :) So if you checked and it works as expected, then it is great!
a-savchuk: I'll check it one more time
a-savchuk: I think the handler calls `RecvMsg` and other stream methods itself, so if I don't need to process each stream message, I can simply call the handler.
- handler invocation from the source code:
  https://github.com/grpc/grpc-go/blob/65c6718afb5d5d41a277f4b3c6439ac80bb38292/server.go#L1570-L1579
  https://github.com/grpc/grpc-go/blob/65c6718afb5d5d41a277f4b3c6439ac80bb38292/server.go#L1694-L1703
- server interceptor example:
  https://github.com/grpc/grpc-go/blob/65c6718afb5d5d41a277f4b3c6439ac80bb38292/examples/features/interceptor/server/main.go

+
+		return handler(srv, ss)
+	}
+}

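To back the conclusion reached in the thread above — the stream handler returns only once the whole stream has been served, so releasing in a `defer` is enough — here is a small ordering check written as a hypothetical test (not part of the PR):

```go
package qos

import (
	"testing"

	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
)

func TestStreamInterceptorReleaseOrder(t *testing.T) {
	var order []string

	interceptor := func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		order = append(order, "acquire")
		defer func() { order = append(order, "release") }()
		return handler(srv, ss) // blocks until the whole stream is served
	}

	// The handler stands in for gRPC driving RecvMsg/SendMsg itself.
	handler := func(srv any, ss grpc.ServerStream) error {
		order = append(order, "handle stream")
		return nil
	}

	require.NoError(t, interceptor(nil, nil, &grpc.StreamServerInfo{}, handler))
	require.Equal(t, []string{"acquire", "handle stream", "release"}, order)
}
```
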
@@ -79,11 +79,11 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.
 			continue
 		}

-		workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey())
+		isLocal := n.cfg.NetmapKeys.IsLocalKey(addr.PublicKey())

 		item := new(bool)
 		wg.Add(1)
-		if err := workerPool.Submit(func() {
+		go func() {
 			defer wg.Done()

 			err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})

@@ -95,11 +95,7 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.

 			traverser.SubmitSuccess()
 			*item = true
-		}); err != nil {
-			wg.Done()
-			svcutil.LogWorkerPoolError(ctx, n.cfg.Logger, "PUT", err)
-			return true
-		}
+		}()

 		// Mark the container node as processed in order to exclude it
 		// in subsequent container broadcast. Note that we don't

@@ -149,17 +149,7 @@ func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
 		return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
 	}

-	completed := make(chan interface{})
-	if poolErr := e.Config.RemotePool.Submit(func() {
-		defer close(completed)
-		err = e.Relay(ctx, info, c)
-	}); poolErr != nil {
-		close(completed)
-		svcutil.LogWorkerPoolError(ctx, e.Config.Logger, "PUT", poolErr)
-		return poolErr
-	}
-	<-completed
-
+	err = e.Relay(ctx, info, c)
 	if err == nil {
 		return nil
 	}

@@ -343,21 +333,11 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n
 }

 func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
-	var err error
 	localTarget := LocalTarget{
 		Storage:   e.Config.LocalStore,
 		Container: e.Container,
 	}
-	completed := make(chan interface{})
-	if poolErr := e.Config.LocalPool.Submit(func() {
-		defer close(completed)
-		err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
-	}); poolErr != nil {
-		close(completed)
-		return poolErr
-	}
-	<-completed
-	return err
+	return localTarget.WriteObject(ctx, obj, e.ObjectMeta)
 }

 func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {

@@ -371,15 +351,5 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
 		nodeInfo: clientNodeInfo,
 	}

-	var err error
-	completed := make(chan interface{})
-	if poolErr := e.Config.RemotePool.Submit(func() {
-		defer close(completed)
-		err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
-	}); poolErr != nil {
-		close(completed)
-		return poolErr
-	}
-	<-completed
-	return err
+	return remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
 }

@@ -31,7 +31,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
 	"git.frostfs.info/TrueCloudLab/tzhash/tz"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
-	"github.com/panjf2000/ants/v2"
 	"github.com/stretchr/testify/require"
 )

@@ -131,9 +130,6 @@ func TestECWriter(t *testing.T) {
 	nodeKey, err := keys.NewPrivateKey()
 	require.NoError(t, err)

-	pool, err := ants.NewPool(4, ants.WithNonblocking(true))
-	require.NoError(t, err)
-
 	log, err := logger.NewLogger(nil)
 	require.NoError(t, err)

@@ -141,7 +137,6 @@ func TestECWriter(t *testing.T) {
 	ecw := ECWriter{
 		Config: &Config{
 			NetmapKeys:        n,
-			RemotePool:        pool,
 			Logger:            log,
 			ClientConstructor: clientConstructor{vectors: ns},
 			KeyStorage:        util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),

@@ -12,7 +12,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
 	objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"

@@ -52,8 +51,6 @@ type Config struct {
 	NetmapSource netmap.Source

-	RemotePool, LocalPool util.WorkerPool
-
 	NetmapKeys netmap.AnnouncedKeys

 	FormatValidator *object.FormatValidator

@@ -69,12 +66,6 @@ type Config struct {
 type Option func(*Config)

-func WithWorkerPools(remote, local util.WorkerPool) Option {
-	return func(c *Config) {
-		c.RemotePool, c.LocalPool = remote, local
-	}
-}
-
 func WithLogger(l *logger.Logger) Option {
 	return func(c *Config) {
 		c.Logger = l

@@ -87,13 +78,6 @@ func WithVerifySessionTokenIssuer(v bool) Option {
 	}
 }

-func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
-	if c.NetmapKeys.IsLocalKey(pub) {
-		return c.LocalPool, true
-	}
-	return c.RemotePool, false
-}
-
 type Params struct {
 	Config *Config

@@ -6,7 +6,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
 	objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	"go.uber.org/zap"
 )

@@ -27,8 +26,6 @@ func NewService(ks *objutil.KeyStorage,
 	opts ...objectwriter.Option,
 ) *Service {
 	c := &objectwriter.Config{
-		RemotePool:        util.NewPseudoWorkerPool(),
-		LocalPool:         util.NewPseudoWorkerPool(),
 		Logger:            logger.NewLoggerWrapper(zap.L()),
 		KeyStorage:        ks,
 		ClientConstructor: cc,

@@ -17,11 +17,3 @@ func LogServiceError(ctx context.Context, l *logger.Logger, req string, node net
 		zap.Error(err),
 	)
 }
-
-// LogWorkerPoolError writes debug error message of object worker pool to provided logger.
-func LogWorkerPoolError(ctx context.Context, l *logger.Logger, req string, err error) {
-	l.Error(ctx, logs.UtilCouldNotPushTaskToWorkerPool,

[review thread — resolved]
dstepanov-yadro: Also drop `logs.UtilCouldNotPushTaskToWorkerPool`
a-savchuk: Done

-		zap.String("request", req),
-		zap.Error(err),
-	)
-}

[discussion]
We currently do not check for `Keys` uniqueness inside the `limits` array. This may be either a feature or a misconfiguration. I don't think we need such a feature; at the very least, the last item should take priority, and without wildcards such a feature has very limited use. So, what about checking for uniqueness?

Also, there may be typos, and proto names are not short. Do we have any solution for this? `c.cfgGRPC.servers[0].Server.GetServiceInfo()` could be of use. As an example of where this will be useful -- a future transition from the `neo.fs` to the `frostfs` namespace.

The limiter already does this check:

```go
func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error {
	for _, key := range limit.Keys {
		if _, exists := lr.m[key]; exists {
			return fmt.Errorf("duplicate key %q", key)
		}
		lr.m[key] = sem
	}
	return nil
}
```

I'll add it.
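For the typo concern, one possible shape of the check, using `grpc.Server.GetServiceInfo()` as suggested above (a hedged sketch; `validateRPCLimits` is a hypothetical helper, not code from this PR):

```go
package main

import (
	"fmt"

	rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
	"google.golang.org/grpc"
)

// validateRPCLimits cross-checks every configured method against the methods
// actually registered on the server. Full method names have the form
// "/<package.Service>/<Method>", matching the keys used by the interceptors.
func validateRPCLimits(srv *grpc.Server, limits []rpcconfig.LimitConfig) error {
	known := make(map[string]struct{})
	for svc, info := range srv.GetServiceInfo() {
		for _, m := range info.Methods {
			known["/"+svc+"/"+m.Name] = struct{}{}
		}
	}

	for _, l := range limits {
		for _, method := range l.Methods {
			if _, ok := known[method]; !ok {
				return fmt.Errorf("rpc limits: method %q is not registered on the server", method)
			}
		}
	}
	return nil
}

func main() {
	srv := grpc.NewServer()
	// With no services registered, any configured limit fails validation.
	err := validateRPCLimits(srv, []rpcconfig.LimitConfig{
		{Methods: []string{"/neo.fs.v2.object.ObjectService/Put"}, MaxOps: 1000},
	})
	fmt.Println(err)
}
```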