From 69c983ee963a3e65be003ee1c0ad41a5a430a0f3 Mon Sep 17 00:00:00 2001
From: Aleksey Savchuk
Date: Wed, 13 Nov 2024 18:23:21 +0300
Subject: [PATCH] [#1450] engine: Inhume objects in parallel with a worker pool
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a worker pool for the `Inhume` operation and use it to handle
objects in parallel. Since metabase `Inhume` uses `bbolt.Batch`,
handling many objects one by one may be inefficient.

```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
                                   │   old.txt    │              new1.txt               │
                                   │    sec/op    │    sec/op     vs base               │
InhumeMultipart/objects=1-12          11.42m ± 1%    11.42m ± 1%        ~ (p=0.739 n=10)
InhumeMultipart/objects=10-12        113.51m ± 0%    11.62m ± 1%  -89.76% (p=0.000 n=10)
InhumeMultipart/objects=100-12      1135.41m ± 1%    28.30m ± 1%  -97.51% (p=0.000 n=10)
InhumeMultipart/objects=1000-12     11357.8m ± 0%    259.8m ± 1%  -97.71% (p=0.000 n=10)
InhumeMultipart/objects=10000-12     113.251 ± 0%     2.277 ± 1%  -97.99% (p=0.000 n=10)
geomean                                1.136         74.03m       -93.48%
```

Signed-off-by: Aleksey Savchuk
---
 pkg/local_object_storage/engine/control.go | 10 ++-
 pkg/local_object_storage/engine/engine.go  | 18 ++++-
 pkg/local_object_storage/engine/inhume.go  | 94 +++++++++++++++-------
 3 files changed, 87 insertions(+), 35 deletions(-)

diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go
index a5c53dcad..591578ed6 100644
--- a/pkg/local_object_storage/engine/control.go
+++ b/pkg/local_object_storage/engine/control.go
@@ -11,6 +11,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+	"github.com/panjf2000/ants/v2"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -71,10 +72,14 @@ func (e *StorageEngine) open(ctx context.Context) error {
 }
 
 // Init initializes all StorageEngine's components.
-func (e *StorageEngine) Init(ctx context.Context) error {
+func (e *StorageEngine) Init(ctx context.Context) (err error) {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
 
+	if e.inhumePool, err = ants.NewPool(int(e.inhumePoolSize)); err != nil {
+		return fmt.Errorf("could not create pool: %w", err)
+	}
+
 	errCh := make(chan shardInitError, len(e.shards))
 	var eg errgroup.Group
 	if e.cfg.lowMem && e.anyShardRequiresRefill() {
@@ -92,7 +97,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
 			return nil
 		})
 	}
-	err := eg.Wait()
+	err = eg.Wait()
 	close(errCh)
 	if err != nil {
 		return fmt.Errorf("failed to initialize shards: %w", err)
@@ -161,6 +166,7 @@ func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
 		for _, p := range e.shardPools {
 			p.Release()
 		}
+		e.inhumePool.Release()
 	}
 
 	for id, sh := range e.shards {
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index 029904046..e4d1972b8 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -30,6 +30,8 @@ type StorageEngine struct {
 
 	shardPools map[string]util.WorkerPool
 
+	inhumePool util.WorkerPool
+
 	closeCh   chan struct{}
 	setModeCh chan setModeRequest
 	wg        sync.WaitGroup
@@ -191,6 +193,8 @@ type cfg struct {
 
 	shardPoolSize uint32
 
+	inhumePoolSize uint32
+
 	lowMem bool
 
 	containerSource atomic.Pointer[containerSource]
@@ -198,9 +202,10 @@ type cfg struct {
 
 func defaultCfg() *cfg {
 	res := &cfg{
-		log:           logger.NewLoggerWrapper(zap.L()),
-		shardPoolSize: 20,
-		metrics:       noopMetrics{},
+		log:            logger.NewLoggerWrapper(zap.L()),
+		shardPoolSize:  20,
+		inhumePoolSize: 50,
+		metrics:        noopMetrics{},
 	}
 	res.containerSource.Store(&containerSource{})
 	return res
@@ -244,6 +249,13 @@ func WithShardPoolSize(sz uint32) Option {
 	}
 }
 
+// WithInhumePoolSize returns option to specify size of worker pool for Inhume operation.
+func WithInhumePoolSize(sz uint32) Option {
+	return func(c *cfg) {
+		c.inhumePoolSize = sz
+	}
+}
+
 // WithErrorThreshold returns an option to specify size amount of errors after which
 // shard is moved to read-only mode.
 func WithErrorThreshold(sz uint32) Option {
diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index e89a8d048..c2031761f 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -3,6 +3,7 @@ package engine
 import (
 	"context"
 	"errors"
+	"sync"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -81,45 +82,78 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
 }
 
 func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
-	var shPrm shard.InhumePrm
-	if prm.forceRemoval {
-		shPrm.ForceRemoval()
-	}
+	taskCtx, taskCancel := context.WithCancelCause(ctx)
+	defer taskCancel(nil)
 
-	for i := range prm.addrs {
-		if !prm.forceRemoval {
-			locked, err := e.IsLocked(ctx, prm.addrs[i])
-			if err != nil {
-				e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
-					zap.Error(err),
-					zap.Stringer("addr", prm.addrs[i]),
-					zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-			} else if locked {
-				return InhumeRes{}, new(apistatus.ObjectLocked)
+	var wg sync.WaitGroup
+
+loop:
+	for _, addr := range prm.addrs {
+		select {
+		case <-ctx.Done():
+			taskCancel(context.Cause(ctx))
+			break loop
+		default:
+		}
+
+		wg.Add(1)
+		if err := e.inhumePool.Submit(func() {
+			defer wg.Done()
+
+			if err := e.handleInhumeTask(taskCtx, addr, prm.tombstone, prm.forceRemoval); err != nil {
+				taskCancel(err)
 			}
-		}
+		}); err != nil {
+			wg.Done()
+			taskCancel(err)
+			wg.Wait()
 
-		if prm.tombstone != nil {
-			shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
-		} else {
-			shPrm.MarkAsGarbage(prm.addrs[i])
+			e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
+			return InhumeRes{}, errInhumeFailure
 		}
+	}
+	wg.Wait()
 
-		ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, true)
+	return InhumeRes{}, context.Cause(taskCtx)
+}
+
+func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
+	if !forceRemoval {
+		locked, err := e.IsLocked(ctx, addr)
 		if err != nil {
-			return InhumeRes{}, err
-		}
-		if !ok {
-			ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, false)
-			if err != nil {
-				return InhumeRes{}, err
-			} else if !ok {
-				return InhumeRes{}, errInhumeFailure
-			}
+			e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
+				zap.Error(err),
+				zap.Stringer("addr", addr),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		} else if locked {
+			return new(apistatus.ObjectLocked)
 		}
 	}
 
-	return InhumeRes{}, nil
+	var prm shard.InhumePrm
+	if tombstone != nil {
+		prm.SetTarget(*tombstone, addr)
+	} else {
+		prm.MarkAsGarbage(addr)
+	}
+	if forceRemoval {
+		prm.ForceRemoval()
+	}
+
+	ok, err := e.inhumeAddr(ctx, addr, prm, true)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		ok, err := e.inhumeAddr(ctx, addr, prm, false)
+		if err != nil {
+			return err
+		} else if !ok {
+			return errInhumeFailure
+		}
+	}
+
+	return nil
 }
 
 // Returns ok if object was inhumed during this invocation or before.
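
Why the batching matters: `bbolt`'s `DB.Batch` coalesces calls from concurrent goroutines into a shared transaction, committing once the batch fills up (`MaxBatchSize`) or the batch delay (`MaxBatchDelay`, 10 ms by default) expires. A single goroutine calling `Batch` per object pays that delay and commit every time, which is consistent with the ~11 ms per object in the `old.txt` column, while concurrent callers share one commit per batch. The following standalone sketch illustrates the effect; it is not part of the patch, and the database path, bucket name, counts, and timings are illustrative only.

```
package main

import (
	"fmt"
	"sync"
	"time"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Illustrative temporary database; path and bucket name are made up.
	db, err := bolt.Open("/tmp/batch-demo.db", 0o600, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("demo"))
		return err
	}); err != nil {
		panic(err)
	}

	put := func(i int) error {
		return db.Batch(func(tx *bolt.Tx) error {
			key := []byte(fmt.Sprintf("key-%d", i))
			return tx.Bucket([]byte("demo")).Put(key, []byte("value"))
		})
	}

	const n = 100

	// Sequential: each Batch call waits out its own batch window
	// (MaxBatchDelay, 10 ms by default), so total time grows roughly
	// linearly with n.
	start := time.Now()
	for i := 0; i < n; i++ {
		if err := put(i); err != nil {
			panic(err)
		}
	}
	fmt.Println("sequential:", time.Since(start))

	// Concurrent: the calls arrive together, bbolt coalesces them into
	// a few shared transactions, and the batch window and commit are
	// paid once per batch instead of once per object.
	start = time.Now()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := put(n + i); err != nil {
				panic(err)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("concurrent:", time.Since(start))
}
```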