From 3a441f072ffb955c6513eb7dd7733da311590ad7 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 21 Apr 2025 12:13:32 +0300 Subject: [PATCH] [#1709] shard: Check if context canceled for shard iteration If context has already been canceled, then there is no need to check other shards. At the same time, it is necessary to avoid handling context cancellation in each handler. Therefore, the context check has been moved to the shard iteration method, which now returns an error. Change-Id: I70030ace36593ce7d2b8376bee39fe82e9dbf88f Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 1 + pkg/local_object_storage/engine/container.go | 27 ++++--- pkg/local_object_storage/engine/delete.go | 12 +-- pkg/local_object_storage/engine/exists.go | 6 +- pkg/local_object_storage/engine/get.go | 16 ++-- pkg/local_object_storage/engine/head.go | 6 +- pkg/local_object_storage/engine/inhume.go | 77 ++++++++++++-------- pkg/local_object_storage/engine/lock.go | 16 +++- pkg/local_object_storage/engine/put.go | 6 +- pkg/local_object_storage/engine/range.go | 16 ++-- pkg/local_object_storage/engine/select.go | 30 +++++--- pkg/local_object_storage/engine/shards.go | 22 +++++- 12 files changed, 149 insertions(+), 86 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index dedc7e12cc..626372f435 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -198,6 +198,7 @@ const ( EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks" EngineInterruptGettingLockers = "can't get object's lockers" EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks" + EngineInterruptProcessingTheExpiredTombstones = "interrupt processing the expired tombstones" EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only" EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode" EngineShardIsMovedInReadonlyModeDueToErrorThreshold = "shard is moved in read-only mode due to error threshold" diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 03a299b93b..e0617a8326 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -48,8 +48,9 @@ func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm) defer elapsed("ContainerSize", e.metrics.AddMethodDuration)() err = e.execIfNotBlocked(func() error { - res = e.containerSize(ctx, prm) - return nil + var csErr error + res, csErr = e.containerSize(ctx, prm) + return csErr }) return @@ -69,8 +70,9 @@ func ContainerSize(ctx context.Context, e *StorageEngine, id cid.ID) (uint64, er return res.Size(), nil } -func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { +func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (ContainerSizeRes, error) { + var res ContainerSizeRes + err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) (stop bool) { var csPrm shard.ContainerSizePrm csPrm.SetContainerID(prm.cnr) @@ -86,7 +88,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) return false }) - return + return res, err } // ListContainers returns a unique container IDs presented in the engine objects. @@ -96,8 +98,9 @@ func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm) defer elapsed("ListContainers", e.metrics.AddMethodDuration)() err = e.execIfNotBlocked(func() error { - res = e.listContainers(ctx) - return nil + var lcErr error + res, lcErr = e.listContainers(ctx) + return lcErr }) return @@ -115,10 +118,10 @@ func ListContainers(ctx context.Context, e *StorageEngine) ([]cid.ID, error) { return res.Containers(), nil } -func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes { +func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes, error) { uniqueIDs := make(map[string]cid.ID) - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) (stop bool) { res, err := sh.ListContainers(ctx, shard.ListContainersPrm{}) if err != nil { e.reportShardError(ctx, sh, "can't get list of containers", err) @@ -133,7 +136,9 @@ func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes { } return false - }) + }); err != nil { + return ListContainersRes{}, err + } result := make([]cid.ID, 0, len(uniqueIDs)) for _, v := range uniqueIDs { @@ -142,5 +147,5 @@ func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes { return ListContainersRes{ containers: result, - } + }, nil } diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 5e5f65fa2f..223cdbc488 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -71,7 +71,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) error { // Removal of a big object is done in multiple stages: // 1. Remove the parent object. If it is locked or already removed, return immediately. // 2. Otherwise, search for all objects with a particular SplitID and delete them too. - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + if err := e.iterateOverSortedShards(ctx, prm.addr, func(_ int, sh hashedShard) (stop bool) { var existsPrm shard.ExistsPrm existsPrm.Address = prm.addr @@ -116,20 +116,22 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) error { // If a parent object is removed we should set GC mark on each shard. return splitInfo == nil - }) + }); err != nil { + return err + } if locked.is { return new(apistatus.ObjectLocked) } if splitInfo != nil { - e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID()) + return e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID()) } return nil } -func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) { +func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) error { var fs objectSDK.SearchFilters fs.AddSplitIDFilter(objectSDK.MatchStringEqual, splitID) @@ -142,7 +144,7 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo inhumePrm.ForceRemoval() } - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + return e.iterateOverSortedShards(ctx, addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Select(ctx, selectPrm) if err != nil { e.log.Warn(ctx, logs.EngineErrorDuringSearchingForObjectChildren, diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 9d2b1c1b78..7dac9eb97f 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -18,7 +18,7 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool exists := false locked := false - e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) { + if err := e.iterateOverSortedShards(ctx, shPrm.Address, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Exists(ctx, shPrm) if err != nil { if client.IsErrObjectAlreadyRemoved(err) { @@ -50,7 +50,9 @@ func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool } return false - }) + }); err != nil { + return false, false, err + } if alreadyRemoved { return false, false, new(apistatus.ObjectAlreadyRemoved) diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 74c64bbb60..0694c53f35 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -78,7 +78,9 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) { Engine: e, } - it.tryGetWithMeta(ctx) + if err := it.tryGetWithMeta(ctx); err != nil { + return GetRes{}, err + } if it.SplitInfo != nil { return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) @@ -97,7 +99,9 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) { return GetRes{}, it.OutError } - it.tryGetFromBlobstore(ctx) + if err := it.tryGetFromBlobstore(ctx); err != nil { + return GetRes{}, err + } if it.Object == nil { return GetRes{}, it.OutError @@ -133,8 +137,8 @@ type getShardIterator struct { ecInfoErr *objectSDK.ECInfoError } -func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { - i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { +func (i *getShardIterator) tryGetWithMeta(ctx context.Context) error { + return i.Engine.iterateOverSortedShards(ctx, i.Address, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() i.ShardPrm.SetIgnoreMeta(noMeta) @@ -187,13 +191,13 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { }) } -func (i *getShardIterator) tryGetFromBlobstore(ctx context.Context) { +func (i *getShardIterator) tryGetFromBlobstore(ctx context.Context) error { // If the object is not found but is present in metabase, // try to fetch it from blobstor directly. If it is found in any // blobstor, increase the error counter for the shard which contains the meta. i.ShardPrm.SetIgnoreMeta(true) - i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { + return i.Engine.iterateOverSortedShards(ctx, i.Address, func(_ int, sh hashedShard) (stop bool) { if sh.GetMode().NoMetabase() { // Already visited. return false diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index d6892f1291..d436dd4113 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -82,7 +82,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) shPrm.SetAddress(prm.addr) shPrm.SetRaw(prm.raw) - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + if err := e.iterateOverSortedShards(ctx, prm.addr, func(_ int, sh hashedShard) (stop bool) { shPrm.ShardLooksBad = sh.errorCount.Load() >= e.errorsThreshold res, err := sh.Head(ctx, shPrm) if err != nil { @@ -123,7 +123,9 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) } head = res.Object() return true - }) + }); err != nil { + return HeadRes{}, err + } if head != nil { return HeadRes{head: head}, nil diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index e5a2396e1c..e13f04927d 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -158,7 +158,7 @@ func (e *StorageEngine) findShards(ctx context.Context, addr oid.Address, checkL objectExists bool ) - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + if err := e.iterateOverSortedShards(ctx, addr, func(_ int, sh hashedShard) (stop bool) { objectExists = false prm.Address = addr @@ -209,7 +209,9 @@ func (e *StorageEngine) findShards(ctx context.Context, addr oid.Address, checkL // Continue if it's a root object. return !isRootObject - }) + }); err != nil { + return nil, err + } if retErr != nil { return nil, retErr @@ -229,7 +231,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e var err error var outErr error - e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(h hashedShard) (stop bool) { locked, err = h.IsLocked(ctx, addr) if err != nil { e.reportShardError(ctx, h, "can't check object's lockers", err, zap.Stringer("address", addr)) @@ -238,7 +240,9 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e } return locked - }) + }); err != nil { + return false, err + } if locked { return locked, nil @@ -258,7 +262,7 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I var allLocks []oid.ID var outErr error - e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(h hashedShard) (stop bool) { locks, err := h.GetLocks(ctx, addr) if err != nil { e.reportShardError(ctx, h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr)) @@ -266,7 +270,9 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I } allLocks = append(allLocks, locks...) return false - }) + }); err != nil { + return nil, err + } if len(allLocks) > 0 { return allLocks, nil } @@ -274,20 +280,23 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I } func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(ctx, addrs) select { case <-ctx.Done(): + e.log.Info(ctx, logs.EngineInterruptProcessingTheExpiredTombstones, zap.Error(ctx.Err())) return true default: return false } - }) + }); err != nil { + e.log.Info(ctx, logs.EngineInterruptProcessingTheExpiredTombstones, zap.Error(err)) + } } func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) (stop bool) { sh.HandleExpiredLocks(ctx, epoch, lockers) select { @@ -297,11 +306,13 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, l default: return false } - }) + }); err != nil { + e.log.Info(ctx, logs.EngineInterruptProcessingTheExpiredLocks, zap.Error(err)) + } } func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) (stop bool) { sh.HandleDeletedLocks(ctx, lockers) select { @@ -311,26 +322,25 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A default: return false } - }) + }); err != nil { + e.log.Info(ctx, logs.EngineInterruptProcessingTheDeletedLocks, zap.Error(err)) + } } func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid.ID) { if len(ids) == 0 { return } - idMap, err := e.selectNonExistentIDs(ctx, ids) if err != nil { return } - if len(idMap) == 0 { return } - var failed bool var prm shard.ContainerSizePrm - e.iterateOverUnsortedShards(func(sh hashedShard) bool { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) bool { select { case <-ctx.Done(): e.log.Info(ctx, logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err())) @@ -357,13 +367,15 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid } return len(idMap) == 0 - }) - + }); err != nil { + e.log.Info(ctx, logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(err)) + return + } if failed || len(idMap) == 0 { return } - e.iterateOverUnsortedShards(func(sh hashedShard) bool { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) bool { select { case <-ctx.Done(): e.log.Info(ctx, logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err())) @@ -381,12 +393,13 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid } return false - }) - + }); err != nil { + e.log.Info(ctx, logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(err)) + return + } if failed { return } - for id := range idMap { e.metrics.DeleteContainerSize(id.EncodeToString()) } @@ -396,19 +409,16 @@ func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []ci if len(ids) == 0 { return } - idMap, err := e.selectNonExistentIDs(ctx, ids) if err != nil { return } - if len(idMap) == 0 { return } - var failed bool var prm shard.ContainerCountPrm - e.iterateOverUnsortedShards(func(sh hashedShard) bool { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) bool { select { case <-ctx.Done(): e.log.Info(ctx, logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err())) @@ -435,13 +445,15 @@ func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []ci } return len(idMap) == 0 - }) - + }); err != nil { + e.log.Info(ctx, logs.EngineInterruptProcessingZeroCountContainers, zap.Error(err)) + return + } if failed || len(idMap) == 0 { return } - e.iterateOverUnsortedShards(func(sh hashedShard) bool { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) bool { select { case <-ctx.Done(): e.log.Info(ctx, logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err())) @@ -459,12 +471,13 @@ func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []ci } return false - }) - + }); err != nil { + e.log.Info(ctx, logs.EngineInterruptProcessingZeroCountContainers, zap.Error(err)) + return + } if failed { return } - for id := range idMap { e.metrics.DeleteContainerCount(id.EncodeToString()) } diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 18fff9cad8..3b0cf74f99 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -41,11 +41,19 @@ func (e *StorageEngine) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, locked []oid.ID) error { for i := range locked { - switch e.lockSingle(ctx, idCnr, locker, locked[i], true) { + st, err := e.lockSingle(ctx, idCnr, locker, locked[i], true) + if err != nil { + return err + } + switch st { case 1: return logicerr.Wrap(new(apistatus.LockNonRegularObject)) case 0: - switch e.lockSingle(ctx, idCnr, locker, locked[i], false) { + st, err = e.lockSingle(ctx, idCnr, locker, locked[i], false) + if err != nil { + return err + } + switch st { case 1: return logicerr.Wrap(new(apistatus.LockNonRegularObject)) case 0: @@ -61,13 +69,13 @@ func (e *StorageEngine) lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l // - 0: fail // - 1: locking irregular object // - 2: ok -func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) { +func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8, retErr error) { // code is pretty similar to inhumeAddr, maybe unify? root := false var addrLocked oid.Address addrLocked.SetContainer(idCnr) addrLocked.SetObject(locked) - e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) { + retErr = e.iterateOverSortedShards(ctx, addrLocked, func(_ int, sh hashedShard) (stop bool) { defer func() { // if object is root we continue since information about it // can be presented in other shards diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index b348d13a23..10cf5ffd5d 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -96,7 +96,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { } var shRes putToShardRes - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + if err := e.iterateOverSortedShards(ctx, addr, func(_ int, sh hashedShard) (stop bool) { e.mtx.RLock() _, ok := e.shards[sh.ID().String()] e.mtx.RUnlock() @@ -106,7 +106,9 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { } shRes = e.putToShard(ctx, sh, addr, prm.Object, prm.IsIndexedContainer) return shRes.status != putToShardUnknown - }) + }); err != nil { + return err + } switch shRes.status { case putToShardUnknown: return errPutShard diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index a468cf5944..7ec4742d8c 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -93,7 +93,9 @@ func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error Engine: e, } - it.tryGetWithMeta(ctx) + if err := it.tryGetWithMeta(ctx); err != nil { + return RngRes{}, err + } if it.SplitInfo != nil { return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) @@ -109,7 +111,9 @@ func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error return RngRes{}, it.OutError } - it.tryGetFromBlobstor(ctx) + if err := it.tryGetFromBlobstor(ctx); err != nil { + return RngRes{}, err + } if it.Object == nil { return RngRes{}, it.OutError @@ -157,8 +161,8 @@ type getRangeShardIterator struct { Engine *StorageEngine } -func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) { - i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { +func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) error { + return i.Engine.iterateOverSortedShards(ctx, i.Address, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() i.HasDegraded = i.HasDegraded || noMeta i.ShardPrm.SetIgnoreMeta(noMeta) @@ -209,13 +213,13 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) { }) } -func (i *getRangeShardIterator) tryGetFromBlobstor(ctx context.Context) { +func (i *getRangeShardIterator) tryGetFromBlobstor(ctx context.Context) error { // If the object is not found but is present in metabase, // try to fetch it from blobstor directly. If it is found in any // blobstor, increase the error counter for the shard which contains the meta. i.ShardPrm.SetIgnoreMeta(true) - i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { + return i.Engine.iterateOverSortedShards(ctx, i.Address, func(_ int, sh hashedShard) (stop bool) { if sh.GetMode().NoMetabase() { // Already processed it without a metabase. return false diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index fc8b4a9a72..4243a54819 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -54,14 +54,15 @@ func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRe defer elapsed("Select", e.metrics.AddMethodDuration)() err = e.execIfNotBlocked(func() error { - res = e._select(ctx, prm) - return nil + var sErr error + res, sErr = e._select(ctx, prm) + return sErr }) return } -func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes { +func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, error) { addrList := make([]oid.Address, 0) uniqueMap := make(map[string]struct{}) @@ -69,7 +70,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes { shPrm.SetContainerID(prm.cnr, prm.indexedContainer) shPrm.SetFilters(prm.filters) - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) (stop bool) { res, err := sh.Select(ctx, shPrm) if err != nil { e.reportShardError(ctx, sh, "could not select objects from shard", err) @@ -84,11 +85,13 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes { } return false - }) + }); err != nil { + return SelectRes{}, err + } return SelectRes{ addrList: addrList, - } + }, nil } // List returns `limit` available physically storage object addresses in engine. @@ -98,20 +101,21 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes { func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) { defer elapsed("List", e.metrics.AddMethodDuration)() err = e.execIfNotBlocked(func() error { - res = e.list(ctx, limit) - return nil + var lErr error + res, lErr = e.list(ctx, limit) + return lErr }) return } -func (e *StorageEngine) list(ctx context.Context, limit uint64) SelectRes { +func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) { addrList := make([]oid.Address, 0, limit) uniqueMap := make(map[string]struct{}) ln := uint64(0) // consider iterating over shuffled shards - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + if err := e.iterateOverUnsortedShards(ctx, func(sh hashedShard) (stop bool) { res, err := sh.List(ctx) // consider limit result of shard iterator if err != nil { e.reportShardError(ctx, sh, "could not select objects from shard", err) @@ -130,11 +134,13 @@ func (e *StorageEngine) list(ctx context.Context, limit uint64) SelectRes { } return false - }) + }); err != nil { + return SelectRes{}, err + } return SelectRes{ addrList: addrList, - } + }, nil } // Select selects objects from local storage using provided filters. diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 6e6c08bb55..69067c500b 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -280,20 +280,32 @@ func (e *StorageEngine) unsortedShards() []hashedShard { return shards } -func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(int, hashedShard) (stop bool)) { +func (e *StorageEngine) iterateOverSortedShards(ctx context.Context, addr oid.Address, handler func(int, hashedShard) (stop bool)) error { for i, sh := range e.sortShards(addr) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if handler(i, sh) { break } } + return nil } -func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (stop bool)) { +func (e *StorageEngine) iterateOverUnsortedShards(ctx context.Context, handler func(hashedShard) (stop bool)) error { for _, sh := range e.unsortedShards() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if handler(sh) { break } } + return nil } // SetShardMode sets mode of the shard with provided identifier. @@ -433,7 +445,7 @@ func (e *StorageEngine) ListShardsForObject(ctx context.Context, obj oid.Address var siErr *objectSDK.SplitInfoError var ecErr *objectSDK.ECInfoError - e.iterateOverUnsortedShards(func(hs hashedShard) (stop bool) { + if itErr := e.iterateOverUnsortedShards(ctx, func(hs hashedShard) (stop bool) { res, exErr := hs.Exists(ctx, prm) if exErr != nil { if client.IsErrObjectAlreadyRemoved(exErr) { @@ -463,6 +475,8 @@ func (e *StorageEngine) ListShardsForObject(ctx context.Context, obj oid.Address info = append(info, hs.DumpInfo()) } return false - }) + }); itErr != nil { + return nil, itErr + } return info, err }