diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index e971fa525..135d3a8a9 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -3,12 +3,12 @@ package engine
 import (
     "context"
     "errors"
-    "sync"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
     meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
     tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -82,152 +82,182 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
 }
 
 func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
-    defer elapsed("Inhume", e.metrics.AddMethodDuration)()
-
-    var retErr error
-
-    wg := sync.WaitGroup{}
-    errOnce := sync.Once{}
-
-    ctx, cancel := context.WithCancel(ctx)
-    defer cancel()
-
-loop:
-    for _, addr := range prm.addrs {
-        select {
-        case <-ctx.Done():
-            break loop
-        default:
-        }
-
-        wg.Add(1)
-        if err := e.inhumePool.Submit(func() {
-            defer wg.Done()
-
-            if err := e.handleInhumeTask(ctx, addr, prm.tombstone, prm.forceRemoval); err != nil {
-                errOnce.Do(func() {
-                    retErr = err
-                    cancel()
-                })
-            }
-        }); err != nil {
-            wg.Done()
-            cancel()
-            wg.Wait()
-
-            e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
-            return InhumeRes{}, errInhumeFailure
-        }
-    }
-    wg.Wait()
-
-    return InhumeRes{}, retErr
-}
-
-func (e *StorageEngine) handleInhumeTask(ctx context.Context, addr oid.Address, tombstone *oid.Address, forceRemoval bool) error {
-    if !forceRemoval {
-        locked, err := e.IsLocked(ctx, addr)
-        if err != nil {
-            e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
-                zap.Error(err),
-                zap.Stringer("addr", addr),
-                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-        } else if locked {
-            return new(apistatus.ObjectLocked)
-        }
-    }
-
-    var prm shard.InhumePrm
-
-    if tombstone != nil {
-        prm.SetTarget(*tombstone, addr)
-    } else {
-        prm.MarkAsGarbage(addr)
-    }
-    if forceRemoval {
-        prm.ForceRemoval()
-    }
-
-    ok, err := e.inhumeAddr(ctx, addr, prm, true)
+    addrsPerShard, err := e.groupObjectsBeforeInhume(ctx, prm.addrs, !prm.forceRemoval)
     if err != nil {
-        return err
-    }
-    if !ok {
-        ok, err := e.inhumeAddr(ctx, addr, prm, false)
-        if err != nil {
-            return err
-        }
-        if !ok {
-            return errInhumeFailure
-        }
+        return InhumeRes{}, err
     }
-    return nil
-}
-
-// Returns ok if object was inhumed during this invocation or before.
-func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm shard.InhumePrm, checkExists bool) (bool, error) {
-    root := false
-    var existPrm shard.ExistsPrm
-    var retErr error
-    var ok bool
-
-    e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
-        defer func() {
-            // if object is root we continue since information about it
-            // can be presented in other shards
-            if checkExists && root {
-                stop = false
+
+    tasks := make([]util.WorkerTask, 0, len(addrsPerShard))
+    for shardID, addrIndexes := range addrsPerShard {
+        tasks = append(tasks, func(ctx context.Context) error {
+            addr := make([]oid.Address, len(addrIndexes))
+            for i, index := range addrIndexes {
+                addr[i] = prm.addrs[index]
             }
-        }()
-
-        if checkExists {
-            existPrm.Address = addr
-            exRes, err := sh.Exists(ctx, existPrm)
-            if err != nil {
-                if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
-                    // inhumed once - no need to be inhumed again
-                    ok = true
-                    return true
-                }
-
-                var siErr *objectSDK.SplitInfoError
-                var ecErr *objectSDK.ECInfoError
-                if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
-                    e.reportShardError(ctx, sh, "could not check for presents in shard", err, zap.Stringer("address", addr))
-                    return
-                }
-
-                root = true
-            } else if !exRes.Exists() {
-                return
+
+            var inhumePrm shard.InhumePrm
+            if prm.tombstone != nil {
+                inhumePrm.SetTarget(*prm.tombstone, addr...)
+            } else {
+                inhumePrm.MarkAsGarbage(addr...)
+            }
+            if prm.forceRemoval {
+                inhumePrm.ForceRemoval()
             }
-        }
-
-        _, err := sh.Inhume(ctx, prm)
-        if err != nil {
+
+            hs, exists := e.shards[shardID]
+            if !exists {
+                e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard,
+                    zap.Error(errors.New("this shard was expected to exist")),
+                    zap.String("shard_id", shardID),
+                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
+                )
+                return errInhumeFailure
+            }
+
+            _, err := hs.Shard.Inhume(ctx, inhumePrm)
             var errLocked *apistatus.ObjectLocked
             switch {
             case errors.As(err, &errLocked):
-                retErr = new(apistatus.ObjectLocked)
-                return true
+                err = errLocked
             case errors.Is(err, shard.ErrLockObjectRemoval):
-                retErr = meta.ErrLockObjectRemoval
-                return true
-            case errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode):
-                retErr = err
-                return true
+            case errors.Is(err, shard.ErrReadOnlyMode):
+            case errors.Is(err, shard.ErrDegradedMode):
+            case err != nil:
+                e.reportShardError(ctx, hs, "couldn't inhume object in shard", err)
             }
+
+            return err
+        })
+    }
 
-            e.reportShardError(ctx, sh, "could not inhume object in shard", err, zap.Stringer("address", addr))
-            return false
+    err = util.ExecuteWithWorkerPool(ctx, e.inhumePool, tasks)
+
+    var errSubmit *util.WorkerPoolSubmitError
+    if errors.As(err, &errSubmit) {
+        e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
+        err = errInhumeFailure
+    }
+
+    return InhumeRes{}, err
+}
+
+// groupObjectsBeforeInhume groups objects by the shard they are stored on.
+func (e *StorageEngine) groupObjectsBeforeInhume(ctx context.Context, addrs []oid.Address, checkLocked bool) (map[string][]int, error) {
+    type addrLocation struct {
+        addrIndex int
+        shardID   string
+    }
+
+    locations := make(chan addrLocation, e.inhumePoolSize)
+    groups := make(map[string][]int)
+    groupingDone := make(chan struct{})
+
+    go func() {
+        defer close(groupingDone)
+        for loc := range locations {
+            groups[loc.shardID] = append(groups[loc.shardID], loc.addrIndex)
+        }
+    }()
+
+    tasks := make([]util.WorkerTask, len(addrs))
+    for i, addr := range addrs {
+        tasks[i] = func(ctx context.Context) (err error) {
+            ids, err := e.findShards(ctx, addr, checkLocked)
+            if err != nil {
+                return err
+            }
+            for _, id := range ids {
+                locations <- addrLocation{addrIndex: i, shardID: id}
+            }
+            return nil
+        }
+    }
+
+    err := util.ExecuteWithWorkerPool(ctx, e.inhumePool, tasks)
+    close(locations)
+    <-groupingDone
+
+    if err != nil {
+        var errSubmit *util.WorkerPoolSubmitError
+        if errors.As(err, &errSubmit) {
+            e.log.Warn(ctx, logs.EngineCouldNotInhumeObjectInShard, zap.Error(err))
+            return nil, errInhumeFailure
+        }
+        return nil, err
+    }
+    return groups, nil
+}
+
+// findShards returns the shards the object is stored on. It returns
+// multiple shards if the object is a root object.
+func (e *StorageEngine) findShards(ctx context.Context, addr oid.Address, checkLocked bool) ([]string, error) {
+    var (
+        ids    []string
+        retErr error
+
+        prm shard.ExistsPrm
+
+        siErr *objectSDK.SplitInfoError
+        ecErr *objectSDK.ECInfoError
+
+        isRootObject = false
+    )
+
+    e.iterateOverSortedShards(addr, func(_ int, hs hashedShard) (stop bool) {
+        objectExists := false
+
+        prm.Address = addr
+        switch res, err := hs.Shard.Exists(ctx, prm); {
+        case client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err):
+            // Don't report the error and just keep going.
+        case errors.As(err, &siErr) || errors.As(err, &ecErr):
+            isRootObject = true
+            objectExists = true
+        case err != nil:
+            e.reportShardError(
+                ctx, hs, "couldn't check for presence in shard",
+                err, zap.Stringer("address", addr),
+            )
+        case res.Exists():
+            objectExists = true
+        default:
         }
 
-        ok = true
-        return true
+        if !objectExists {
+            return
+        }
+
+        // TODO(@a-savchuk): `shard.(*Shard).Exists` can check if an object
+        // is locked. I wanted to use it here, but then I found out that it
+        // checks if the PARENT object is locked, not the object passed.
+        // This check is used in one place only, in `(*StorageEngine).Put`.
+        //
+        // In the future, if `shard.(*Shard).Exists` is updated to check
+        // the lock status of the specific object, we could use it instead of
+        // `shard.(*Shard).IsLocked`.
+        if checkLocked {
+            if isLocked, err := hs.Shard.IsLocked(ctx, addr); err != nil {
+                e.log.Warn(ctx, logs.EngineRemovingAnObjectWithoutFullLockingCheck,
+                    zap.Error(err),
+                    zap.Stringer("address", addr),
+                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
+                )
+            } else if isLocked {
+                retErr = new(apistatus.ObjectLocked)
+                return true
+            }
+        }
+
+        ids = append(ids, hs.ID().String())
+
+        // Continue if it's a root object.
+        return !isRootObject
     })
 
-    return ok, retErr
+    if retErr != nil {
+        return nil, retErr
+    }
+    return ids, nil
 }
 
 // IsLocked checks whether an object is locked according to StorageEngine's state.
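Side note (not part of the patch): `groupObjectsBeforeInhume` relies on a common Go pattern in which parallel producer tasks feed a buffered channel while a single collector goroutine owns the result map, and the channel is closed only after the worker pool has finished every task. A minimal, self-contained sketch of that pattern follows; the `location` type, pool size, and shard IDs are illustrative and not taken from the codebase.

```go
package main

import (
    "fmt"
    "sync"
)

// location mirrors the addrLocation pairs sent over the channel above;
// the name and fields here are illustrative only.
type location struct {
    index   int
    shardID string
}

func main() {
    locations := make(chan location, 4) // buffered, like the inhume pool size
    groups := make(map[string][]int)
    done := make(chan struct{})

    // Single collector goroutine: it is the only writer of the map,
    // so no mutex is needed around groups.
    go func() {
        defer close(done)
        for loc := range locations {
            groups[loc.shardID] = append(groups[loc.shardID], loc.index)
        }
    }()

    // Producers stand in for the worker-pool tasks.
    var wg sync.WaitGroup
    for i := range 8 {
        wg.Add(1)
        go func() {
            defer wg.Done()
            locations <- location{index: i, shardID: fmt.Sprintf("shard-%d", i%3)}
        }()
    }

    // Close the channel only after every producer is done, mirroring
    // close(locations) running after ExecuteWithWorkerPool returns.
    wg.Wait()
    close(locations)
    <-done

    fmt.Println(groups)
}
```

Because only the collector touches `groups`, no locking is needed around the map, which is why `groupObjectsBeforeInhume` can append to it without a mutex.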
diff --git a/pkg/util/worker_pool.go b/pkg/util/worker_pool.go
index 97d76c492..fd9a06731 100644
--- a/pkg/util/worker_pool.go
+++ b/pkg/util/worker_pool.go
@@ -1,6 +1,8 @@
 package util
 
 import (
+    "context"
+    "sync"
     "sync/atomic"
 
     "github.com/panjf2000/ants/v2"
@@ -53,3 +55,63 @@ func (p *pseudoWorkerPool) Submit(fn func()) error {
 func (p *pseudoWorkerPool) Release() {
     p.closed.Store(true)
 }
+
+type WorkerTask func(ctx context.Context) error
+
+type WorkerPoolSubmitError struct {
+    err error
+}
+
+func (e *WorkerPoolSubmitError) Error() string {
+    return e.err.Error()
+}
+
+func (e *WorkerPoolSubmitError) Unwrap() error {
+    return e.err
+}
+
+// ExecuteWithWorkerPool runs tasks in parallel using a pool and waits for all
+// tasks to complete.
+//
+// It returns a [WorkerPoolSubmitError] when it couldn't submit a task.
+func ExecuteWithWorkerPool(ctx context.Context, pool WorkerPool, tasks []WorkerTask) error {
+    taskCtx, taskCancel := context.WithCancelCause(ctx)
+    defer taskCancel(nil)
+
+    var (
+        wg         sync.WaitGroup
+        cancelOnce sync.Once
+    )
+
+loop:
+    for _, task := range tasks {
+        select {
+        case <-ctx.Done():
+            cancelOnce.Do(func() {
+                taskCancel(context.Cause(taskCtx))
+            })
+            break loop
+        default:
+        }
+
+        wg.Add(1)
+        if err := pool.Submit(func() {
+            defer wg.Done()
+
+            if err := task(taskCtx); err != nil {
+                cancelOnce.Do(func() {
+                    taskCancel(err)
+                })
+            }
+        }); err != nil {
+            wg.Done()
+            taskCancel(err)
+            wg.Wait()
+
+            return &WorkerPoolSubmitError{err}
+        }
+    }
+    wg.Wait()
+
+    return context.Cause(taskCtx)
+}
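For reference, here is a rough usage sketch of the new `util.ExecuteWithWorkerPool` helper. It is not part of the change and makes two assumptions: that `util.WorkerPool` is the `Submit(func()) error` / `Release()` interface implemented by `pseudoWorkerPool` above, and that `*ants.Pool` (already imported by this file) satisfies it; the pool size and task bodies are made up for illustration.

```go
package main

import (
    "context"
    "errors"
    "fmt"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
    "github.com/panjf2000/ants/v2"
)

func main() {
    // Assumption: *ants.Pool satisfies util.WorkerPool via its
    // Submit(func()) error and Release() methods.
    pool, err := ants.NewPool(4)
    if err != nil {
        panic(err)
    }
    defer pool.Release()

    // Build a few tasks; one of them fails, which cancels the shared
    // task context handed to the tasks that have not finished yet.
    tasks := make([]util.WorkerTask, 0, 8)
    for i := range 8 {
        tasks = append(tasks, func(ctx context.Context) error {
            select {
            case <-ctx.Done():
                // Another task already failed; give up early.
                return context.Cause(ctx)
            default:
            }
            if i == 3 {
                return fmt.Errorf("task %d failed", i)
            }
            return nil
        })
    }

    err = util.ExecuteWithWorkerPool(context.Background(), pool, tasks)

    var submitErr *util.WorkerPoolSubmitError
    switch {
    case errors.As(err, &submitErr):
        fmt.Println("could not submit a task:", submitErr)
    case err != nil:
        fmt.Println("first task error:", err)
    default:
        fmt.Println("all tasks completed")
    }
}
```

The first task error (or a submit failure wrapped in `WorkerPoolSubmitError`) is what the caller sees, and it also cancels the context passed to the remaining tasks, which is how `(*StorageEngine).inhume` above reacts to the first failing shard.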