diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 491b226b..25b533bd 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -65,122 +65,51 @@ func (e *StorageEngine) GetRange(prm RngPrm) (res RngRes, err error) { return } -// nolint: funlen func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddRangeDuration)() } - var ( - obj *objectSDK.Object - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound - - shardWithMeta hashedShard - metaError error - ) - - var hasDegraded bool + var errNotFound apistatus.ObjectNotFound var shPrm shard.RngPrm shPrm.SetAddress(prm.addr) shPrm.SetRange(prm.off, prm.ln) - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { - noMeta := sh.GetMode().NoMetabase() - hasDegraded = hasDegraded || noMeta - shPrm.SetIgnoreMeta(noMeta) - - res, err := sh.GetRange(shPrm) - if err != nil { - if res.HasMeta() { - shardWithMeta = sh - metaError = err - } - switch { - case shard.IsErrNotFound(err): - return false // ignore, go to next shard - case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() - } - - util.MergeSplitInfo(siErr.SplitInfo(), outSI) - - _, withLink := outSI.Link() - _, withLast := outSI.LastPart() - - // stop iterating over shards if SplitInfo structure is complete - if withLink && withLast { - return true - } - - return false - case - shard.IsErrRemoved(err), - shard.IsErrOutOfRange(err): - outError = err - - return true // stop, return it back - default: - e.reportShardError(sh, "could not get object from shard", err) - return false - } - } - - obj = res.Object() - - return true - }) - - if outSI != nil { - return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + it := &getRangeShardIterator{ + OutError: errNotFound, + ShardPrm: shPrm, + Address: prm.addr, + Engine: e, } - if obj == nil { + it.tryGetWithMeta() + + if it.SplitInfo != nil { + return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) + } + + if it.Object == nil { // If any shard is in a degraded mode, we should assume that metabase could store // info about some object. - if shardWithMeta.Shard == nil && !hasDegraded || !shard.IsErrNotFound(outError) { - return RngRes{}, outError + if it.ShardWithMeta.Shard == nil && !it.HasDegraded || !shard.IsErrNotFound(it.OutError) { + return RngRes{}, it.OutError } - // If the object is not found but is present in metabase, - // try to fetch it from blobstor directly. If it is found in any - // blobstor, increase the error counter for the shard which contains the meta. - shPrm.SetIgnoreMeta(true) + it.tryGetFromBlobstor() - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { - if sh.GetMode().NoMetabase() { - // Already processed it without a metabase. - return false - } - - res, err := sh.GetRange(shPrm) - if shard.IsErrOutOfRange(err) { - var errOutOfRange apistatus.ObjectOutOfRange - - outError = errOutOfRange - return true - } - obj = res.Object() - return err == nil - }) - if obj == nil { - return RngRes{}, outError + if it.Object == nil { + return RngRes{}, it.OutError } - if shardWithMeta.Shard != nil { - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, + if it.ShardWithMeta.Shard != nil { + e.reportShardError(it.ShardWithMeta, "meta info was present, but object is missing", + it.MetaError, zap.Stringer("address", prm.addr)) } } return RngRes{ - obj: obj, + obj: it.Object, }, nil } @@ -197,3 +126,85 @@ func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([ return res.Object().Payload(), nil } + +type getRangeShardIterator struct { + Object *objectSDK.Object + SplitInfoError *objectSDK.SplitInfoError + SplitInfo *objectSDK.SplitInfo + OutError error + ShardWithMeta hashedShard + MetaError error + HasDegraded bool + + ShardPrm shard.RngPrm + Address oid.Address + Engine *StorageEngine +} + +func (i *getRangeShardIterator) tryGetWithMeta() { + i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { + noMeta := sh.GetMode().NoMetabase() + i.HasDegraded = i.HasDegraded || noMeta + i.ShardPrm.SetIgnoreMeta(noMeta) + + res, err := sh.GetRange(i.ShardPrm) + if err == nil { + i.Object = res.Object() + return true + } + + if res.HasMeta() { + i.ShardWithMeta = sh + i.MetaError = err + } + switch { + case shard.IsErrNotFound(err): + return false // ignore, go to next shard + case errors.As(err, &i.SplitInfoError): + if i.SplitInfo == nil { + i.SplitInfo = objectSDK.NewSplitInfo() + } + + util.MergeSplitInfo(i.SplitInfoError.SplitInfo(), i.SplitInfo) + + _, withLink := i.SplitInfo.Link() + _, withLast := i.SplitInfo.LastPart() + + // stop iterating over shards if SplitInfo structure is complete + return withLink && withLast + case + shard.IsErrRemoved(err), + shard.IsErrOutOfRange(err): + i.OutError = err + + return true // stop, return it back + default: + i.Engine.reportShardError(sh, "could not get object from shard", err) + return false + } + }) +} + +func (i *getRangeShardIterator) tryGetFromBlobstor() { + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. + i.ShardPrm.SetIgnoreMeta(true) + + i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { + if sh.GetMode().NoMetabase() { + // Already processed it without a metabase. + return false + } + + res, err := sh.GetRange(i.ShardPrm) + if shard.IsErrOutOfRange(err) { + var errOutOfRange apistatus.ObjectOutOfRange + + i.OutError = errOutOfRange + return true + } + i.Object = res.Object() + return err == nil + }) +}