From 2005fdda0982f7aa23d5938dfe7a40dd2a72fdb4 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Mon, 10 Mar 2025 13:04:39 +0300
Subject: [PATCH] [#1667] shard: Drop shard pool

After adding an ops limiter, shard's `put` pool is redundant.

Signed-off-by: Dmitrii Stepanov
---
 .../internal/modules/storagecfg/config.go     |   2 -
 cmd/frostfs-node/config.go                    |   3 -
 cmd/frostfs-node/config/configdir_test.go     |   7 +-
 cmd/frostfs-node/config/engine/config.go      |  16 ---
 cmd/frostfs-node/config/engine/config_test.go |   2 -
 config/example/node.env                       |   1 -
 config/example/node.json                      |   1 -
 config/example/node.yaml                      |   1 -
 docs/storage-node-configuration.md            |   1 -
 pkg/local_object_storage/engine/control.go    |  10 +-
 .../engine/control_test.go                    |   4 -
 pkg/local_object_storage/engine/engine.go     |  18 +--
 .../engine/engine_test.go                     |   1 -
 pkg/local_object_storage/engine/error_test.go |   1 -
 pkg/local_object_storage/engine/evacuate.go   |  43 +++----
 .../engine/evacuate_test.go                   |   1 -
 .../engine/inhume_test.go                     |   2 +-
 pkg/local_object_storage/engine/put.go        | 106 ++++++++----
 pkg/local_object_storage/engine/shards.go     |  20 ----
 .../engine/shards_test.go                     |   2 -
 20 files changed, 71 insertions(+), 171 deletions(-)

diff --git a/cmd/frostfs-adm/internal/modules/storagecfg/config.go b/cmd/frostfs-adm/internal/modules/storagecfg/config.go
index 77183fb497..67e3414c25 100644
--- a/cmd/frostfs-adm/internal/modules/storagecfg/config.go
+++ b/cmd/frostfs-adm/internal/modules/storagecfg/config.go
@@ -40,8 +40,6 @@ morph:
       - address: wss://{{.}}/ws{{end}}
 {{if not .Relay }}
 storage:
-  shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
-
   shard:
     default: # section with the default shard parameters
       metabase:
diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index 92aa827f24..e2fe231358 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -117,7 +117,6 @@ type applicationConfiguration struct {
 	EngineCfg struct {
 		errorThreshold uint32
-		shardPoolSize  uint32
 		shards         []shardCfg
 		lowMem         bool
 	}
@@ -250,7 +249,6 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	// Storage Engine
 
 	a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
-	a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
 	a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
 
 	return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
@@ -893,7 +891,6 @@ func (c *cfg) engineOpts() []engine.Option {
 	var opts []engine.Option
 
 	opts = append(opts,
-		engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
 		engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
 		engine.WithLogger(c.log),
 		engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
diff --git a/cmd/frostfs-node/config/configdir_test.go b/cmd/frostfs-node/config/configdir_test.go
index 35dae97d9e..ee9d4268be 100644
--- a/cmd/frostfs-node/config/configdir_test.go
+++ b/cmd/frostfs-node/config/configdir_test.go
@@ -12,13 +12,10 @@ import (
 func TestConfigDir(t *testing.T) {
 	dir := t.TempDir()
 
-	cfgFileName0 := path.Join(dir, "cfg_00.json")
-	cfgFileName1 := path.Join(dir, "cfg_01.yml")
+	cfgFileName := path.Join(dir, "cfg_01.yml")
 
-	require.NoError(t, os.WriteFile(cfgFileName0, []byte(`{"storage":{"shard_pool_size":15}}`), 0o777))
-	require.NoError(t, os.WriteFile(cfgFileName1, []byte("logger:\n level: debug"), 0o777))
+	require.NoError(t, os.WriteFile(cfgFileName, []byte("logger:\n level: debug"), 0o777))
 
 	c := New("", dir, "")
 	require.Equal(t, "debug", cast.ToString(c.Sub("logger").Value("level")))
-	require.EqualValues(t, 15, cast.ToUint32(c.Sub("storage").Value("shard_pool_size")))
 }
diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go
index e5735e88be..7994e78095 100644
--- a/cmd/frostfs-node/config/engine/config.go
+++ b/cmd/frostfs-node/config/engine/config.go
@@ -11,10 +11,6 @@ import (
 const (
 	subsection = "storage"
-
-	// ShardPoolSizeDefault is a default value of routine pool size per-shard to
-	// process object PUT operations in a storage engine.
-	ShardPoolSizeDefault = 20
 )
 
 // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@@ -65,18 +61,6 @@ func IterateShards(c *config.Config, required bool, f func(*shardconfig.Config)
 	return nil
 }
 
-// ShardPoolSize returns the value of "shard_pool_size" config parameter from "storage" section.
-//
-// Returns ShardPoolSizeDefault if the value is not a positive number.
-func ShardPoolSize(c *config.Config) uint32 {
-	v := config.Uint32Safe(c.Sub(subsection), "shard_pool_size")
-	if v > 0 {
-		return v
-	}
-
-	return ShardPoolSizeDefault
-}
-
 // ShardErrorThreshold returns the value of "shard_ro_error_threshold" config parameter from "storage" section.
 //
 // Returns 0 if the the value is missing.
diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go
index b912b5d7df..eaf2a294e1 100644
--- a/cmd/frostfs-node/config/engine/config_test.go
+++ b/cmd/frostfs-node/config/engine/config_test.go
@@ -54,7 +54,6 @@ func TestEngineSection(t *testing.T) {
 		require.False(t, handlerCalled)
 
 		require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
-		require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
 		require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
 	})
 
@@ -64,7 +63,6 @@ func TestEngineSection(t *testing.T) {
 		num := 0
 
 		require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
-		require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
 
 		err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
 			defer func() {
diff --git a/config/example/node.env b/config/example/node.env
index 9bd6453447..010b6840cb 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -97,7 +97,6 @@ FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
 FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
 
 # Storage engine section
-FROSTFS_STORAGE_SHARD_POOL_SIZE=15
 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
 ## 0 shard
 ### Flag to refill Metabase from BlobStor
diff --git a/config/example/node.json b/config/example/node.json
index 6b799b3189..b26c35d2cb 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -158,7 +158,6 @@
     ]
   },
   "storage": {
-    "shard_pool_size": 15,
     "shard_ro_error_threshold": 100,
     "shard": {
       "0": {
diff --git a/config/example/node.yaml b/config/example/node.yaml
index 2552a419ca..58b687d5cc 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -135,7 +135,6 @@ rpc:
 
 storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
-  shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
   shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
 
   shard:
diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md
index 271cc6532c..51f0a9669f 100644
--- a/docs/storage-node-configuration.md
+++ b/docs/storage-node-configuration.md
@@ -170,7 +170,6 @@ Local storage engine configuration.
 | Parameter                  | Type                              | Default value | Description                                                                                                       |
 |----------------------------|-----------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------|
-| `shard_pool_size`          | `int`                             | `20`          | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard.                     |
 | `shard_ro_error_threshold` | `int`                             | `0`           | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
 | `low_mem`                  | `bool`                            | `false`       | Reduce memory consumption by reducing performance.                                                               |
 | `shard`                    | [Shard config](#shard-subsection) |               | Configuration for separate shards.                                                                                |
diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go
index 6a416cfd94..7caa515d4b 100644
--- a/pkg/local_object_storage/engine/control.go
+++ b/pkg/local_object_storage/engine/control.go
@@ -153,16 +153,10 @@ func (e *StorageEngine) Close(ctx context.Context) error {
 }
 
 // closes all shards. Never returns an error, shard errors are logged.
-func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
+func (e *StorageEngine) close(ctx context.Context) error {
 	e.mtx.RLock()
 	defer e.mtx.RUnlock()
 
-	if releasePools {
-		for _, p := range e.shardPools {
-			p.Release()
-		}
-	}
-
 	for id, sh := range e.shards {
 		if err := sh.Close(ctx); err != nil {
 			e.log.Debug(ctx, logs.EngineCouldNotCloseShard,
@@ -213,7 +207,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
 			return e.open(ctx)
 		}
 	} else if prevErr == nil { // ok -> block
-		return e.close(ctx, errors.Is(err, errClosed))
+		return e.close(ctx)
 	}
 
 	// otherwise do nothing
diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go
index c9efc312ca..a0e658aebb 100644
--- a/pkg/local_object_storage/engine/control_test.go
+++ b/pkg/local_object_storage/engine/control_test.go
@@ -245,7 +245,6 @@ func TestReload(t *testing.T) {
 		// no new paths => no new shards
 		require.Equal(t, shardNum, len(e.shards))
-		require.Equal(t, shardNum, len(e.shardPools))
 
 		newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))
 
@@ -257,7 +256,6 @@ func TestReload(t *testing.T) {
 		require.NoError(t, e.Reload(context.Background(), rcfg))
 
 		require.Equal(t, shardNum+1, len(e.shards))
-		require.Equal(t, shardNum+1, len(e.shardPools))
 
 		require.NoError(t, e.Close(context.Background()))
 	})
@@ -277,7 +275,6 @@ func TestReload(t *testing.T) {
 		// removed one
 		require.Equal(t, shardNum-1, len(e.shards))
-		require.Equal(t, shardNum-1, len(e.shardPools))
 
 		require.NoError(t, e.Close(context.Background()))
 	})
@@ -311,7 +308,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
 	}
 	require.Equal(t, num, len(e.shards))
-	require.Equal(t, num, len(e.shardPools))
 
 	return e, currShards
 }
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index e13252b824..a915c9bd6c 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -12,7 +12,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@@ -29,8 +28,6 @@ type StorageEngine struct {
 
 	shards map[string]hashedShard
 
-	shardPools map[string]util.WorkerPool
-
 	closeCh   chan struct{}
 	setModeCh chan setModeRequest
 	wg        sync.WaitGroup
@@ -193,8 +190,6 @@ type cfg struct {
 
 	metrics MetricRegister
 
-	shardPoolSize uint32
-
 	lowMem bool
 
 	containerSource atomic.Pointer[containerSource]
@@ -202,9 +197,8 @@ func defaultCfg() *cfg {
 	res := &cfg{
-		log:           logger.NewLoggerWrapper(zap.L()),
-		shardPoolSize: 20,
-		metrics:       noopMetrics{},
+		log:     logger.NewLoggerWrapper(zap.L()),
+		metrics: noopMetrics{},
 	}
 	res.containerSource.Store(&containerSource{})
 	return res
@@ -221,7 +215,6 @@ func New(opts ...Option) *StorageEngine {
 	return &StorageEngine{
 		cfg:        c,
 		shards:     make(map[string]hashedShard),
-		shardPools: make(map[string]util.WorkerPool),
 		closeCh:    make(chan struct{}),
 		setModeCh:  make(chan setModeRequest),
 		evacuateLimiter: &evacuationLimiter{},
@@ -241,13 +234,6 @@ func WithMetrics(v MetricRegister) Option {
 	}
 }
 
-// WithShardPoolSize returns option to specify size of worker pool for each shard.
-func WithShardPoolSize(sz uint32) Option {
-	return func(c *cfg) {
-		c.shardPoolSize = sz
-	}
-}
-
 // WithErrorThreshold returns an option to specify size amount of errors after which
 // shard is moved to read-only mode.
 func WithErrorThreshold(sz uint32) Option {
diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go
index 3f9196128c..6ef3846ee6 100644
--- a/pkg/local_object_storage/engine/engine_test.go
+++ b/pkg/local_object_storage/engine/engine_test.go
@@ -57,7 +57,6 @@ func (te *testEngineWrapper) setShardsNumOpts(
 		te.shardIDs[i] = shard.ID()
 	}
 	require.Len(t, te.engine.shards, num)
-	require.Len(t, te.engine.shardPools, num)
 	return te
 }
diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go
index d68a7e826b..57029dd5fc 100644
--- a/pkg/local_object_storage/engine/error_test.go
+++ b/pkg/local_object_storage/engine/error_test.go
@@ -46,7 +46,6 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
 	var testShards [2]*testShard
 
 	te := testNewEngine(t,
-		WithShardPoolSize(1),
 		WithErrorThreshold(errThreshold),
 	).
 		setShardsNumOpts(t, 2, func(id int) []shard.Option {
diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go
index 27eaea7683..c08dfbf03d 100644
--- a/pkg/local_object_storage/engine/evacuate.go
+++ b/pkg/local_object_storage/engine/evacuate.go
@@ -15,7 +15,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
@@ -201,11 +200,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
 	return res
 }
 
-type pooledShard struct {
-	hashedShard
-	pool util.WorkerPool
-}
-
 var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
 
 // Evacuate moves data from one shard to the others.
@@ -252,7 +246,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
 	}
 
 	var mtx sync.RWMutex
-	copyShards := func() []pooledShard {
+	copyShards := func() []hashedShard {
 		mtx.RLock()
 		defer mtx.RUnlock()
 		t := slices.Clone(shards)
@@ -266,7 +260,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
 }
 
 func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
 ) error {
 	var err error
 	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
@@ -388,7 +382,7 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
 }
 
 func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
 	egContainer *errgroup.Group, egObject *errgroup.Group,
 ) error {
 	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
@@ -412,7 +406,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.Cancel
 }
 
 func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
 	egContainer *errgroup.Group, egObject *errgroup.Group,
 ) error {
 	sh := shardsToEvacuate[shardID]
@@ -485,7 +479,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
 }
 
 func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	getShards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
 ) error {
 	sh := shardsToEvacuate[shardID]
 	shards := getShards()
@@ -515,7 +509,7 @@ func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string,
 }
 
 func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
-	prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	prm EvacuateShardPrm, res *EvacuateShardRes, shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
 ) error {
 	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
 		trace.WithAttributes(
@@ -583,7 +577,7 @@ func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.S
 }
 
 func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
-	prm EvacuateShardPrm, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	prm EvacuateShardPrm, shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
 ) (bool, string, error) {
 	target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, shardsToEvacuate)
 	if err != nil {
@@ -653,15 +647,15 @@ func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shar
 }
 
 // findShardToEvacuateTree returns first shard according HRW or first shard with tree exists.
 func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
-	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
-) (pooledShard, bool, error) {
+	shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
+) (hashedShard, bool, error) {
 	hrw.SortHasherSliceByValue(shards, hrw.StringHash(tree.CID.EncodeToString()))
-	var result pooledShard
+	var result hashedShard
 	var found bool
 	for _, target := range shards {
 		select {
 		case <-ctx.Done():
-			return pooledShard{}, false, ctx.Err()
+			return hashedShard{}, false, ctx.Err()
 		default:
 		}
 
@@ -689,7 +683,7 @@ func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilora
 	return result, found, nil
 }
 
-func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, error) {
+func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]hashedShard, error) {
 	e.mtx.RLock()
 	defer e.mtx.RUnlock()
 
@@ -719,18 +713,15 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
 	// We must have all shards, to have correct information about their
 	// indexes in a sorted slice and set appropriate marks in the metabase.
 	// Evacuated shard is skipped during put.
-	shards := make([]pooledShard, 0, len(e.shards))
+	shards := make([]hashedShard, 0, len(e.shards))
 	for id := range e.shards {
-		shards = append(shards, pooledShard{
-			hashedShard: e.shards[id],
-			pool:        e.shardPools[id],
-		})
+		shards = append(shards, e.shards[id])
 	}
 	return shards, nil
 }
 
 func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
-	getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
+	getShards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
 ) error {
 	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
 	defer span.End()
@@ -800,7 +791,7 @@ func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
 }
 
 func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
-	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
+	shards []hashedShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
 ) (bool, error) {
 	hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString()))
 	for j := range shards {
@@ -813,7 +804,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
 		if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
 			continue
 		}
-		switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object, container.IsIndexedContainer(cnr)).status {
+		switch e.putToShard(ctx, shards[j], addr, object, container.IsIndexedContainer(cnr)).status {
 		case putToShardSuccess:
 			res.objEvacuated.Add(1)
 			e.log.Debug(ctx, logs.EngineObjectIsMovedToAnotherShard,
diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go
index 45c4b696b8..ec79232973 100644
--- a/pkg/local_object_storage/engine/evacuate_test.go
+++ b/pkg/local_object_storage/engine/evacuate_test.go
@@ -196,7 +196,6 @@ func TestEvacuateShardObjects(t *testing.T) {
 
 	e.mtx.Lock()
 	delete(e.shards, evacuateShardID)
-	delete(e.shardPools, evacuateShardID)
 	e.mtx.Unlock()
 
 	checkHasObjects(t)
diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go
index 8c5d28b154..10cebfb52b 100644
--- a/pkg/local_object_storage/engine/inhume_test.go
+++ b/pkg/local_object_storage/engine/inhume_test.go
@@ -205,7 +205,7 @@ func BenchmarkInhumeMultipart(b *testing.B) {
 func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
 	b.StopTimer()
 
-	engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
+	engine := testNewEngine(b).
 		setShardsNum(b, numShards).prepare(b).engine
 	defer func() { require.NoError(b, engine.Close(context.Background())) }()
diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go
index 64288a5118..b348d13a23 100644
--- a/pkg/local_object_storage/engine/put.go
+++ b/pkg/local_object_storage/engine/put.go
@@ -9,7 +9,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -99,13 +98,13 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
 	var shRes putToShardRes
 	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 		e.mtx.RLock()
-		pool, ok := e.shardPools[sh.ID().String()]
+		_, ok := e.shards[sh.ID().String()]
 		e.mtx.RUnlock()
 		if !ok {
 			// Shard was concurrently removed, skip.
 			return false
 		}
-		shRes = e.putToShard(ctx, sh, pool, addr, prm.Object, prm.IsIndexedContainer)
+		shRes = e.putToShard(ctx, sh, addr, prm.Object, prm.IsIndexedContainer)
 		return shRes.status != putToShardUnknown
 	})
 	switch shRes.status {
@@ -122,70 +121,59 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
 
 // putToShard puts object to sh.
 // Return putToShardStatus and error if it is necessary to propagate an error upper.
-func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool,
+func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard,
 	addr oid.Address, obj *objectSDK.Object, isIndexedContainer bool,
 ) (res putToShardRes) {
-	exitCh := make(chan struct{})
+	var existPrm shard.ExistsPrm
+	existPrm.Address = addr
 
-	if err := pool.Submit(func() {
-		defer close(exitCh)
-
-		var existPrm shard.ExistsPrm
-		existPrm.Address = addr
-
-		exists, err := sh.Exists(ctx, existPrm)
-		if err != nil {
-			if shard.IsErrObjectExpired(err) {
-				// object is already found but
-				// expired => do nothing with it
-				res.status = putToShardExists
-			} else {
-				e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
-					zap.Stringer("shard_id", sh.ID()),
-					zap.Error(err))
-			}
-
-			return // this is not ErrAlreadyRemoved error so we can go to the next shard
-		}
-
-		if exists.Exists() {
+	exists, err := sh.Exists(ctx, existPrm)
+	if err != nil {
+		if shard.IsErrObjectExpired(err) {
+			// object is already found but
+			// expired => do nothing with it
 			res.status = putToShardExists
-			return
+		} else {
+			e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
+				zap.Stringer("shard_id", sh.ID()),
+				zap.Error(err))
 		}
 
-		var putPrm shard.PutPrm
-		putPrm.SetObject(obj)
-		putPrm.SetIndexAttributes(isIndexedContainer)
-
-		_, err = sh.Put(ctx, putPrm)
-		if err != nil {
-			if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
-				errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
-				e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
-					zap.Stringer("shard_id", sh.ID()),
-					zap.Error(err))
-				return
-			}
-			if client.IsErrObjectAlreadyRemoved(err) {
-				e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
-					zap.Stringer("shard_id", sh.ID()),
-					zap.Error(err))
-				res.status = putToShardRemoved
-				res.err = err
-				return
-			}
-
-			e.reportShardError(ctx, sh, "could not put object to shard", err, zap.Stringer("address", addr))
-			return
-		}
-
-		res.status = putToShardSuccess
-	}); err != nil {
-		e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard, zap.Error(err))
-		close(exitCh)
+		return // this is not ErrAlreadyRemoved error so we can go to the next shard
 	}
 
-	<-exitCh
+	if exists.Exists() {
+		res.status = putToShardExists
+		return
+	}
+
+	var putPrm shard.PutPrm
+	putPrm.SetObject(obj)
+	putPrm.SetIndexAttributes(isIndexedContainer)
+
+	_, err = sh.Put(ctx, putPrm)
+	if err != nil {
+		if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
+			errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
+			e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
+				zap.Stringer("shard_id", sh.ID()),
+				zap.Error(err))
+			return
+		}
+		if client.IsErrObjectAlreadyRemoved(err) {
+			e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
+				zap.Stringer("shard_id", sh.ID()),
+				zap.Error(err))
+			res.status = putToShardRemoved
+			res.err = err
+			return
+		}
+
+		e.reportShardError(ctx, sh, "could not put object to shard", err, zap.Stringer("address", addr))
+		return
+	}
+
+	res.status = putToShardSuccess
 
 	return
 }
diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index 28f0287bc0..a38c851511 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -17,7 +17,6 @@ import (
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/hrw"
 	"github.com/google/uuid"
-	"github.com/panjf2000/ants/v2"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -181,11 +180,6 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
 
-	pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
-	if err != nil {
-		return fmt.Errorf("create pool: %w", err)
-	}
-
 	strID := sh.ID().String()
 	if _, ok := e.shards[strID]; ok {
 		return fmt.Errorf("shard with id %s was already added", strID)
 	}
@@ -199,8 +193,6 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
 		hash: hrw.StringHash(strID),
 	}
 
-	e.shardPools[strID] = pool
-
 	return nil
 }
 
@@ -225,12 +217,6 @@ func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
 		ss = append(ss, sh)
 		delete(e.shards, id)
 
-		pool, ok := e.shardPools[id]
-		if ok {
-			pool.Release()
-			delete(e.shardPools, id)
-		}
-
 		e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
 			zap.String("id", id))
 	}
@@ -429,12 +415,6 @@ func (e *StorageEngine) deleteShards(ctx context.Context, ids []*shard.ID) ([]ha
 		delete(e.shards, idStr)
 
-		pool, ok := e.shardPools[idStr]
-		if ok {
-			pool.Release()
-			delete(e.shardPools, idStr)
-		}
-
 		e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
 			zap.String("id", idStr))
 	}
diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go
index 0bbc7563c3..3aa9629b05 100644
--- a/pkg/local_object_storage/engine/shards_test.go
+++ b/pkg/local_object_storage/engine/shards_test.go
@@ -17,7 +17,6 @@ func TestRemoveShard(t *testing.T) {
 	e, ids := te.engine, te.shardIDs
 	defer func() { require.NoError(t, e.Close(context.Background())) }()
 
-	require.Equal(t, numOfShards, len(e.shardPools))
 	require.Equal(t, numOfShards, len(e.shards))
 
 	removedNum := numOfShards / 2
@@ -37,7 +36,6 @@ func TestRemoveShard(t *testing.T) {
 		}
 	}
 
-	require.Equal(t, numOfShards-removedNum, len(e.shardPools))
 	require.Equal(t, numOfShards-removedNum, len(e.shards))
 
 	for id, removed := range mSh {