From 9ac33f8d1cc3d93d380760dc0354f1f14c7bf351 Mon Sep 17 00:00:00 2001
From: Aleksey Savchuk
Date: Mon, 18 Nov 2024 14:40:10 +0300
Subject: [PATCH] [#1450] engine: Group object by shard before `Inhume`
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
                                  │   old.txt   │              new.txt               │
                                  │   sec/op    │    sec/op     vs base              │
InhumeMultipart/objects=1-12        11.42m ± 1%    10.71m ± 0%   -6.27% (p=0.000 n=10)
InhumeMultipart/objects=10-12       113.5m ± 0%    100.9m ± 3%  -11.08% (p=0.000 n=10)
InhumeMultipart/objects=100-12     1135.4m ± 1%    681.3m ± 2%  -40.00% (p=0.000 n=10)
InhumeMultipart/objects=1000-12     11.358 ± 0%     1.089 ± 1%  -90.41% (p=0.000 n=10)
InhumeMultipart/objects=10000-12   113.251 ± 0%     1.645 ± 1%  -98.55% (p=0.000 n=10)
geomean                              1.136         265.5m       -76.63%
```

Signed-off-by: Aleksey Savchuk
---
 pkg/local_object_storage/engine/inhume.go | 184 +++++++++++++---------
 pkg/util/worker_pool.go                   |  55 +++++++
 2 files changed, 163 insertions(+), 76 deletions(-)

diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index e89a8d048..8d223f5c2 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -81,110 +81,142 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
 }
 
 func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
-	var shPrm shard.InhumePrm
-	if prm.forceRemoval {
-		shPrm.ForceRemoval()
+	addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval)
+	if err != nil {
+		return InhumeRes{}, err
 	}
 
-	for i := range prm.addrs {
-		if !prm.forceRemoval {
-			locked, err := e.IsLocked(ctx, prm.addrs[i])
-			if err != nil {
-				e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
-					zap.Error(err),
-					zap.Stringer("addr", prm.addrs[i]),
-					zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-			} else if locked {
-				return InhumeRes{}, new(apistatus.ObjectLocked)
-			}
+	var inhumePrm shard.InhumePrm
+
+	for shardID, addrIndexes := range addrsPerShard {
+		addr := make([]oid.Address, len(addrIndexes))
+		for i, index := range addrIndexes {
+			addr[i] = prm.addrs[index]
 		}
 
 		if prm.tombstone != nil {
-			shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
+			inhumePrm.SetTarget(*prm.tombstone, addr...)
 		} else {
-			shPrm.MarkAsGarbage(prm.addrs[i])
+			inhumePrm.MarkAsGarbage(addr...)
+		}
+		if prm.forceRemoval {
+			inhumePrm.ForceRemoval()
 		}
 
-		ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, true)
-		if err != nil {
-			return InhumeRes{}, err
+		sh, exists := e.shards[shardID]
+		if !exists {
+			e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard,
+				zap.Error(errors.New("this shard was expected to exist")),
+				zap.String("shard_id", shardID),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
+			)
+			return InhumeRes{}, errInhumeFailure
 		}
-		if !ok {
-			ok, err := e.inhumeAddr(ctx, prm.addrs[i], shPrm, false)
-			if err != nil {
-				return InhumeRes{}, err
-			} else if !ok {
-				return InhumeRes{}, errInhumeFailure
+
+		if _, err := sh.Inhume(ctx, inhumePrm); err != nil {
+			var errLocked *apistatus.ObjectLocked
+			switch {
+			case errors.As(err, &errLocked):
+				err = errLocked
+			case errors.Is(err, shard.ErrLockObjectRemoval):
+			case errors.Is(err, shard.ErrReadOnlyMode):
+			case errors.Is(err, shard.ErrDegradedMode):
+			default:
+				e.reportShardError(ctx, sh, "couldn't inhume object in shard", err)
 			}
+			return InhumeRes{}, err
 		}
 	}
 
 	return InhumeRes{}, nil
 }
 
-// Returns ok if object was inhumed during this invocation or before.
-func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm shard.InhumePrm, checkExists bool) (bool, error) {
-	root := false
-	var existPrm shard.ExistsPrm
-	var retErr error
-	var ok bool
+// groupObjectsByShard groups objects based on the shard(s) they are stored on.
+//
+// If checkLocked is set, [apistatus.ObjectLocked] will be returned if any of
+// the objects are locked.
+func (e *StorageEngine) groupObjectsByShard(ctx context.Context, addrs []oid.Address, checkLocked bool) (map[string][]int, error) {
+	groups := make(map[string][]int)
+
+	for i, addr := range addrs {
+		ids, err := e.findShards(ctx, addr, checkLocked)
+		if err != nil {
+			return nil, err
+		}
+		for _, id := range ids {
+			groups[id] = append(groups[id], i)
+		}
+	}
+
+	return groups, nil
+}
+
+// findShards determines the shard(s) where the object is stored.
+//
+// If the object is a root object, multiple shards will be returned.
+//
+// If checkLocked is set, [apistatus.ObjectLocked] will be returned if any of
+// the objects are locked.
+func (e *StorageEngine) findShards(ctx context.Context, addr oid.Address, checkLocked bool) ([]string, error) {
+	var (
+		ids    []string
+		retErr error
+
+		prm shard.ExistsPrm
+
+		siErr *objectSDK.SplitInfoError
+		ecErr *objectSDK.ECInfoError
+
+		isRootObject = false
+	)
 
 	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
-		defer func() {
-			// if object is root we continue since information about it
-			// can be presented in other shards
-			if checkExists && root {
-				stop = false
-			}
-		}()
+		objectExists := false
 
-		if checkExists {
-			existPrm.Address = addr
-			exRes, err := sh.Exists(ctx, existPrm)
-			if err != nil {
-				if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
-					// inhumed once - no need to be inhumed again
-					ok = true
-					return true
-				}
-
-				var siErr *objectSDK.SplitInfoError
-				var ecErr *objectSDK.ECInfoError
-				if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
-					e.reportShardError(ctx, sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
-					return
-				}
-
-				root = true
-			} else if !exRes.Exists() {
-				return
-			}
+		prm.Address = addr
+		switch res, err := sh.Exists(ctx, prm); {
+		case client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err):
+			// Don't report the error and just keep going.
+		case errors.As(err, &siErr) || errors.As(err, &ecErr):
+			isRootObject = true
+			objectExists = true
+		case err != nil:
+			e.reportShardError(
+				ctx, sh, "couldn't check for presence in shard",
+				err, zap.Stringer("address", addr),
+			)
+		case res.Exists():
+			objectExists = true
+		default:
 		}
 
-		_, err := sh.Inhume(ctx, prm)
-		if err != nil {
-			var errLocked *apistatus.ObjectLocked
-			switch {
-			case errors.As(err, &errLocked):
+		if !objectExists {
+			return
+		}
+
+		if checkLocked {
+			if isLocked, err := sh.IsLocked(ctx, addr); err != nil {
+				e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
+					zap.Error(err),
+					zap.Stringer("address", addr),
+					zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
+				)
+			} else if isLocked {
 				retErr = new(apistatus.ObjectLocked)
 				return true
-			case errors.Is(err, shard.ErrLockObjectRemoval):
-				retErr = meta.ErrLockObjectRemoval
-				return true
-			case errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode):
-				retErr = err
-				return true
 			}
-
-			e.reportShardError(ctx, sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
-			return false
 		}
 
-		ok = true
-		return true
+		ids = append(ids, sh.ID().String())
+
+		// Continue if it's a root object.
+		return !isRootObject
 	})
 
-	return ok, retErr
+	if retErr != nil {
+		return nil, retErr
+	}
+	return ids, nil
 }
 
 // IsLocked checks whether an object is locked according to StorageEngine's state.
diff --git a/pkg/util/worker_pool.go b/pkg/util/worker_pool.go
index 97d76c492..8284f1e6c 100644
--- a/pkg/util/worker_pool.go
+++ b/pkg/util/worker_pool.go
@@ -1,6 +1,8 @@
 package util
 
 import (
+	"context"
+	"sync"
 	"sync/atomic"
 
 	"github.com/panjf2000/ants/v2"
@@ -53,3 +55,56 @@ func (p *pseudoWorkerPool) Submit(fn func()) error {
 func (p *pseudoWorkerPool) Release() {
 	p.closed.Store(true)
 }
+
+type WorkerTask func(ctx context.Context) error
+
+type WorkerPoolSubmitError struct {
+	err error
+}
+
+func (e *WorkerPoolSubmitError) Error() string {
+	return e.err.Error()
+}
+
+func (e *WorkerPoolSubmitError) Unwrap() error {
+	return e.err
+}
+
+// ExecuteWithWorkerPool runs tasks in parallel using a pool and waits for all
+// tasks to be complete.
+//
+// Returns [WorkerPoolSubmitError] when it couldn't submit a task.
+func ExecuteWithWorkerPool(ctx context.Context, pool WorkerPool, tasks []WorkerTask) error {
+	taskCtx, taskCancel := context.WithCancelCause(ctx)
+	defer taskCancel(nil)
+
+	var wg sync.WaitGroup
+
+loop:
+	for _, task := range tasks {
+		select {
+		case <-ctx.Done():
+			taskCancel(context.Cause(ctx))
+			break loop
+		default:
+		}
+
+		wg.Add(1)
+		if err := pool.Submit(func() {
+			defer wg.Done()
+
+			if err := task(taskCtx); err != nil {
+				taskCancel(err)
+			}
+		}); err != nil {
+			wg.Done()
+			taskCancel(err)
+			wg.Wait()
+
+			return &WorkerPoolSubmitError{err}
+		}
+	}
+	wg.Wait()
+
+	return context.Cause(taskCtx)
+}
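
The new `ExecuteWithWorkerPool` helper is added here but not yet called from `inhume`. Below is a minimal, hypothetical sketch of how the per-shard batches built by `groupObjectsByShard` could be fed through it; it assumes the `util.WorkerPool` interface is just `Submit`/`Release` (as `pseudoWorkerPool` in the same file suggests), so an `ants` pool can be passed directly, and the task bodies are placeholders for the real per-shard `Inhume` calls.

```go
package main

import (
	"context"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"github.com/panjf2000/ants/v2"
)

func main() {
	// Assumption: an ants pool satisfies util.WorkerPool (Submit/Release).
	pool, err := ants.NewPool(4)
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	// One task per shard group; the bodies are placeholders for the
	// per-shard sh.Inhume calls made inside StorageEngine.inhume.
	tasks := make([]util.WorkerTask, 0, 3)
	for i := 0; i < 3; i++ {
		shardGroup := i
		tasks = append(tasks, func(ctx context.Context) error {
			select {
			case <-ctx.Done():
				// A failed sibling task (or the caller) cancelled the batch.
				return context.Cause(ctx)
			default:
				fmt.Println("inhuming batch for shard group", shardGroup)
				return nil
			}
		})
	}

	// The first task error cancels the shared context; Submit failures
	// come back wrapped in *util.WorkerPoolSubmitError.
	if err := util.ExecuteWithWorkerPool(context.Background(), pool, tasks); err != nil {
		fmt.Println("inhume failed:", err)
	}
}
```

Because the helper returns `context.Cause` of the task context, a caller written this way would see the first task's error rather than a generic `context.Canceled`.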