From dce269c62e555a08eb6e768b8fb25d7307564991 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Thu, 6 Feb 2025 11:50:12 +0300 Subject: [PATCH] [#1639] services/object: Remove limiting pools for Put operation Signed-off-by: Aleksey Savchuk --- cmd/frostfs-node/config.go | 22 +----------- cmd/frostfs-node/config/object/config.go | 28 --------------- cmd/frostfs-node/config/object/config_test.go | 4 --- cmd/frostfs-node/object.go | 1 - config/example/node.env | 2 -- config/example/node.json | 2 -- config/example/node.yaml | 2 -- docs/storage-node-configuration.md | 4 --- internal/logs/logs.go | 1 - pkg/services/object/common/writer/common.go | 10 ++---- pkg/services/object/common/writer/ec.go | 36 ++----------------- pkg/services/object/common/writer/ec_test.go | 5 --- pkg/services/object/common/writer/writer.go | 16 --------- pkg/services/object/put/service.go | 3 -- pkg/services/object/util/log.go | 8 ----- 15 files changed, 7 insertions(+), 137 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d575c7228..43ec40d1d 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -664,10 +664,6 @@ type cfgAccessPolicyEngine struct { } type cfgObjectRoutines struct { - putRemote *ants.Pool - - putLocal *ants.Pool - replication *ants.Pool } @@ -1166,16 +1162,6 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) { func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) { var err error - optNonBlocking := ants.WithNonblocking(true) - - putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote() - pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking) - fatalOnErr(err) - - putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal() - pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking) - fatalOnErr(err) - replicatorPoolSize := replicatorconfig.PoolSize(cfg) pool.replication, err = ants.NewPool(replicatorPoolSize) fatalOnErr(err) @@ -1410,13 +1396,7 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp { } func (c *cfg) reloadPools() error { - newSize := objectconfig.Put(c.appCfg).PoolSizeLocal() - c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size") - - newSize = objectconfig.Put(c.appCfg).PoolSizeRemote() - c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size") - - newSize = replicatorconfig.PoolSize(c.appCfg) + newSize := replicatorconfig.PoolSize(c.appCfg) c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size") return nil diff --git a/cmd/frostfs-node/config/object/config.go b/cmd/frostfs-node/config/object/config.go index 6ff1fe2ab..c8c967d30 100644 --- a/cmd/frostfs-node/config/object/config.go +++ b/cmd/frostfs-node/config/object/config.go @@ -21,10 +21,6 @@ const ( putSubsection = "put" getSubsection = "get" - - // PutPoolSizeDefault is a default value of routine pool size to - // process object.Put requests in object service. - PutPoolSizeDefault = 10 ) // Put returns structure that provides access to "put" subsection of @@ -35,30 +31,6 @@ func Put(c *config.Config) PutConfig { } } -// PoolSizeRemote returns the value of "remote_pool_size" config parameter. -// -// Returns PutPoolSizeDefault if the value is not a positive number. -func (g PutConfig) PoolSizeRemote() int { - v := config.Int(g.cfg, "remote_pool_size") - if v > 0 { - return int(v) - } - - return PutPoolSizeDefault -} - -// PoolSizeLocal returns the value of "local_pool_size" config parameter. -// -// Returns PutPoolSizeDefault if the value is not a positive number. -func (g PutConfig) PoolSizeLocal() int { - v := config.Int(g.cfg, "local_pool_size") - if v > 0 { - return int(v) - } - - return PutPoolSizeDefault -} - // SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `falseā€œ if is not defined. func (g PutConfig) SkipSessionTokenIssuerVerification() bool { return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification") diff --git a/cmd/frostfs-node/config/object/config_test.go b/cmd/frostfs-node/config/object/config_test.go index e2bb105d9..1c525ef55 100644 --- a/cmd/frostfs-node/config/object/config_test.go +++ b/cmd/frostfs-node/config/object/config_test.go @@ -13,8 +13,6 @@ func TestObjectSection(t *testing.T) { t.Run("defaults", func(t *testing.T) { empty := configtest.EmptyConfig() - require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote()) - require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal()) require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty)) require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification()) }) @@ -22,8 +20,6 @@ func TestObjectSection(t *testing.T) { const path = "../../../../config/example/node" fileConfigTest := func(c *config.Config) { - require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote()) - require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal()) require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c)) require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification()) } diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 40d3cc1cd..ad6f4140a 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -326,7 +326,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche c, c.cfgNetmap.state, irFetcher, - objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), objectwriter.WithLogger(c.log), objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification), ) diff --git a/config/example/node.env b/config/example/node.env index 2ba432b1b..aa3c72a91 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -87,8 +87,6 @@ FROSTFS_REPLICATOR_POOL_SIZE=10 FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500 # Object service section -FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100 -FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE" diff --git a/config/example/node.json b/config/example/node.json index cfde8bcc7..afa815bc3 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -134,8 +134,6 @@ "tombstone_lifetime": 10 }, "put": { - "remote_pool_size": 100, - "local_pool_size": 200, "skip_session_token_issuer_verification": true }, "get": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 1f8ec843d..f63cc514b 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -117,8 +117,6 @@ object: delete: tombstone_lifetime: 10 # tombstone "local" lifetime in epochs put: - remote_pool_size: 100 # number of async workers for remote PUT operations - local_pool_size: 200 # number of async workers for local PUT operations skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true get: priority: # list of metrics of nodes for prioritization diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index aef05d589..b2ab75b7e 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -406,8 +406,6 @@ Contains object-service related parameters. ```yaml object: - put: - remote_pool_size: 100 get: priority: - $attribute:ClusterName @@ -416,8 +414,6 @@ object: | Parameter | Type | Default value | Description | |-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------| | `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. | -| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. | -| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. | | `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. | # `runtime` section diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6a72644e5..d48a4da9b 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -125,7 +125,6 @@ const ( SearchCouldNotWriteObjectIdentifiers = "could not write object identifiers" SearchLocalOperationFailed = "local operation failed" UtilObjectServiceError = "object service error" - UtilCouldNotPushTaskToWorkerPool = "could not push task to worker pool" V2CantCheckIfRequestFromInnerRing = "can't check if request from inner ring" V2CantCheckIfRequestFromContainerNode = "can't check if request from container node" ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch" diff --git a/pkg/services/object/common/writer/common.go b/pkg/services/object/common/writer/common.go index dae168baf..1998e9638 100644 --- a/pkg/services/object/common/writer/common.go +++ b/pkg/services/object/common/writer/common.go @@ -79,11 +79,11 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement. continue } - workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey()) + isLocal := n.cfg.NetmapKeys.IsLocalKey(addr.PublicKey()) item := new(bool) wg.Add(1) - if err := workerPool.Submit(func() { + go func() { defer wg.Done() err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr}) @@ -95,11 +95,7 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement. traverser.SubmitSuccess() *item = true - }); err != nil { - wg.Done() - svcutil.LogWorkerPoolError(ctx, n.cfg.Logger, "PUT", err) - return true - } + }() // Mark the container node as processed in order to exclude it // in subsequent container broadcast. Note that we don't diff --git a/pkg/services/object/common/writer/ec.go b/pkg/services/object/common/writer/ec.go index 8f269ec21..26a53e315 100644 --- a/pkg/services/object/common/writer/ec.go +++ b/pkg/services/object/common/writer/ec.go @@ -149,17 +149,7 @@ func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } - completed := make(chan interface{}) - if poolErr := e.Config.RemotePool.Submit(func() { - defer close(completed) - err = e.Relay(ctx, info, c) - }); poolErr != nil { - close(completed) - svcutil.LogWorkerPoolError(ctx, e.Config.Logger, "PUT", poolErr) - return poolErr - } - <-completed - + err = e.Relay(ctx, info, c) if err == nil { return nil } @@ -343,21 +333,11 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n } func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { - var err error localTarget := LocalTarget{ Storage: e.Config.LocalStore, Container: e.Container, } - completed := make(chan interface{}) - if poolErr := e.Config.LocalPool.Submit(func() { - defer close(completed) - err = localTarget.WriteObject(ctx, obj, e.ObjectMeta) - }); poolErr != nil { - close(completed) - return poolErr - } - <-completed - return err + return localTarget.WriteObject(ctx, obj, e.ObjectMeta) } func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { @@ -371,15 +351,5 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n nodeInfo: clientNodeInfo, } - var err error - completed := make(chan interface{}) - if poolErr := e.Config.RemotePool.Submit(func() { - defer close(completed) - err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta) - }); poolErr != nil { - close(completed) - return poolErr - } - <-completed - return err + return remoteTaget.WriteObject(ctx, obj, e.ObjectMeta) } diff --git a/pkg/services/object/common/writer/ec_test.go b/pkg/services/object/common/writer/ec_test.go index b7764661f..2458e352f 100644 --- a/pkg/services/object/common/writer/ec_test.go +++ b/pkg/services/object/common/writer/ec_test.go @@ -31,7 +31,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/tzhash/tz" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" ) @@ -131,9 +130,6 @@ func TestECWriter(t *testing.T) { nodeKey, err := keys.NewPrivateKey() require.NoError(t, err) - pool, err := ants.NewPool(4, ants.WithNonblocking(true)) - require.NoError(t, err) - log, err := logger.NewLogger(nil) require.NoError(t, err) @@ -141,7 +137,6 @@ func TestECWriter(t *testing.T) { ecw := ECWriter{ Config: &Config{ NetmapKeys: n, - RemotePool: pool, Logger: log, ClientConstructor: clientConstructor{vectors: ns}, KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil), diff --git a/pkg/services/object/common/writer/writer.go b/pkg/services/object/common/writer/writer.go index adaf1945b..d3d2b41b4 100644 --- a/pkg/services/object/common/writer/writer.go +++ b/pkg/services/object/common/writer/writer.go @@ -12,7 +12,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -52,8 +51,6 @@ type Config struct { NetmapSource netmap.Source - RemotePool, LocalPool util.WorkerPool - NetmapKeys netmap.AnnouncedKeys FormatValidator *object.FormatValidator @@ -69,12 +66,6 @@ type Config struct { type Option func(*Config) -func WithWorkerPools(remote, local util.WorkerPool) Option { - return func(c *Config) { - c.RemotePool, c.LocalPool = remote, local - } -} - func WithLogger(l *logger.Logger) Option { return func(c *Config) { c.Logger = l @@ -87,13 +78,6 @@ func WithVerifySessionTokenIssuer(v bool) Option { } } -func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) { - if c.NetmapKeys.IsLocalKey(pub) { - return c.LocalPool, true - } - return c.RemotePool, false -} - type Params struct { Config *Config diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 5cc0a5722..099486b3f 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -6,7 +6,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -27,8 +26,6 @@ func NewService(ks *objutil.KeyStorage, opts ...objectwriter.Option, ) *Service { c := &objectwriter.Config{ - RemotePool: util.NewPseudoWorkerPool(), - LocalPool: util.NewPseudoWorkerPool(), Logger: logger.NewLoggerWrapper(zap.L()), KeyStorage: ks, ClientConstructor: cc, diff --git a/pkg/services/object/util/log.go b/pkg/services/object/util/log.go index 2c1e053ac..b10826226 100644 --- a/pkg/services/object/util/log.go +++ b/pkg/services/object/util/log.go @@ -17,11 +17,3 @@ func LogServiceError(ctx context.Context, l *logger.Logger, req string, node net zap.Error(err), ) } - -// LogWorkerPoolError writes debug error message of object worker pool to provided logger. -func LogWorkerPoolError(ctx context.Context, l *logger.Logger, req string, err error) { - l.Error(ctx, logs.UtilCouldNotPushTaskToWorkerPool, - zap.String("request", req), - zap.Error(err), - ) -}