diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go
index a5c53dcad..591578ed6 100644
--- a/pkg/local_object_storage/engine/control.go
+++ b/pkg/local_object_storage/engine/control.go
@@ -11,6 +11,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+	"github.com/panjf2000/ants/v2"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -71,10 +72,14 @@ func (e *StorageEngine) open(ctx context.Context) error {
 }
 
 // Init initializes all StorageEngine's components.
-func (e *StorageEngine) Init(ctx context.Context) error {
+func (e *StorageEngine) Init(ctx context.Context) (err error) {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
 
+	if e.inhumePool, err = ants.NewPool(int(e.inhumePoolSize)); err != nil {
+		return fmt.Errorf("could not create pool: %w", err)
+	}
+
 	errCh := make(chan shardInitError, len(e.shards))
 	var eg errgroup.Group
 	if e.cfg.lowMem && e.anyShardRequiresRefill() {
@@ -92,7 +97,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
 			return nil
 		})
 	}
-	err := eg.Wait()
+	err = eg.Wait()
 	close(errCh)
 	if err != nil {
 		return fmt.Errorf("failed to initialize shards: %w", err)
@@ -161,6 +166,7 @@ func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
 		for _, p := range e.shardPools {
 			p.Release()
 		}
+		e.inhumePool.Release()
 	}
 
 	for id, sh := range e.shards {
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index 029904046..e4d1972b8 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -30,6 +30,8 @@ type StorageEngine struct {
 
 	shardPools map[string]util.WorkerPool
 
+	inhumePool util.WorkerPool
+
 	closeCh   chan struct{}
 	setModeCh chan setModeRequest
 	wg        sync.WaitGroup
@@ -191,6 +193,8 @@ type cfg struct {
 
 	shardPoolSize uint32
 
+	inhumePoolSize uint32
+
 	lowMem bool
 
 	containerSource atomic.Pointer[containerSource]
@@ -198,9 +202,10 @@ type cfg struct {
 
 func defaultCfg() *cfg {
 	res := &cfg{
-		log:           logger.NewLoggerWrapper(zap.L()),
-		shardPoolSize: 20,
-		metrics:       noopMetrics{},
+		log:            logger.NewLoggerWrapper(zap.L()),
+		shardPoolSize:  20,
+		inhumePoolSize: 50,
+		metrics:        noopMetrics{},
 	}
 	res.containerSource.Store(&containerSource{})
 	return res
@@ -244,6 +249,13 @@ func WithShardPoolSize(sz uint32) Option {
 	}
 }
 
+// WithInhumePoolSize returns option to specify size of worker pool for Inhume operation.
+func WithInhumePoolSize(sz uint32) Option {
+	return func(c *cfg) {
+		c.inhumePoolSize = sz
+	}
+}
+
 // WithErrorThreshold returns an option to specify size amount of errors after which
 // shard is moved to read-only mode.
 func WithErrorThreshold(sz uint32) Option {
diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index e89a8d048..4841edc9b 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -3,6 +3,7 @@ package engine
 import (
 	"context"
 	"errors"
+	"sync"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -81,45 +82,82 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
 }
 
 func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
-	var shPrm shard.InhumePrm
-	if prm.forceRemoval {
-		shPrm.ForceRemoval()
+	defer elapsed("Inhume", e.metrics.AddMethodDuration)()
+
+	taskCtx, taskCancel := context.WithCancelCause(ctx)
+	defer taskCancel(nil)
+
+	var wg sync.WaitGroup
+
+loop:
+	for _, addr := range prm.addrs {
+		select {
+		case <-ctx.Done():
+			taskCancel(context.Cause(ctx))
+			break loop
+		default:
+		}
+
+		wg.Add(1)
+		if err := e.inhumePool.Submit(func() {
+			defer wg.Done()
+
+			if err := e.handleInhumeTask(taskCtx, addr, prm.tombstone, prm.forceRemoval); err != nil {
+				taskCancel(err)
+			}
+		}); err != nil {
+			wg.Done()
+			taskCancel(err)
+			wg.Wait()
+
+			e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
+			return InhumeRes{}, errInhumeFailure
+		}
+	}
+	wg.Wait()
+
+	return InhumeRes{}, context.Cause(taskCtx)
+}
+
+func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
+	if !forceRemoval {
+		locked, err := e.IsLocked(ctx, addr)
+		if err != nil {
+			e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
+				zap.Error(err),
+				zap.Stringer("addr", addr),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		} else if locked {
+			return new(apistatus.ObjectLocked)
+		}
 	}
 
-	for i := range prm.addrs {
-		if !prm.forceRemoval {
-			locked, err := e.IsLocked(ctx, prm.addrs[i])
-			if err != nil {
-				e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
-					zap.Error(err),
-					zap.Stringer("addr", prm.addrs[i]),
-					zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-			} else if locked {
-				return InhumeRes{}, new(apistatus.ObjectLocked)
-			}
-		}
+	var prm shard.InhumePrm
 
-		if prm.tombstone != nil {
-			shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
-		} else {
-			shPrm.MarkAsGarbage(prm.addrs[i])
-		}
+	if tombstone != nil {
+		prm.SetTarget(*tombstone, addr)
+	} else {
+		prm.MarkAsGarbage(addr)
+	}
+	if forceRemoval {
+		prm.ForceRemoval()
+	}
 
-		ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, true)
+	ok, err := e.inhumeAddr(ctx, addr, prm, true)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		ok, err := e.inhumeAddr(ctx, addr, prm, false)
 		if err != nil {
-			return InhumeRes{}, err
+			return err
 		}
 		if !ok {
-			ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, false)
-			if err != nil {
-				return InhumeRes{}, err
-			} else if !ok {
-				return InhumeRes{}, errInhumeFailure
-			}
+			return errInhumeFailure
 		}
 	}
 
-	return InhumeRes{}, nil
+	return nil
 }
 
 // Returns ok if object was inhumed during this invocation or before.
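Note: the rewritten inhume path above fans per-address work out to a bounded ants worker pool and uses context.WithCancelCause so that the first failing task cancels the remaining ones. The standalone sketch below illustrates the same pattern in isolation; it is not part of the patch, and processAll, handleOne, items and poolSize are hypothetical placeholders.

// Standalone sketch of the fan-out pattern used by the new inhume path:
// a bounded ants pool, a WaitGroup, and context.WithCancelCause so the
// first failure becomes the cause that stops the remaining work.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func processAll(ctx context.Context, items []int, poolSize int) error {
	pool, err := ants.NewPool(poolSize)
	if err != nil {
		return fmt.Errorf("could not create pool: %w", err)
	}
	defer pool.Release()

	taskCtx, taskCancel := context.WithCancelCause(ctx)
	defer taskCancel(nil)

	var wg sync.WaitGroup

loop:
	for _, item := range items {
		select {
		case <-ctx.Done():
			// Parent context is gone: propagate its cause and stop submitting.
			taskCancel(context.Cause(ctx))
			break loop
		default:
		}

		item := item // capture per iteration (redundant on Go 1.22+)
		wg.Add(1)
		if err := pool.Submit(func() {
			defer wg.Done()
			if err := handleOne(taskCtx, item); err != nil {
				// The first non-nil error wins as the cause; later calls are no-ops.
				taskCancel(err)
			}
		}); err != nil {
			wg.Done() // the task was never started
			taskCancel(err)
			break loop
		}
	}
	// Wait for everything that was actually submitted.
	wg.Wait()

	// nil if no task failed and the parent context is still alive.
	return context.Cause(taskCtx)
}

func handleOne(ctx context.Context, item int) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if item < 0 {
		return errors.New("negative item")
	}
	return nil
}

func main() {
	fmt.Println(processAll(context.Background(), []int{1, 2, -3, 4}, 4))
}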