[#1639] services/object: Remove limiting pools for Put operation
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent a97bded440
commit dce269c62e
15 changed files with 7 additions and 137 deletions
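Context for the change: the removed putRemote/putLocal pools were created with ants.WithNonblocking(true), so a saturated pool rejected new tasks outright and the PUT failed with "could not push task to worker pool". A standalone sketch of that limiting behavior (illustrative, not code from this repository):

```go
package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// A pool of one worker, configured like the removed put pools.
	pool, err := ants.NewPool(1, ants.WithNonblocking(true))
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	// The first task occupies the only worker.
	_ = pool.Submit(func() { time.Sleep(time.Second) })

	// With WithNonblocking(true), a saturated pool rejects the task
	// instead of queueing it, which failed the enclosing PUT.
	if err := pool.Submit(func() {}); err != nil {
		fmt.Println(err) // ants.ErrPoolOverload
	}
}
```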
@@ -664,10 +664,6 @@ type cfgAccessPolicyEngine struct {
 }
 
 type cfgObjectRoutines struct {
-    putRemote *ants.Pool
-
-    putLocal *ants.Pool
-
     replication *ants.Pool
 }
 
@@ -1166,16 +1162,6 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) {
 func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
     var err error
 
-    optNonBlocking := ants.WithNonblocking(true)
-
-    putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
-    pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
-    fatalOnErr(err)
-
-    putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
-    pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
-    fatalOnErr(err)
-
     replicatorPoolSize := replicatorconfig.PoolSize(cfg)
     pool.replication, err = ants.NewPool(replicatorPoolSize)
     fatalOnErr(err)
@@ -1410,13 +1396,7 @@ func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
 }
 
 func (c *cfg) reloadPools() error {
-    newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
-    c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
-
-    newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
-    c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
-
-    newSize = replicatorconfig.PoolSize(c.appCfg)
+    newSize := replicatorconfig.PoolSize(c.appCfg)
     c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
 
     return nil
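reloadPools now resizes only the surviving replication pool. For reference, ants pools are resized in place with Tune; a minimal sketch of the mechanism c.reloadPool presumably relies on (the wrapper itself is not shown in this diff):

```go
package main

import (
	"fmt"

	"github.com/panjf2000/ants/v2"
)

func main() {
	pool, err := ants.NewPool(10)
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	// Tune adjusts the capacity of a running pool without recreating it,
	// which is what a config-reload hook needs.
	pool.Tune(20)
	fmt.Println(pool.Cap()) // 20
}
```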
@@ -21,10 +21,6 @@ const (
 
     putSubsection = "put"
     getSubsection = "get"
-
-    // PutPoolSizeDefault is a default value of routine pool size to
-    // process object.Put requests in object service.
-    PutPoolSizeDefault = 10
 )
 
 // Put returns structure that provides access to "put" subsection of
@@ -35,30 +31,6 @@ func Put(c *config.Config) PutConfig {
     }
 }
 
-// PoolSizeRemote returns the value of "remote_pool_size" config parameter.
-//
-// Returns PutPoolSizeDefault if the value is not a positive number.
-func (g PutConfig) PoolSizeRemote() int {
-    v := config.Int(g.cfg, "remote_pool_size")
-    if v > 0 {
-        return int(v)
-    }
-
-    return PutPoolSizeDefault
-}
-
-// PoolSizeLocal returns the value of "local_pool_size" config parameter.
-//
-// Returns PutPoolSizeDefault if the value is not a positive number.
-func (g PutConfig) PoolSizeLocal() int {
-    v := config.Int(g.cfg, "local_pool_size")
-    if v > 0 {
-        return int(v)
-    }
-
-    return PutPoolSizeDefault
-}
-
 // SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `false` if is not defined.
 func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
     return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
@@ -13,8 +13,6 @@ func TestObjectSection(t *testing.T) {
     t.Run("defaults", func(t *testing.T) {
         empty := configtest.EmptyConfig()
 
-        require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
-        require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal())
         require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
         require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification())
     })
@@ -22,8 +20,6 @@ func TestObjectSection(t *testing.T) {
     const path = "../../../../config/example/node"
 
     fileConfigTest := func(c *config.Config) {
-        require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
-        require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal())
         require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
         require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification())
     }
@@ -326,7 +326,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher
         c,
         c.cfgNetmap.state,
         irFetcher,
-        objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
         objectwriter.WithLogger(c.log),
         objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
     )
@@ -87,8 +87,6 @@ FROSTFS_REPLICATOR_POOL_SIZE=10
 FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500
 
 # Object service section
-FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
-FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
 FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
 FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
 FROSTFS_OBJECT_GET_PRIORITY="$attribute:ClusterName $attribute:UN-LOCODE"
@@ -134,8 +134,6 @@
       "tombstone_lifetime": 10
     },
     "put": {
-      "remote_pool_size": 100,
-      "local_pool_size": 200,
       "skip_session_token_issuer_verification": true
     },
     "get": {
@@ -117,8 +117,6 @@ object:
   delete:
     tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
   put:
-    remote_pool_size: 100 # number of async workers for remote PUT operations
-    local_pool_size: 200 # number of async workers for local PUT operations
     skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
   get:
     priority: # list of metrics of nodes for prioritization
@@ -406,8 +406,6 @@ Contains object-service related parameters.
 
 ```yaml
 object:
-  put:
-    remote_pool_size: 100
   get:
     priority:
       - $attribute:ClusterName
@@ -416,8 +414,6 @@ object:
 | Parameter                   | Type       | Default value | Description                                                                                     |
 |-----------------------------|------------|---------------|-------------------------------------------------------------------------------------------------|
 | `delete.tombstone_lifetime` | `int`      | `5`           | Tombstone lifetime for removed objects in epochs.                                              |
-| `put.remote_pool_size`      | `int`      | `10`          | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
-| `put.local_pool_size`       | `int`      | `10`          | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services.  |
 | `get.priority`              | `[]string` |               | List of metrics of nodes for prioritization. Used for computing response on GET requests.      |
 
 # `runtime` section
@@ -125,7 +125,6 @@ const (
     SearchCouldNotWriteObjectIdentifiers                 = "could not write object identifiers"
     SearchLocalOperationFailed                           = "local operation failed"
     UtilObjectServiceError                               = "object service error"
-    UtilCouldNotPushTaskToWorkerPool                     = "could not push task to worker pool"
     V2CantCheckIfRequestFromInnerRing                    = "can't check if request from inner ring"
     V2CantCheckIfRequestFromContainerNode                = "can't check if request from container node"
     ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch = "could not restore block subscription after RPC switch"
@@ -79,11 +79,11 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser,
             continue
         }
 
-        workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey())
+        isLocal := n.cfg.NetmapKeys.IsLocalKey(addr.PublicKey())
 
         item := new(bool)
         wg.Add(1)
-        if err := workerPool.Submit(func() {
+        go func() {
             defer wg.Done()
 
             err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
@@ -95,11 +95,7 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser,
 
             traverser.SubmitSuccess()
             *item = true
-        }); err != nil {
-            wg.Done()
-            svcutil.LogWorkerPoolError(ctx, n.cfg.Logger, "PUT", err)
-            return true
-        }
+        }()
 
         // Mark the container node as processed in order to exclude it
         // in subsequent container broadcast. Note that we don't
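With the pool gone, each candidate node is served by a plain goroutine tracked by the existing WaitGroup; the whole Submit error branch (wg.Done, LogWorkerPoolError, early return) disappears because launching a goroutine cannot fail. A condensed, self-contained sketch of the new shape (placeholder types, not the node's real ones):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	addrs := []string{"node-a", "node-b", "node-c"} // placeholder for placement nodes

	var wg sync.WaitGroup
	for _, addr := range addrs {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			// placeholder for f(ctx, NodeDescriptor{...})
			fmt.Println("put to", addr)
		}(addr)
	}
	wg.Wait()
}
```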
@@ -149,17 +149,7 @@ func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
         return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
     }
 
-    completed := make(chan interface{})
-    if poolErr := e.Config.RemotePool.Submit(func() {
-        defer close(completed)
-        err = e.Relay(ctx, info, c)
-    }); poolErr != nil {
-        close(completed)
-        svcutil.LogWorkerPoolError(ctx, e.Config.Logger, "PUT", poolErr)
-        return poolErr
-    }
-    <-completed
-
+    err = e.Relay(ctx, info, c)
     if err == nil {
         return nil
     }
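The removed completed-channel idiom submitted work to the pool and then immediately blocked on the channel, so the caller never ran concurrently with the task; apart from the pool's rejection path, it is equivalent to a direct call. A runnable sketch of that equivalence (pool size and the relay function are placeholders):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/panjf2000/ants/v2"
)

// oldStyle mirrors the removed idiom: submit to a pool, then block until done.
func oldStyle(pool *ants.Pool, relay func() error) error {
	var err error
	completed := make(chan interface{})
	if poolErr := pool.Submit(func() {
		defer close(completed)
		err = relay()
	}); poolErr != nil {
		return poolErr // the only behavior the pool added: rejection when saturated
	}
	<-completed
	return err
}

// newStyle is what the caller-visible behavior collapses to.
func newStyle(relay func() error) error {
	return relay()
}

func main() {
	pool, _ := ants.NewPool(1)
	defer pool.Release()

	relay := func() error { return errors.New("relay failed") }
	fmt.Println(oldStyle(pool, relay)) // relay failed
	fmt.Println(newStyle(relay))       // relay failed
}
```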
@@ -343,21 +333,11 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
 }
 
 func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
-    var err error
     localTarget := LocalTarget{
         Storage:   e.Config.LocalStore,
         Container: e.Container,
     }
-    completed := make(chan interface{})
-    if poolErr := e.Config.LocalPool.Submit(func() {
-        defer close(completed)
-        err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
-    }); poolErr != nil {
-        close(completed)
-        return poolErr
-    }
-    <-completed
-    return err
+    return localTarget.WriteObject(ctx, obj, e.ObjectMeta)
 }
 
 func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
@@ -371,15 +351,5 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
         nodeInfo: clientNodeInfo,
     }
 
-    var err error
-    completed := make(chan interface{})
-    if poolErr := e.Config.RemotePool.Submit(func() {
-        defer close(completed)
-        err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
-    }); poolErr != nil {
-        close(completed)
-        return poolErr
-    }
-    <-completed
-    return err
+    return remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
 }
@@ -31,7 +31,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
     "git.frostfs.info/TrueCloudLab/tzhash/tz"
     "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
-    "github.com/panjf2000/ants/v2"
     "github.com/stretchr/testify/require"
 )
 
@@ -131,9 +130,6 @@ func TestECWriter(t *testing.T) {
     nodeKey, err := keys.NewPrivateKey()
     require.NoError(t, err)
 
-    pool, err := ants.NewPool(4, ants.WithNonblocking(true))
-    require.NoError(t, err)
-
     log, err := logger.NewLogger(nil)
     require.NoError(t, err)
 
@@ -141,7 +137,6 @@ func TestECWriter(t *testing.T) {
     ecw := ECWriter{
         Config: &Config{
             NetmapKeys:        n,
-            RemotePool:        pool,
             Logger:            log,
             ClientConstructor: clientConstructor{vectors: ns},
             KeyStorage:        util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
@@ -12,7 +12,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
     objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
     containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -52,8 +51,6 @@ type Config struct {
 
     NetmapSource netmap.Source
 
-    RemotePool, LocalPool util.WorkerPool
-
     NetmapKeys netmap.AnnouncedKeys
 
     FormatValidator *object.FormatValidator
@@ -69,12 +66,6 @@ type Config struct {
 
 type Option func(*Config)
 
-func WithWorkerPools(remote, local util.WorkerPool) Option {
-    return func(c *Config) {
-        c.RemotePool, c.LocalPool = remote, local
-    }
-}
-
 func WithLogger(l *logger.Logger) Option {
     return func(c *Config) {
         c.Logger = l
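WithWorkerPools was the last injection point for these pools; the surviving options (WithLogger, WithVerifySessionTokenIssuer) follow the standard functional-options pattern, sketched here with a hypothetical option and placeholder field:

```go
package main

import "fmt"

// Config and Option mirror the shape used in the writer package:
// an Option mutates a *Config at construction time.
type Config struct {
	LoggerName string // placeholder; the real Config holds a *logger.Logger
}

type Option func(*Config)

// WithLoggerName is a hypothetical option, for illustration only.
func WithLoggerName(name string) Option {
	return func(c *Config) { c.LoggerName = name }
}

func New(opts ...Option) *Config {
	c := &Config{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	fmt.Println(New(WithLoggerName("put-service")).LoggerName)
}
```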
@@ -87,13 +78,6 @@ func WithVerifySessionTokenIssuer(v bool) Option {
     }
 }
 
-func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
-    if c.NetmapKeys.IsLocalKey(pub) {
-        return c.LocalPool, true
-    }
-    return c.RemotePool, false
-}
-
 type Params struct {
     Config *Config
 
@@ -6,7 +6,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
     objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
     objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
-    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
     "go.uber.org/zap"
 )
@@ -27,8 +26,6 @@ func NewService(ks *objutil.KeyStorage,
     opts ...objectwriter.Option,
 ) *Service {
     c := &objectwriter.Config{
-        RemotePool:        util.NewPseudoWorkerPool(),
-        LocalPool:         util.NewPseudoWorkerPool(),
         Logger:            logger.NewLoggerWrapper(zap.L()),
         KeyStorage:        ks,
         ClientConstructor: cc,
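The standalone put service previously filled RemotePool and LocalPool with util.NewPseudoWorkerPool(), a WorkerPool that runs the task inline in the caller's goroutine, so dropping the fields does not change that path's behavior. A minimal sketch of the idea, assuming the interface is essentially Submit(func()) error (the real util.WorkerPool may carry more methods):

```go
package main

import "fmt"

// WorkerPool is assumed here to be the minimal Submit-style interface.
type WorkerPool interface {
	Submit(func()) error
}

// pseudoWorkerPool mimics util.NewPseudoWorkerPool: it runs the task
// synchronously in the caller's goroutine and never rejects it.
type pseudoWorkerPool struct{}

func (pseudoWorkerPool) Submit(fn func()) error {
	fn()
	return nil
}

func main() {
	var p WorkerPool = pseudoWorkerPool{}
	_ = p.Submit(func() { fmt.Println("runs inline") })
}
```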
@@ -17,11 +17,3 @@ func LogServiceError(ctx context.Context, l *logger.Logger, req string, node net
         zap.Error(err),
     )
 }
-
-// LogWorkerPoolError writes debug error message of object worker pool to provided logger.
-func LogWorkerPoolError(ctx context.Context, l *logger.Logger, req string, err error) {
-    l.Error(ctx, logs.UtilCouldNotPushTaskToWorkerPool,
-        zap.String("request", req),
-        zap.Error(err),
-    )
-}