From e9c771584f98743c7cd9bfd2192a1fedcfa47ac8 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Thu, 6 Feb 2025 11:09:33 +0300 Subject: [PATCH 1/5] [#1639] config: Separate `replicator.pool_size` from other settings Separated `replicator.pool_size` and `object.put.remote_pool_size` settings. Signed-off-by: Aleksey Savchuk --- cmd/frostfs-node/config.go | 4 ---- cmd/frostfs-node/config/replicator/config.go | 11 ++++++++++- cmd/frostfs-node/config/replicator/config_test.go | 2 +- docs/storage-node-configuration.md | 8 ++++---- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 511777566..d575c7228 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -1177,10 +1177,6 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) { fatalOnErr(err) replicatorPoolSize := replicatorconfig.PoolSize(cfg) - if replicatorPoolSize <= 0 { - replicatorPoolSize = putRemoteCapacity - } - pool.replication, err = ants.NewPool(replicatorPoolSize) fatalOnErr(err) diff --git a/cmd/frostfs-node/config/replicator/config.go b/cmd/frostfs-node/config/replicator/config.go index 0fbac935c..e954bf19d 100644 --- a/cmd/frostfs-node/config/replicator/config.go +++ b/cmd/frostfs-node/config/replicator/config.go @@ -11,6 +11,8 @@ const ( // PutTimeoutDefault is a default timeout of object put request in replicator. PutTimeoutDefault = 5 * time.Second + // PoolSizeDefault is a default pool size for put request in replicator. + PoolSizeDefault = 10 ) // PutTimeout returns the value of "put_timeout" config parameter @@ -28,6 +30,13 @@ func PutTimeout(c *config.Config) time.Duration { // PoolSize returns the value of "pool_size" config parameter // from "replicator" section. +// +// Returns PoolSizeDefault if the value is non-positive integer. func PoolSize(c *config.Config) int { - return int(config.IntSafe(c.Sub(subsection), "pool_size")) + v := int(config.IntSafe(c.Sub(subsection), "pool_size")) + if v > 0 { + return v + } + + return PoolSizeDefault } diff --git a/cmd/frostfs-node/config/replicator/config_test.go b/cmd/frostfs-node/config/replicator/config_test.go index 2129c01b4..2aa490946 100644 --- a/cmd/frostfs-node/config/replicator/config_test.go +++ b/cmd/frostfs-node/config/replicator/config_test.go @@ -15,7 +15,7 @@ func TestReplicatorSection(t *testing.T) { empty := configtest.EmptyConfig() require.Equal(t, replicatorconfig.PutTimeoutDefault, replicatorconfig.PutTimeout(empty)) - require.Equal(t, 0, replicatorconfig.PoolSize(empty)) + require.Equal(t, replicatorconfig.PoolSizeDefault, replicatorconfig.PoolSize(empty)) }) const path = "../../../../config/example/node" diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index de2729c68..aef05d589 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -396,10 +396,10 @@ replicator: pool_size: 10 ``` -| Parameter | Type | Default value | Description | -|---------------|------------|----------------------------------------|---------------------------------------------| -| `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. | -| `pool_size` | `int` | Equal to `object.put.remote_pool_size` | Maximum amount of concurrent replications. | +| Parameter | Type | Default value | Description | +|---------------|------------|---------------|---------------------------------------------| +| `put_timeout` | `duration` | `5s` | Timeout for performing the `PUT` operation. | +| `pool_size` | `int` | `10` | Maximum amount of concurrent replications. | # `object` section Contains object-service related parameters. -- 2.45.3 From d7e9e0f2b9a01152e69929fa1c711ff380e2ec60 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Thu, 6 Feb 2025 11:50:12 +0300 Subject: [PATCH 2/5] [#1639] services/object: Remove limiting pools for Put operation Signed-off-by: Aleksey Savchuk --- cmd/frostfs-node/config.go | 22 +----------- cmd/frostfs-node/config/object/config.go | 28 --------------- cmd/frostfs-node/config/object/config_test.go | 4 --- cmd/frostfs-node/object.go | 1 - config/example/node.env | 2 -- config/example/node.json | 2 -- config/example/node.yaml | 2 -- docs/storage-node-configuration.md | 4 --- internal/logs/logs.go | 1 - pkg/services/object/common/writer/common.go | 10 ++---- pkg/services/object/common/writer/ec.go | 36 ++----------------- pkg/services/object/common/writer/ec_test.go | 5 --- pkg/services/object/common/writer/writer.go | 16 --------- pkg/services/object/put/service.go | 3 -- pkg/services/object/util/log.go | 8 ----- 15 files changed, 7 insertions(+), 137 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d575c7228..43ec40d1d 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -664,10 +664,6 @@ type cfgAccessPolicyEngine struct { } type cfgObjectRoutines struct { - putRemote *ants.Pool - - putLocal *ants.Pool - replication *ants.Pool } @@ -1166,16 +1162,6 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) { func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) { var err error - optNonBlocking := ants.WithNonblocking(true) - - putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote() - pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking) - fatalOnErr(err) - - putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal() - pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking) - fatalOnErr(err) - replicatorPoolSize := replicatorconfig.PoolSize(cfg) pool.replication, err = ants.NewPool(replicatorPoolSize) fatalOnErr(err) @@ -1410,13 +1396,7 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp { } func (c *cfg) reloadPools() error { - newSize := objectconfig.Put(c.appCfg).PoolSizeLocal() - c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size") - - newSize = objectconfig.Put(c.appCfg).PoolSizeRemote() - c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size") - - newSize = replicatorconfig.PoolSize(c.appCfg) + newSize := replicatorconfig.PoolSize(c.appCfg) c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size") return nil diff --git a/cmd/frostfs-node/config/object/config.go b/cmd/frostfs-node/config/object/config.go index 6ff1fe2ab..c8c967d30 100644 --- a/cmd/frostfs-node/config/object/config.go +++ b/cmd/frostfs-node/config/object/config.go @@ -21,10 +21,6 @@ const ( putSubsection = "put" getSubsection = "get" - - // PutPoolSizeDefault is a default value of routine pool size to - // process object.Put requests in object service. - PutPoolSizeDefault = 10 ) // Put returns structure that provides access to "put" subsection of @@ -35,30 +31,6 @@ func Put(c *config.Config) PutConfig { } } -// PoolSizeRemote returns the value of "remote_pool_size" config parameter. -// -// Returns PutPoolSizeDefault if the value is not a positive number. -func (g PutConfig) PoolSizeRemote() int { - v := config.Int(g.cfg, "remote_pool_size") - if v > 0 { - return int(v) - } - - return PutPoolSizeDefault -} - -// PoolSizeLocal returns the value of "local_pool_size" config parameter. -// -// Returns PutPoolSizeDefault if the value is not a positive number. -func (g PutConfig) PoolSizeLocal() int { - v := config.Int(g.cfg, "local_pool_size") - if v > 0 { - return int(v) - } - - return PutPoolSizeDefault -} - // SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `falseā€œ if is not defined. func (g PutConfig) SkipSessionTokenIssuerVerification() bool { return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification") diff --git a/cmd/frostfs-node/config/object/config_test.go b/cmd/frostfs-node/config/object/config_test.go index e2bb105d9..1c525ef55 100644 --- a/cmd/frostfs-node/config/object/config_test.go +++ b/cmd/frostfs-node/config/object/config_test.go @@ -13,8 +13,6 @@ func TestObjectSection(t *testing.T) { t.Run("defaults", func(t *testing.T) { empty := configtest.EmptyConfig() - require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote()) - require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal()) require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty)) require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification()) }) @@ -22,8 +20,6 @@ func TestObjectSection(t *testing.T) { const path = "../../../../config/example/node" fileConfigTest := func(c *config.Config) { - require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote()) - require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal()) require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c)) require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification()) } diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 40d3cc1cd..ad6f4140a 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -326,7 +326,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche c, c.cfgNetmap.state, irFetcher, - objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), objectwriter.WithLogger(c.log), objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification), ) diff --git a/config/example/node.env b/config/example/node.env index 2ba432b1b..aa3c72a91 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -87,8 +87,6 @@ FROSTFS_REPLICATOR_POOL_SIZE=10 FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500 # Object service section -FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100 -FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE" diff --git a/config/example/node.json b/config/example/node.json index cfde8bcc7..afa815bc3 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -134,8 +134,6 @@ "tombstone_lifetime": 10 }, "put": { - "remote_pool_size": 100, - "local_pool_size": 200, "skip_session_token_issuer_verification": true }, "get": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 1f8ec843d..f63cc514b 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -117,8 +117,6 @@ object: delete: tombstone_lifetime: 10 # tombstone "local" lifetime in epochs put: - remote_pool_size: 100 # number of async workers for remote PUT operations - local_pool_size: 200 # number of async workers for local PUT operations skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true get: priority: # list of metrics of nodes for prioritization diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index aef05d589..b2ab75b7e 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -406,8 +406,6 @@ Contains object-service related parameters. ```yaml object: - put: - remote_pool_size: 100 get: priority: - $attribute:ClusterName @@ -416,8 +414,6 @@ object: | Parameter | Type | Default value | Description | |-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------| | `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | -| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | -| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. | | `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. | # `runtime` section diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6a72644e5..d48a4da9b 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -125,7 +125,6 @@ const ( SearchCouldNotWriteObjectIdentifiers = "could not write object identifiers" SearchLocalOperationFailed = "local operation failed" UtilObjectServiceError = "object service error" - UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool" V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring" V2CantCheckIfRequestFromContainerNode = "can't check if request from container node" ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch" diff --git a/pkg/services/object/common/writer/common.go b/pkg/services/object/common/writer/common.go index dae168baf..1998e9638 100644 --- a/pkg/services/object/common/writer/common.go +++ b/pkg/services/object/common/writer/common.go @@ -79,11 +79,11 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement. continue } - workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey()) + isLocal := n.cfg.NetmapKeys.IsLocalKey(addr.PublicKey()) item := new(bool) wg.Add(1) - if err := workerPool.Submit(func() { + go func() { defer wg.Done() err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr}) @@ -95,11 +95,7 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement. traverser.SubmitSuccess() *item = true - }); err != nil { - wg.Done() - svcutil.LogWorkerPoolError(ctx, n.cfg.Logger, "PUT", err) - return true - } + }() // Mark the container node as processed in order to exclude it // in subsequent container broadcast. Note that we don't diff --git a/pkg/services/object/common/writer/ec.go b/pkg/services/object/common/writer/ec.go index 8f269ec21..26a53e315 100644 --- a/pkg/services/object/common/writer/ec.go +++ b/pkg/services/object/common/writer/ec.go @@ -149,17 +149,7 @@ func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } - completed := make(chan interface{}) - if poolErr := e.Config.RemotePool.Submit(func() { - defer close(completed) - err = e.Relay(ctx, info, c) - }); poolErr != nil { - close(completed) - svcutil.LogWorkerPoolError(ctx, e.Config.Logger, "PUT", poolErr) - return poolErr - } - <-completed - + err = e.Relay(ctx, info, c) if err == nil { return nil } @@ -343,21 +333,11 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n } func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { - var err error localTarget := LocalTarget{ Storage: e.Config.LocalStore, Container: e.Container, } - completed := make(chan interface{}) - if poolErr := e.Config.LocalPool.Submit(func() { - defer close(completed) - err = localTarget.WriteObject(ctx, obj, e.ObjectMeta) - }); poolErr != nil { - close(completed) - return poolErr - } - <-completed - return err + return localTarget.WriteObject(ctx, obj, e.ObjectMeta) } func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { @@ -371,15 +351,5 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n nodeInfo: clientNodeInfo, } - var err error - completed := make(chan interface{}) - if poolErr := e.Config.RemotePool.Submit(func() { - defer close(completed) - err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta) - }); poolErr != nil { - close(completed) - return poolErr - } - <-completed - return err + return remoteTaget.WriteObject(ctx, obj, e.ObjectMeta) } diff --git a/pkg/services/object/common/writer/ec_test.go b/pkg/services/object/common/writer/ec_test.go index b7764661f..2458e352f 100644 --- a/pkg/services/object/common/writer/ec_test.go +++ b/pkg/services/object/common/writer/ec_test.go @@ -31,7 +31,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/tzhash/tz" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" ) @@ -131,9 +130,6 @@ func TestECWriter(t *testing.T) { nodeKey, err := keys.NewPrivateKey() require.NoError(t, err) - pool, err := ants.NewPool(4, ants.WithNonblocking(true)) - require.NoError(t, err) - log, err := logger.NewLogger(nil) require.NoError(t, err) @@ -141,7 +137,6 @@ func TestECWriter(t *testing.T) { ecw := ECWriter{ Config: &Config{ NetmapKeys: n, - RemotePool: pool, Logger: log, ClientConstructor: clientConstructor{vectors: ns}, KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil), diff --git a/pkg/services/object/common/writer/writer.go b/pkg/services/object/common/writer/writer.go index adaf1945b..d3d2b41b4 100644 --- a/pkg/services/object/common/writer/writer.go +++ b/pkg/services/object/common/writer/writer.go @@ -12,7 +12,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -52,8 +51,6 @@ type Config struct { NetmapSource netmap.Source - RemotePool, LocalPool util.WorkerPool - NetmapKeys netmap.AnnouncedKeys FormatValidator *object.FormatValidator @@ -69,12 +66,6 @@ type Config struct { type Option func(*Config) -func WithWorkerPools(remote, local util.WorkerPool) Option { - return func(c *Config) { - c.RemotePool, c.LocalPool = remote, local - } -} - func WithLogger(l *logger.Logger) Option { return func(c *Config) { c.Logger = l @@ -87,13 +78,6 @@ func WithVerifySessionTokenIssuer(v bool) Option { } } -func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) { - if c.NetmapKeys.IsLocalKey(pub) { - return c.LocalPool, true - } - return c.RemotePool, false -} - type Params struct { Config *Config diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 5cc0a5722..099486b3f 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -6,7 +6,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -27,8 +26,6 @@ func NewService(ks *objutil.KeyStorage, opts ...objectwriter.Option, ) *Service { c := &objectwriter.Config{ - RemotePool: util.NewPseudoWorkerPool(), - LocalPool: util.NewPseudoWorkerPool(), Logger: logger.NewLoggerWrapper(zap.L()), KeyStorage: ks, ClientConstructor: cc, diff --git a/pkg/services/object/util/log.go b/pkg/services/object/util/log.go index 2c1e053ac..b10826226 100644 --- a/pkg/services/object/util/log.go +++ b/pkg/services/object/util/log.go @@ -17,11 +17,3 @@ func LogServiceError(ctx context.Context, l *logger.Logger, req string, node net zap.Error(err), ) } - -// LogWorkerPoolError writes debug error message of object worker pool to provided logger. -func LogWorkerPoolError(ctx context.Context, l *logger.Logger, req string, err error) { - l.Error(ctx, logs.UtilCouldNotPushTaskToWorkerPool, - zap.String("request", req), - zap.Error(err), - ) -} -- 2.45.3 From fe24be8c0b17346b702f0f803e228fa9cfa918b2 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Thu, 20 Feb 2025 11:18:02 +0300 Subject: [PATCH 3/5] [#1639] go.mod: Update sdk-go and qos Signed-off-by: Aleksey Savchuk --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 23ddad276..2bfc3abfe 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 - git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe + git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 diff --git a/go.sum b/go.sum index ecd2ab525..4a7dfd4dc 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g= -git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc= -git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U= +git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A= +git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U= git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw= git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= -- 2.45.3 From ffb1ed52976c834a6a222a6dc38e4b524124742a Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Fri, 7 Feb 2025 17:23:10 +0300 Subject: [PATCH 4/5] [#1639] qos: Add interceptors for limiting active RPCs Signed-off-by: Aleksey Savchuk --- internal/qos/grpc.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/internal/qos/grpc.go b/internal/qos/grpc.go index c253f1e9d..534a1f74b 100644 --- a/internal/qos/grpc.go +++ b/internal/qos/grpc.go @@ -3,7 +3,9 @@ package qos import ( "context" + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "google.golang.org/grpc" ) @@ -49,3 +51,36 @@ func NewAdjustOutgoingIOTagStreamClientInterceptor() grpc.StreamClientIntercepto return streamer(ctx, desc, cc, method, opts...) } } + +func NewMaxActiveRPCLimiterUnaryServerInterceptor(getLimiter func() limiting.Limiter) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == IOTagCritical.String() { + return handler(ctx, req) + } + + release, ok := getLimiter().Acquire(info.FullMethod) + if !ok { + return nil, new(apistatus.ResourceExhausted) + } + defer release() + + return handler(ctx, req) + } +} + +//nolint:contextcheck (grpc.ServerStream manages the context itself) +func NewMaxActiveRPCLimiterStreamServerInterceptor(getLimiter func() limiting.Limiter) grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if tag, ok := tagging.IOTagFromContext(ss.Context()); ok && tag == IOTagCritical.String() { + return handler(srv, ss) + } + + release, ok := getLimiter().Acquire(info.FullMethod) + if !ok { + return new(apistatus.ResourceExhausted) + } + defer release() + + return handler(srv, ss) + } +} -- 2.45.3 From 611b96f54b03047bc7ef5f839e652143d5186b78 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Fri, 7 Feb 2025 15:17:37 +0300 Subject: [PATCH 5/5] [#1639] node: Support active RPC limiting - Allow configuration of active RPC limits for method groups - Apply RPC limiting for all services except the control service Signed-off-by: Aleksey Savchuk --- cmd/frostfs-node/config.go | 40 ++++++++++++-- cmd/frostfs-node/config/rpc/config.go | 43 +++++++++++++++ cmd/frostfs-node/config/rpc/config_test.go | 53 +++++++++++++++++++ cmd/frostfs-node/config/rpc/testdata/node.env | 3 ++ .../config/rpc/testdata/node.json | 18 +++++++ .../config/rpc/testdata/node.yaml | 8 +++ cmd/frostfs-node/grpc.go | 4 ++ config/example/node.env | 5 ++ config/example/node.json | 17 ++++++ config/example/node.yaml | 10 ++++ docs/storage-node-configuration.md | 21 ++++++++ 11 files changed, 217 insertions(+), 5 deletions(-) create mode 100644 cmd/frostfs-node/config/rpc/config.go create mode 100644 cmd/frostfs-node/config/rpc/config_test.go create mode 100644 cmd/frostfs-node/config/rpc/testdata/node.env create mode 100644 cmd/frostfs-node/config/rpc/testdata/node.json create mode 100644 cmd/frostfs-node/config/rpc/testdata/node.yaml diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 43ec40d1d..a7aeedc21 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -29,6 +29,7 @@ import ( nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object" replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" + rpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/rpc" tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" @@ -69,6 +70,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state" "git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -528,6 +530,8 @@ type cfgGRPC struct { maxChunkSize uint64 maxAddrAmount uint64 reconnectTimeout time.Duration + + limiter atomic.Pointer[limiting.SemaphoreLimiter] } func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) { @@ -717,7 +721,7 @@ func initCfg(appCfg *config.Config) *cfg { c.cfgNetmap = initNetmap(appCfg, netState, relayOnly) - c.cfgGRPC = initCfgGRPC() + c.cfgGRPC = initCfgGRPC(appCfg) c.cfgMorph = cfgMorph{ proxyScriptHash: contractsconfig.Proxy(appCfg), @@ -848,14 +852,23 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID { } } -func initCfgGRPC() cfgGRPC { +func initCfgGRPC(appCfg *config.Config) (cfg cfgGRPC) { maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes - return cfgGRPC{ - maxChunkSize: maxChunkSize, - maxAddrAmount: maxAddrAmount, + var limits []limiting.KeyLimit + for _, l := range rpcconfig.Limits(appCfg) { + limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps}) } + + limiter, err := limiting.NewSemaphoreLimiter(limits) + fatalOnErr(err) + + cfg.maxChunkSize = maxChunkSize + cfg.maxAddrAmount = maxAddrAmount + cfg.limiter.Store(limiter) + + return } func initCfgObject(appCfg *config.Config) cfgObject { @@ -1392,9 +1405,26 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp { components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }}) } + components = append(components, dCmp{"rpc_limiter", c.reloadLimits}) + return components } +func (c *cfg) reloadLimits() error { + var limits []limiting.KeyLimit + for _, l := range rpcconfig.Limits(c.appCfg) { + limits = append(limits, limiting.KeyLimit{Keys: l.Methods, Limit: l.MaxOps}) + } + + limiter, err := limiting.NewSemaphoreLimiter(limits) + if err != nil { + return err + } + + c.cfgGRPC.limiter.Store(limiter) + return nil +} + func (c *cfg) reloadPools() error { newSize := replicatorconfig.PoolSize(c.appCfg) c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size") diff --git a/cmd/frostfs-node/config/rpc/config.go b/cmd/frostfs-node/config/rpc/config.go new file mode 100644 index 000000000..197990d07 --- /dev/null +++ b/cmd/frostfs-node/config/rpc/config.go @@ -0,0 +1,43 @@ +package rpcconfig + +import ( + "strconv" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" +) + +const ( + subsection = "rpc" + limitsSubsection = "limits" +) + +type LimitConfig struct { + Methods []string + MaxOps int64 +} + +// Limits returns the "limits" config from "rpc" section. +func Limits(c *config.Config) []LimitConfig { + c = c.Sub(subsection).Sub(limitsSubsection) + + var limits []LimitConfig + + for i := uint64(0); ; i++ { + si := strconv.FormatUint(i, 10) + sc := c.Sub(si) + + methods := config.StringSliceSafe(sc, "methods") + if len(methods) == 0 { + break + } + + maxOps := config.IntSafe(sc, "max_ops") + if maxOps == 0 { + panic("no max operations for method group") + } + + limits = append(limits, LimitConfig{methods, maxOps}) + } + + return limits +} diff --git a/cmd/frostfs-node/config/rpc/config_test.go b/cmd/frostfs-node/config/rpc/config_test.go new file mode 100644 index 000000000..31a837cee --- /dev/null +++ b/cmd/frostfs-node/config/rpc/config_test.go @@ -0,0 +1,53 @@ +package rpcconfig + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test" + "github.com/stretchr/testify/require" +) + +func TestRPCSection(t *testing.T) { + t.Run("defaults", func(t *testing.T) { + require.Empty(t, Limits(configtest.EmptyConfig())) + }) + + t.Run("correct config", func(t *testing.T) { + const path = "../../../../config/example/node" + + fileConfigTest := func(c *config.Config) { + limits := Limits(c) + require.Len(t, limits, 2) + + limit0 := limits[0] + limit1 := limits[1] + + require.ElementsMatch(t, limit0.Methods, []string{"/neo.fs.v2.object.ObjectService/PutSingle", "/neo.fs.v2.object.ObjectService/Put"}) + require.Equal(t, limit0.MaxOps, int64(1000)) + + require.ElementsMatch(t, limit1.Methods, []string{"/neo.fs.v2.object.ObjectService/Get"}) + require.Equal(t, limit1.MaxOps, int64(10000)) + } + + configtest.ForEachFileType(path, fileConfigTest) + + t.Run("ENV", func(t *testing.T) { + configtest.ForEnvFileType(t, path, fileConfigTest) + }) + }) + + t.Run("no max operations", func(t *testing.T) { + const path = "testdata/node" + + fileConfigTest := func(c *config.Config) { + require.Panics(t, func() { _ = Limits(c) }) + } + + configtest.ForEachFileType(path, fileConfigTest) + + t.Run("ENV", func(t *testing.T) { + configtest.ForEnvFileType(t, path, fileConfigTest) + }) + }) +} diff --git a/cmd/frostfs-node/config/rpc/testdata/node.env b/cmd/frostfs-node/config/rpc/testdata/node.env new file mode 100644 index 000000000..2fed4c5bc --- /dev/null +++ b/cmd/frostfs-node/config/rpc/testdata/node.env @@ -0,0 +1,3 @@ +FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put" +FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get" +FROSTFS_RPC_LIMITS_1_MAX_OPS=10000 diff --git a/cmd/frostfs-node/config/rpc/testdata/node.json b/cmd/frostfs-node/config/rpc/testdata/node.json new file mode 100644 index 000000000..6156aa71d --- /dev/null +++ b/cmd/frostfs-node/config/rpc/testdata/node.json @@ -0,0 +1,18 @@ +{ + "rpc": { + "limits": [ + { + "methods": [ + "/neo.fs.v2.object.ObjectService/PutSingle", + "/neo.fs.v2.object.ObjectService/Put" + ] + }, + { + "methods": [ + "/neo.fs.v2.object.ObjectService/Get" + ], + "max_ops": 10000 + } + ] + } +} diff --git a/cmd/frostfs-node/config/rpc/testdata/node.yaml b/cmd/frostfs-node/config/rpc/testdata/node.yaml new file mode 100644 index 000000000..e50b7ae93 --- /dev/null +++ b/cmd/frostfs-node/config/rpc/testdata/node.yaml @@ -0,0 +1,8 @@ +rpc: + limits: + - methods: + - /neo.fs.v2.object.ObjectService/PutSingle + - /neo.fs.v2.object.ObjectService/Put + - methods: + - /neo.fs.v2.object.ObjectService/Get + max_ops: 10000 diff --git a/cmd/frostfs-node/grpc.go b/cmd/frostfs-node/grpc.go index 4d679e4cc..e1a273ce4 100644 --- a/cmd/frostfs-node/grpc.go +++ b/cmd/frostfs-node/grpc.go @@ -9,9 +9,11 @@ import ( grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + qosInternal "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc" tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" qos "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" "go.uber.org/zap" "google.golang.org/grpc" @@ -134,11 +136,13 @@ func getGrpcServerOpts(ctx context.Context, c *cfg, sc *grpcconfig.Config) ([]gr qos.NewUnaryServerInterceptor(), metrics.NewUnaryServerInterceptor(), tracing.NewUnaryServerInterceptor(), + qosInternal.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }), ), grpc.ChainStreamInterceptor( qos.NewStreamServerInterceptor(), metrics.NewStreamServerInterceptor(), tracing.NewStreamServerInterceptor(), + qosInternal.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return c.cfgGRPC.limiter.Load() }), ), } diff --git a/config/example/node.env b/config/example/node.env index aa3c72a91..2ebef181a 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -91,6 +91,11 @@ FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE" +FROSTFS_RPC_LIMITS_0_METHODS="/neo.fs.v2.object.ObjectService/PutSingle /neo.fs.v2.object.ObjectService/Put" +FROSTFS_RPC_LIMITS_0_MAX_OPS=1000 +FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get" +FROSTFS_RPC_LIMITS_1_MAX_OPS=10000 + # Storage engine section FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 diff --git a/config/example/node.json b/config/example/node.json index afa815bc3..0ed72effc 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -140,6 +140,23 @@ "priority": ["$attribute:ClusterName", "$attribute:UN-LOCODE"] } }, + "rpc": { + "limits": [ + { + "methods": [ + "/neo.fs.v2.object.ObjectService/PutSingle", + "/neo.fs.v2.object.ObjectService/Put" + ], + "max_ops": 1000 + }, + { + "methods": [ + "/neo.fs.v2.object.ObjectService/Get" + ], + "max_ops": 10000 + } + ] + }, "storage": { "shard_pool_size": 15, "shard_ro_error_threshold": 100, diff --git a/config/example/node.yaml b/config/example/node.yaml index f63cc514b..6b810653e 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -123,6 +123,16 @@ object: - $attribute:ClusterName - $attribute:UN-LOCODE +rpc: + limits: + - methods: + - /neo.fs.v2.object.ObjectService/PutSingle + - /neo.fs.v2.object.ObjectService/Put + max_ops: 1000 + - methods: + - /neo.fs.v2.object.ObjectService/Get + max_ops: 10000 + storage: # note: shard configuration can be omitted for relay node (see `node.relay`) shard_pool_size: 15 # size of per-shard worker pools used for PUT operations diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index b2ab75b7e..1eb5437ba 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -416,6 +416,27 @@ object: | `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | | `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. | + +# `rpc` section +Contains limits on the number of active RPC for specified method(s). + +```yaml +rpc: + limits: + - methods: + - /neo.fs.v2.object.ObjectService/PutSingle + - /neo.fs.v2.object.ObjectService/Put + max_ops: 1000 + - methods: + - /neo.fs.v2.object.ObjectService/Get + max_ops: 10000 +``` + +| Parameter | Type | Default value | Description | +|------------------|------------|---------------|--------------------------------------------------------------| +| `limits.max_ops` | `int` | | Maximum number of active RPC allowed for the given method(s) | +| `limits.methods` | `[]string` | | List of RPC methods sharing the given limit | + # `runtime` section Contains runtime parameters. -- 2.45.3