Compare commits

...

2 commits

Author SHA1 Message Date
3fe0a84364
[#1450] node/config: Allow to configure engine's InhumePoolSize
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-19 23:10:34 +03:00
9706a03b89
[#1450] engine: Inhume objects in parallel with a worker pool
Add a worker pool for `Inhume` operation and use it to handle objects
in parallel. Since metabase `Inhume` uses `bbolt.Batch`, handling many
objects one by one may be inefficient.

```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
                                 │    old.txt    │              new1.txt               │
                                 │    sec/op     │   sec/op     vs base                │
InhumeMultipart/objects=1-12         11.42m ± 1%   11.42m ± 1%        ~ (p=0.739 n=10)
InhumeMultipart/objects=10-12       113.51m ± 0%   11.62m ± 1%  -89.76% (p=0.000 n=10)
InhumeMultipart/objects=100-12     1135.41m ± 1%   28.30m ± 1%  -97.51% (p=0.000 n=10)
InhumeMultipart/objects=1000-12    11357.8m ± 0%   259.8m ± 1%  -97.71% (p=0.000 n=10)
InhumeMultipart/objects=10000-12    113.251 ± 0%    2.277 ± 1%  -97.99% (p=0.000 n=10)
geomean                               1.136        74.03m       -93.48%
```

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-19 23:10:27 +03:00
7 changed files with 110 additions and 35 deletions

View file

@ -118,6 +118,7 @@ type applicationConfiguration struct {
shardPoolSize uint32 shardPoolSize uint32
shards []shardCfg shards []shardCfg
lowMem bool lowMem bool
inhumePoolSize uint32
} }
// if need to run node in compatibility with other versions mode // if need to run node in compatibility with other versions mode
@ -249,6 +250,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.inhumePoolSize = engineconfig.InhumePoolSize(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
} }
@ -881,6 +883,7 @@ func (c *cfg) engineOpts() []engine.Option {
engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log), engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithInhumePoolSize(c.EngineCfg.inhumePoolSize),
) )
if c.metricsCollector != nil { if c.metricsCollector != nil {

View file

@ -15,6 +15,11 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to // ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine. // process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20 ShardPoolSizeDefault = 20
// InhumePoolSizeDefault is the default size of the engine-level
// worker pool, which is used to process objects during the INHUME
// operation.
InhumePoolSizeDefault = 50
) )
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found. // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -88,3 +93,16 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool { func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem") return config.BoolSafe(c.Sub(subsection), "low_mem")
} }
// InhumePoolSize returns the value of "inhume_pool_size" config parameter from
// "storage" section.
//
// Returns InhumePoolSizeDefault if the value is not a positive number.
func InhumePoolSize(c *config.Config) uint32 {
v := config.Uint32Safe(c.Sub(subsection), "inhume_pool_size")
if v > 0 {
return v
}
return InhumePoolSizeDefault
}

View file

@ -39,6 +39,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty)) require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty)) require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode()) require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
require.EqualValues(t, engineconfig.InhumePoolSizeDefault, engineconfig.InhumePoolSize(empty))
}) })
const path = "../../../../config/example/node" const path = "../../../../config/example/node"
@ -48,6 +49,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c)) require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c)) require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
require.EqualValues(t, 50, engineconfig.InhumePoolSize(c))
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error { err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
defer func() { defer func() {

View file

@ -124,6 +124,7 @@ storage:
# note: shard configuration can be omitted for relay node (see `node.relay`) # note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
inhume_pool_size: 50 # size of engine-level worker pool used during INHUME operation
shard: shard:
default: # section with the default shard parameters default: # section with the default shard parameters

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -71,10 +72,14 @@ func (e *StorageEngine) open(ctx context.Context) error {
} }
// Init initializes all StorageEngine's components. // Init initializes all StorageEngine's components.
func (e *StorageEngine) Init(ctx context.Context) error { func (e *StorageEngine) Init(ctx context.Context) (err error) {
e.mtx.Lock() e.mtx.Lock()
defer e.mtx.Unlock() defer e.mtx.Unlock()
if e.inhumePool, err = ants.NewPool(int(e.inhumePoolSize)); err != nil {
return fmt.Errorf("could not create pool: %w", err)
}
errCh := make(chan shardInitError, len(e.shards)) errCh := make(chan shardInitError, len(e.shards))
var eg errgroup.Group var eg errgroup.Group
if e.cfg.lowMem && e.anyShardRequiresRefill() { if e.cfg.lowMem && e.anyShardRequiresRefill() {
@ -92,7 +97,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
return nil return nil
}) })
} }
err := eg.Wait() err = eg.Wait()
close(errCh) close(errCh)
if err != nil { if err != nil {
return fmt.Errorf("failed to initialize shards: %w", err) return fmt.Errorf("failed to initialize shards: %w", err)
@ -161,6 +166,7 @@ func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
for _, p := range e.shardPools { for _, p := range e.shardPools {
p.Release() p.Release()
} }
e.inhumePool.Release()
} }
for id, sh := range e.shards { for id, sh := range e.shards {

View file

@ -30,6 +30,8 @@ type StorageEngine struct {
shardPools map[string]util.WorkerPool shardPools map[string]util.WorkerPool
inhumePool util.WorkerPool
closeCh chan struct{} closeCh chan struct{}
setModeCh chan setModeRequest setModeCh chan setModeRequest
wg sync.WaitGroup wg sync.WaitGroup
@ -191,6 +193,8 @@ type cfg struct {
shardPoolSize uint32 shardPoolSize uint32
inhumePoolSize uint32
lowMem bool lowMem bool
containerSource atomic.Pointer[containerSource] containerSource atomic.Pointer[containerSource]
@ -200,6 +204,7 @@ func defaultCfg() *cfg {
res := &cfg{ res := &cfg{
log: logger.NewLoggerWrapper(zap.L()), log: logger.NewLoggerWrapper(zap.L()),
shardPoolSize: 20, shardPoolSize: 20,
inhumePoolSize: 50,
metrics: noopMetrics{}, metrics: noopMetrics{},
} }
res.containerSource.Store(&containerSource{}) res.containerSource.Store(&containerSource{})
@ -244,6 +249,13 @@ func WithShardPoolSize(sz uint32) Option {
} }
} }
// WithInhumePoolSize returns option to specify size of worker pool for Inhume operation.
func WithInhumePoolSize(sz uint32) Option {
return func(c *cfg) {
c.inhumePoolSize = sz
}
}
// WithErrorThreshold returns an option to specify size amount of errors after which // WithErrorThreshold returns an option to specify size amount of errors after which
// shard is moved to read-only mode. // shard is moved to read-only mode.
func WithErrorThreshold(sz uint32) Option { func WithErrorThreshold(sz uint32) Option {

View file

@ -3,6 +3,7 @@ package engine
import ( import (
"context" "context"
"errors" "errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -81,45 +82,77 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
} }
func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
var shPrm shard.InhumePrm ctx, cancel := context.WithCancelCause(ctx)
if prm.forceRemoval { defer cancel(nil)
shPrm.ForceRemoval()
var wg sync.WaitGroup
loop:
for _, addr := range prm.addrs {
select {
case <-ctx.Done():
break loop
default:
} }
for i := range prm.addrs { wg.Add(1)
if !prm.forceRemoval { if err := e.inhumePool.Submit(func() {
locked, err := e.IsLocked(ctx, prm.addrs[i]) defer wg.Done()
if err != nil {
e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
zap.Error(err),
zap.Stringer("addr", prm.addrs[i]),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else if locked {
return InhumeRes{}, new(apistatus.ObjectLocked)
}
}
if prm.tombstone != nil { if err := e.handleInhumeTask(ctx, addr, prm.tombstone, prm.forceRemoval); err != nil {
shPrm.SetTarget(*prm.tombstone, prm.addrs[i]) cancel(err)
} else {
shPrm.MarkAsGarbage(prm.addrs[i])
} }
}); err != nil {
wg.Done()
cancel(err)
wg.Wait()
ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, true) e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
if err != nil {
return InhumeRes{}, err
}
if !ok {
ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, false)
if err != nil {
return InhumeRes{}, err
} else if !ok {
return InhumeRes{}, errInhumeFailure return InhumeRes{}, errInhumeFailure
} }
} }
wg.Wait()
return InhumeRes{}, context.Cause(ctx)
} }
return InhumeRes{}, nil func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
if !forceRemoval {
locked, err := e.IsLocked(ctx, addr)
if err != nil {
e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
zap.Error(err),
zap.Stringer("addr", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else if locked {
return new(apistatus.ObjectLocked)
}
}
var prm shard.InhumePrm
if tombstone != nil {
prm.SetTarget(*tombstone, addr)
} else {
prm.MarkAsGarbage(addr)
}
if forceRemoval {
prm.ForceRemoval()
}
ok, err := e.inhumeAddr(ctx, addr, prm, true)
if err != nil {
return err
}
if !ok {
ok, err := e.inhumeAddr(ctx, addr, prm, false)
if err != nil {
return err
} else if !ok {
return errInhumeFailure
}
}
return nil
} }
// Returns ok if object was inhumed during this invocation or before. // Returns ok if object was inhumed during this invocation or before.