Support active RPC limiting #1639

Merged
dstepanov-yadro merged 5 commits from a-savchuk/frostfs-node:rps-limiting into master 2025-02-28 11:08:10 +00:00
26 changed files with 277 additions and 155 deletions

View file

@ -29,6 +29,7 @@ import (
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
@ -69,6 +70,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -528,6 +530,8 @@ type cfgGRPC struct {
maxChunkSize uint64
maxAddrAmount uint64
reconnectTimeout time.Duration
limiter atomic.Pointer[limiting.SemaphoreLimiter]
}
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
@ -664,10 +668,6 @@ type cfgAccessPolicyEngine struct {
}
type cfgObjectRoutines struct {
putRemote *ants.Pool
putLocal *ants.Pool
replication *ants.Pool
}
@ -721,7 +721,7 @@ func initCfg(appCfg *config.Config) *cfg {
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
c.cfgGRPC = initCfgGRPC()
c.cfgGRPC = initCfgGRPC(appCfg)
c.cfgMorph = cfgMorph{
proxyScriptHash: contractsconfig.Proxy(appCfg),
@ -852,14 +852,23 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
}
}
func initCfgGRPC() cfgGRPC {
func initCfgGRPC(appCfg *config.Config) (cfg cfgGRPC) {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
return cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
var limits []limiting.KeyLimit
for _, l := range rpcconfig.Limits(appCfg) {
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})
}
limiter, err := limiting.NewSemaphoreLimiter(limits)
fatalOnErr(err)
cfg.maxChunkSize = maxChunkSize
cfg.maxAddrAmount = maxAddrAmount
cfg.limiter.Store(limiter)
return
}
func initCfgObject(appCfg *config.Config) cfgObject {
@ -1166,21 +1175,7 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) {
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
var err error
optNonBlocking := ants.WithNonblocking(true)
putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
fatalOnErr(err)
putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
fatalOnErr(err)
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
if replicatorPoolSize <= 0 {
replicatorPoolSize = putRemoteCapacity
}
pool.replication, err = ants.NewPool(replicatorPoolSize)
fatalOnErr(err)
@ -1410,17 +1405,28 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
components = append(components, dCmp{"rpc_limiter", c.reloadLimits})
return components
}
func (c *cfg) reloadLimits() error {
var limits []limiting.KeyLimit
for _, l := range rpcconfig.Limits(c.appCfg) {
limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps})

We currently do not check for `Keys` uniqueness inside the `limits` array. This may be either a feature or a misconfiguration. I don't think we need such a feature; at the very least, the last item should take priority. And without wildcards, such a feature has very limited use.

So, what about checking for uniqueness?

Also, there may be typos, and proto names are not short. Do we have any solution for this?

`c.cfgGRPC.servers[0].Server.GetServiceInfo()` could be of use.

As an example of where this will be useful: the future transition from the `neo.fs` to the `frostfs` namespace.

> So, what about checking for uniqueness?

The limiter already does this check (https://git.frostfs.info/TrueCloudLab/frostfs-qos/src/commit/356851eed3bf77e13c87bca9606935c8e7c58769/limiting/limiter.go#L55-L63):

func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error {
	for _, key := range limit.Keys {
		if _, exists := lr.m[key]; exists {
			return fmt.Errorf("duplicate key %q", key)
		}
		lr.m[key] = sem
	}
	return nil
}
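For illustration, a config that lists the same method in two groups would therefore fail fast at node startup instead of silently preferring one of the limits. A minimal sketch against the `limiting` API shown above (the method names are only examples):

```go
package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
)

func main() {
	// "/neo.fs.v2.object.ObjectService/Put" appears in both groups,
	// so the limiter constructor rejects the whole configuration.
	limits := []limiting.KeyLimit{
		{Keys: []string{"/neo.fs.v2.object.ObjectService/Put"}, Limit: 1000},
		{Keys: []string{"/neo.fs.v2.object.ObjectService/Put"}, Limit: 10},
	}

	if _, err := limiting.NewSemaphoreLimiter(limits); err != nil {
		fmt.Println(err) // duplicate key "/neo.fs.v2.object.ObjectService/Put"
	}
}
```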

> Also, there may be typos, proto names are not short. Do we have any solution for this?

I'll add it
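A validation pass along those lines could look roughly like this. It is a sketch only, not the code merged in this PR: `validateLimiterKeys` is a hypothetical helper, while `grpc.Server.GetServiceInfo` is the real grpc-go API:

```go
import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
	"google.golang.org/grpc"
)

// validateLimiterKeys checks every configured method name against the
// services actually registered on the gRPC server, so typos fail fast.
func validateLimiterKeys(srv *grpc.Server, limits []limiting.KeyLimit) error {
	known := make(map[string]struct{})
	for svc, info := range srv.GetServiceInfo() {
		for _, m := range info.Methods {
			// Full method names use the "/package.Service/Method" form.
			known["/"+svc+"/"+m.Name] = struct{}{}
		}
	}
	for _, l := range limits {
		for _, key := range l.Keys {
			if _, ok := known[key]; !ok {
				return fmt.Errorf("rpc limits: unknown method %q", key)
			}
		}
	}
	return nil
}
```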
}
limiter, err := limiting.NewSemaphoreLimiter(limits)
if err != nil {
return err
}
c.cfgGRPC.limiter.Store(limiter)
return nil
}
func (c *cfg) reloadPools() error {
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
newSize = replicatorconfig.PoolSize(c.appCfg)
newSize := replicatorconfig.PoolSize(c.appCfg)
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
return nil

View file

@ -21,10 +21,6 @@ const (
putSubsection = "put"
getSubsection = "get"
// PutPoolSizeDefault is a default value of routine pool size to
// process object.Put requests in object service.
PutPoolSizeDefault = 10
)
// Put returns structure that provides access to "put" subsection of
@ -35,30 +31,6 @@ func Put(c *config.Config) PutConfig {
}
}
// PoolSizeRemote returns the value of "remote_pool_size" config parameter.
//
// Returns PutPoolSizeDefault if the value is not a positive number.
func (g PutConfig) PoolSizeRemote() int {
v := config.Int(g.cfg, "remote_pool_size")
if v > 0 {
return int(v)
}
return PutPoolSizeDefault
}
// PoolSizeLocal returns the value of "local_pool_size" config parameter.
//
// Returns PutPoolSizeDefault if the value is not a positive number.
func (g PutConfig) PoolSizeLocal() int {
v := config.Int(g.cfg, "local_pool_size")
if v > 0 {
return int(v)
}
return PutPoolSizeDefault
}
// SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `false` if it is not defined.
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")

View file

@ -13,8 +13,6 @@ func TestObjectSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
empty := configtest.EmptyConfig()
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal())
require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification())
})
@ -22,8 +20,6 @@ func TestObjectSection(t *testing.T) {
const path = "../../../../config/example/node"
fileConfigTest := func(c *config.Config) {
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal())
require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification())
}

View file

@ -11,6 +11,8 @@ const (
// PutTimeoutDefault is a default timeout of object put request in replicator.
PutTimeoutDefault = 5 * time.Second
// PoolSizeDefault is a default pool size for put request in replicator.
PoolSizeDefault = 10
)
// PutTimeout returns the value of "put_timeout" config parameter
@ -28,6 +30,13 @@ func PutTimeout(c *config.Config) time.Duration {
// PoolSize returns the value of "pool_size" config parameter
// from "replicator" section.
//
// Returns PoolSizeDefault if the value is a non-positive integer.
aarifullin marked this conversation as resolved Outdated

`not positive` -> `non-positive` :)

Fixed
func PoolSize(c *config.Config) int {
return int(config.IntSafe(c.Sub(subsection), "pool_size"))
v := int(config.IntSafe(c.Sub(subsection), "pool_size"))
if v > 0 {
return v
}
return PoolSizeDefault
}

View file

@ -15,7 +15,7 @@ func TestReplicatorSection(t *testing.T) {
empty := configtest.EmptyConfig()
require.Equal(t, replicatorconfig.PutTimeoutDefault, replicatorconfig.PutTimeout(empty))
require.Equal(t, 0, replicatorconfig.PoolSize(empty))
require.Equal(t, replicatorconfig.PoolSizeDefault, replicatorconfig.PoolSize(empty))
})
const path = "../../../../config/example/node"

View file

@ -0,0 +1,43 @@
package rpcconfig
import (
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
)
const (
subsection = "rpc"
limitsSubsection = "limits"
)
type LimitConfig struct {
Methods []string
MaxOps int64
}
// Limits returns the "limits" config from "rpc" section.
func Limits(c *config.Config) []LimitConfig {
c = c.Sub(subsection).Sub(limitsSubsection)
var limits []LimitConfig
for i := uint64(0); ; i++ {
aarifullin marked this conversation as resolved Outdated

Optional change request

How about `for i := uint64(0); ; i++` as we don't use this variable outside?

Done
si := strconv.FormatUint(i, 10)
sc := c.Sub(si)
methods := config.StringSliceSafe(sc, "methods")
if len(methods) == 0 {
break
}
maxOps := config.IntSafe(sc, "max_ops")
if maxOps == 0 {
panic("no max operations for method group")
}
limits = append(limits, LimitConfig{methods, maxOps})
}
return limits
}

View file

@ -0,0 +1,53 @@
package rpcconfig
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"github.com/stretchr/testify/require"
)
func TestRPCSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
require.Empty(t, Limits(configtest.EmptyConfig()))
})
t.Run("correct config", func(t *testing.T) {
const path = "../../../../config/example/node"
fileConfigTest := func(c *config.Config) {
limits := Limits(c)
require.Len(t, limits, 2)
limit0 := limits[0]
limit1 := limits[1]
require.ElementsMatch(t, limit0.Methods, []string{"/neo.fs.v2.object.ObjectService/PutSingle", "/neo.fs.v2.object.ObjectService/Put"})
require.Equal(t, limit0.MaxOps, int64(1000))
require.ElementsMatch(t, limit1.Methods, []string{"/neo.fs.v2.object.ObjectService/Get"})
require.Equal(t, limit1.MaxOps, int64(10000))
}
configtest.ForEachFileType(path, fileConfigTest)
t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
})
t.Run("no max operations", func(t *testing.T) {
const path = "testdata/node"
fileConfigTest := func(c *config.Config) {
require.Panics(t, func() { _ = Limits(c) })
}
configtest.ForEachFileType(path, fileConfigTest)
t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
})
}

View file

@ -0,0 +1,3 @@
FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
FROSTFS_RPC_LIMITS_1_MAX_OPS=10000

View file

@ -0,0 +1,18 @@
{
"rpc": {
"limits": [
{
"methods": [
"/neo.fs.v2.object.ObjectService/PutSingle",
"/neo.fs.v2.object.ObjectService/Put"
]
},
{
"methods": [
"/neo.fs.v2.object.ObjectService/Get"
],
"max_ops": 10000
}
]
}
}

View file

@ -0,0 +1,8 @@
rpc:
limits:
- methods:
- /neo.fs.v2.object.ObjectService/PutSingle
- /neo.fs.v2.object.ObjectService/Put
- methods:
- /neo.fs.v2.object.ObjectService/Get
max_ops: 10000

View file

@ -9,9 +9,11 @@ import (
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -134,11 +136,13 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr
qos.NewUnaryServerInterceptor(),
metrics.NewUnaryServerInterceptor(),
tracing.NewUnaryServerInterceptor(),
qosInternal.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),
fyrchik marked this conversation as resolved Outdated

We could use `c.cfgGRPC.limiter.Load` instead of `func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }`. Was your choice deliberate?

`NewMaxActiveRPCLimiterUnaryServerInterceptor` receives the `Limiter` interface, but `c.cfgGRPC.limiter.Load` returns the `*SemaphoreLimiter`:

cannot use c.cfgGRPC.limiter.Load (value of type func() *limiting.SemaphoreLimiter) as func() limiting.Limiter value in argument to qosInternal.NewMaxActiveRPCLimiterStreamServerInterceptor
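The underlying point is that Go function types are not covariant in their return type: a `func() *T` does not convert to `func() I` even when `*T` implements `I`, so an adapting closure is required. A standalone illustration with invented names:

```go
package main

// Limiter mimics the shape of the limiting.Limiter interface.
type Limiter interface {
	Acquire(key string) (release func(), ok bool)
}

type semaphoreLimiter struct{}

func (*semaphoreLimiter) Acquire(string) (func(), bool) { return func() {}, true }

func main() {
	load := func() *semaphoreLimiter { return &semaphoreLimiter{} }

	// var get func() Limiter = load
	// ^ compile error: cannot use load (value of type
	//   func() *semaphoreLimiter) as func() Limiter value

	// The closure performs the interface conversion on each call,
	// which is what the PR does with c.cfgGRPC.limiter.Load.
	var get func() Limiter = func() Limiter { return load() }
	_ = get()
}
```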
),
grpc.ChainStreamInterceptor(
qos.NewStreamServerInterceptor(),
metrics.NewStreamServerInterceptor(),
tracing.NewStreamServerInterceptor(),
qosInternal.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }),
),
}

View file

@ -326,7 +326,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
c,
c.cfgNetmap.state,
irFetcher,
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
objectwriter.WithLogger(c.log),
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
)

View file

@ -87,12 +87,15 @@ FROSTFS_REPLICATOR_POOL_SIZE=10
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500
# Object service section
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
FROSTFS_RPC_LIMITS_0_MAX_OPS=1000
FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
# Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100

View file

@ -134,14 +134,29 @@
"tombstone_lifetime": 10
},
"put": {
"remote_pool_size": 100,
"local_pool_size": 200,
"skip_session_token_issuer_verification": true
},
"get": {
"priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"]
}
},
"rpc": {
"limits": [
{
"methods": [
"/neo.fs.v2.object.ObjectService/PutSingle",
"/neo.fs.v2.object.ObjectService/Put"
],
"max_ops": 1000
},
{
"methods": [
"/neo.fs.v2.object.ObjectService/Get"
],
"max_ops": 10000
}
]
},
"storage": {
"shard_pool_size": 15,
"shard_ro_error_threshold": 100,

View file

@ -117,14 +117,22 @@ object:
delete:
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
put:
remote_pool_size: 100 # number of async workers for remote PUT operations
local_pool_size: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
get:
priority: # list of metrics of nodes for prioritization
- $attribute:ClusterName
- $attribute:UN-LOCODE
rpc:
limits:
- methods:
- /neo.fs.v2.object.ObjectService/PutSingle
- /neo.fs.v2.object.ObjectService/Put
max_ops: 1000
- methods:
- /neo.fs.v2.object.ObjectService/Get
max_ops: 10000
storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations

View file

@ -396,18 +396,16 @@ replicator:
pool_size: 10
```
| Parameter | Type | Default value | Description |
|---------------|------------|----------------------------------------|---------------------------------------------|
| `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. |
| `pool_size` | `int` | Equal to `object.put.remote_pool_size` | Maximum amount of concurrent replications. |
| Parameter | Type | Default value | Description |
|---------------|------------|---------------|---------------------------------------------|
| `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. |
| `pool_size` | `int` | `10` | Maximum amount of concurrent replications. |
# `object` section
Contains object-service related parameters.
```yaml
object:
put:
remote_pool_size: 100
get:
priority:
- $attribute:ClusterName
@ -416,10 +414,29 @@ object:
| Parameter | Type | Default value | Description |
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. |
# `rpc` section
Contains limits on the number of active RPCs for the specified method(s).
```yaml
rpc:
limits:
- methods:
- /neo.fs.v2.object.ObjectService/PutSingle
- /neo.fs.v2.object.ObjectService/Put
max_ops: 1000
- methods:
- /neo.fs.v2.object.ObjectService/Get
max_ops: 10000
```
| Parameter        | Type       | Default value | Description                                                    |
|------------------|------------|---------------|----------------------------------------------------------------|
| `limits.max_ops` | `int`      |               | Maximum number of active RPCs allowed for the given method(s). |
| `limits.methods` | `[]string` |               | List of RPC methods sharing the given limit.                   |
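For env-based deployments, the same limits are spelled with indexed variables, as in the example config updated by this PR. Note that, given how the `limits` subsection is parsed, indices must start at `0` and be contiguous; parsing stops at the first index that has no `methods`:

```shell
FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put"
FROSTFS_RPC_LIMITS_0_MAX_OPS=1000
FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
```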
# `runtime` section
Contains runtime parameters.

2
go.mod
View file

@ -8,7 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972

4
go.sum
View file

@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=

View file

@ -125,7 +125,6 @@ const (
SearchCouldNotWriteObjectIdentifiers = "could not write object identifiers"
SearchLocalOperationFailed = "local operation failed"
UtilObjectServiceError = "object service error"
UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool"
V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring"
V2CantCheckIfRequestFromContainerNode = "can't check if request from container node"
ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"

View file

@ -3,7 +3,9 @@ package qos
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"google.golang.org/grpc"
)
@ -49,3 +51,36 @@ func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientIntercepto
return streamer(ctx, desc, cc, method, opts...)
}
}
func NewMaxActiveRPCLimiterUnaryServerInterceptor(getLimiter func() limiting.Limiter) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() {
return handler(ctx, req)
}
release, ok := getLimiter().Acquire(info.FullMethod)
if !ok {
return nil, new(apistatus.ResourceExhausted)
}
defer release()
return handler(ctx, req)
}
}
//nolint:contextcheck (grpc.ServerStream manages the context itself)
fyrchik marked this conversation as resolved Outdated

`false positive` implies a bug in the linter. Here we have no bug in the linter, just a cumbersome gRPC API.

Made the message clearer
func NewMaxActiveRPCLimiterStreamServerInterceptor(getLimiter func() limiting.Limiter) grpc.StreamServerInterceptor {

Refs #1656.
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() {
return handler(srv, ss)
}
release, ok := getLimiter().Acquire(info.FullMethod)
if !ok {
return new(apistatus.ResourceExhausted)
}
defer release()
dstepanov-yadro marked this conversation as resolved Outdated

The current behavior is as follows: the counter increases, the handler is called, the counter decreases, and `ss grpc.ServerStream` will continue to work. You need to make a wrapper over `ss grpc.ServerStream` that will decrease the counter when the stream is closed.

Are you sure? I always thought an interceptor's handler is called only once. I don't need to track each message, so I can reduce the counter on the handler exit, can't I?

Now I'm not sure :) So if you checked and it works as expected, then it is great!

I'll check it one more time

I think the handler calls `RecvMsg` and other stream methods itself, so if I don't need to process each stream message, I can simply call the handler.

- handler invocation in the grpc-go source: https://github.com/grpc/grpc-go/blob/65c6718afb5d5d41a277f4b3c6439ac80bb38292/server.go#L1570-L1579 and https://github.com/grpc/grpc-go/blob/65c6718afb5d5d41a277f4b3c6439ac80bb38292/server.go#L1694-L1703
- server interceptor example: https://github.com/grpc/grpc-go/blob/65c6718afb5d5d41a277f4b3c6439ac80bb38292/examples/features/interceptor/server/main.go
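One quick way to confirm this is a trivial timing interceptor: the log line below can only fire after the whole stream has been served, which shows that a deferred release held across the `handler` call covers the stream's full lifetime. A standalone sketch, not part of the PR:

```go
import (
	"log"
	"time"

	"google.golang.org/grpc"
)

// timingStreamInterceptor logs how long each streaming RPC holds the
// handler; the message appears only once the stream has completed.
func timingStreamInterceptor(
	srv any,
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	start := time.Now()
	err := handler(srv, ss) // blocks until the RPC is fully served
	log.Printf("%s held the stream for %v", info.FullMethod, time.Since(start))
	return err
}
```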
return handler(srv, ss)
}
}

View file

@ -79,11 +79,11 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.
continue
}
workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey())
isLocal := n.cfg.NetmapKeys.IsLocalKey(addr.PublicKey())
item := new(bool)
wg.Add(1)
if err := workerPool.Submit(func() {
go func() {
defer wg.Done()
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
@ -95,11 +95,7 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.
traverser.SubmitSuccess()
*item = true
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(ctx, n.cfg.Logger, "PUT", err)
return true
}
}()
// Mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't

View file

@ -149,17 +149,7 @@ func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
completed := make(chan interface{})
if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed)
err = e.Relay(ctx, info, c)
}); poolErr != nil {
close(completed)
svcutil.LogWorkerPoolError(ctx, e.Config.Logger, "PUT", poolErr)
return poolErr
}
<-completed
err = e.Relay(ctx, info, c)
if err == nil {
return nil
}
@ -343,21 +333,11 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n
}
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error
localTarget := LocalTarget{
Storage: e.Config.LocalStore,
Container: e.Container,
}
completed := make(chan interface{})
if poolErr := e.Config.LocalPool.Submit(func() {
defer close(completed)
err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
return localTarget.WriteObject(ctx, obj, e.ObjectMeta)
}
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
@ -371,15 +351,5 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
nodeInfo: clientNodeInfo,
}
var err error
completed := make(chan interface{})
if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed)
err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
return remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
}

View file

@ -31,7 +31,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
@ -131,9 +130,6 @@ func TestECWriter(t *testing.T) {
nodeKey, err := keys.NewPrivateKey()
require.NoError(t, err)
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
require.NoError(t, err)
log, err := logger.NewLogger(nil)
require.NoError(t, err)
@ -141,7 +137,6 @@ func TestECWriter(t *testing.T) {
ecw := ECWriter{
Config: &Config{
NetmapKeys: n,
RemotePool: pool,
Logger: log,
ClientConstructor: clientConstructor{vectors: ns},
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),

View file

@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -52,8 +51,6 @@ type Config struct {
NetmapSource netmap.Source
RemotePool, LocalPool util.WorkerPool
NetmapKeys netmap.AnnouncedKeys
FormatValidator *object.FormatValidator
@ -69,12 +66,6 @@ type Config struct {
type Option func(*Config)
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *Config) {
c.RemotePool, c.LocalPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *Config) {
c.Logger = l
@ -87,13 +78,6 @@ func WithVerifySessionTokenIssuer(v bool) Option {
}
}
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
if c.NetmapKeys.IsLocalKey(pub) {
return c.LocalPool, true
}
return c.RemotePool, false
}
type Params struct {
Config *Config

View file

@ -6,7 +6,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -27,8 +26,6 @@ func NewService(ks *objutil.KeyStorage,
opts ...objectwriter.Option,
) *Service {
c := &objectwriter.Config{
RemotePool: util.NewPseudoWorkerPool(),
LocalPool: util.NewPseudoWorkerPool(),
Logger: logger.NewLoggerWrapper(zap.L()),
KeyStorage: ks,
ClientConstructor: cc,

View file

@ -17,11 +17,3 @@ func LogServiceError(ctx context.Context, l *logger.Logger, req string, node net
zap.Error(err),
)
}
// LogWorkerPoolError writes debug error message of object worker pool to provided logger.
func LogWorkerPoolError(ctx context.Context, l *logger.Logger, req string, err error) {
l.Error(ctx, logs.UtilCouldNotPushTaskToWorkerPool,
dstepanov-yadro marked this conversation as resolved Outdated

Also drop `logs.UtilCouldNotPushTaskToWorkerPool`

Done
zap.String("request", req),
zap.Error(err),
)
}