diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 787a7bac8..4d0a30bc8 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -52,124 +52,134 @@ func (e *StorageEngine) Get(prm GetPrm) (res GetRes, err error) { return } -// nolint: funlen func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddGetDuration)() } - var ( - obj *objectSDK.Object - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound - - shardWithMeta hashedShard - metaError error - ) + var errNotFound apistatus.ObjectNotFound var shPrm shard.GetPrm shPrm.SetAddress(prm.addr) - var hasDegraded bool - var objectExpired bool - - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { - noMeta := sh.GetMode().NoMetabase() - shPrm.SetIgnoreMeta(noMeta) - - hasDegraded = hasDegraded || noMeta - - res, err := sh.Get(shPrm) - if err != nil { - if res.HasMeta() { - shardWithMeta = sh - metaError = err - } - switch { - case shard.IsErrNotFound(err): - return false // ignore, go to next shard - case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() - } - - util.MergeSplitInfo(siErr.SplitInfo(), outSI) - - _, withLink := outSI.Link() - _, withLast := outSI.LastPart() - - // stop iterating over shards if SplitInfo structure is complete - if withLink && withLast { - return true - } - - return false - case shard.IsErrRemoved(err): - outError = err - - return true // stop, return it back - case shard.IsErrObjectExpired(err): - // object is found but should not - // be returned - objectExpired = true - return true - default: - e.reportShardError(sh, "could not get object from shard", err) - return false - } - } - - obj = res.Object() - - return true - }) - - if outSI != nil { - return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + it := &getShardIterator{ + OutError: errNotFound, + ShardPrm: shPrm, + Address: prm.addr, + Engine: e, } - if objectExpired { + it.tryGetWithMeta() + + if it.SplitInfo != nil { + return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) + } + + if it.ObjectExpired { return GetRes{}, errNotFound } - if obj == nil { - if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) { - return GetRes{}, outError + if it.Object == nil { + if !it.HasDegraded && it.ShardWithMeta.Shard == nil || !shard.IsErrNotFound(it.OutError) { + return GetRes{}, it.OutError } - // If the object is not found but is present in metabase, - // try to fetch it from blobstor directly. If it is found in any - // blobstor, increase the error counter for the shard which contains the meta. - shPrm.SetIgnoreMeta(true) + it.tryGetFromBlobstore() - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { - if sh.GetMode().NoMetabase() { - // Already visited. - return false - } - - res, err := sh.Get(shPrm) - obj = res.Object() - return err == nil - }) - if obj == nil { - return GetRes{}, outError + if it.Object == nil { + return GetRes{}, it.OutError } - if shardWithMeta.Shard != nil { - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, zap.Stringer("address", prm.addr)) + if it.ShardWithMeta.Shard != nil { + e.reportShardError(it.ShardWithMeta, "meta info was present, but object is missing", + it.MetaError, zap.Stringer("address", prm.addr)) } } return GetRes{ - obj: obj, + obj: it.Object, }, nil } +type getShardIterator struct { + Object *objectSDK.Object + SplitInfo *objectSDK.SplitInfo + OutError error + ShardWithMeta hashedShard + MetaError error + HasDegraded bool + ObjectExpired bool + + ShardPrm shard.GetPrm + Address oid.Address + Engine *StorageEngine + + splitInfoErr *objectSDK.SplitInfoError +} + +func (i *getShardIterator) tryGetWithMeta() { + i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { + noMeta := sh.GetMode().NoMetabase() + i.ShardPrm.SetIgnoreMeta(noMeta) + + i.HasDegraded = i.HasDegraded || noMeta + + res, err := sh.Get(i.ShardPrm) + if err == nil { + i.Object = res.Object() + return true + } + + if res.HasMeta() { + i.ShardWithMeta = sh + i.MetaError = err + } + switch { + case shard.IsErrNotFound(err): + return false // ignore, go to next shard + case errors.As(err, &i.splitInfoErr): + if i.SplitInfo == nil { + i.SplitInfo = objectSDK.NewSplitInfo() + } + + util.MergeSplitInfo(i.splitInfoErr.SplitInfo(), i.SplitInfo) + + _, withLink := i.SplitInfo.Link() + _, withLast := i.SplitInfo.LastPart() + + // stop iterating over shards if SplitInfo structure is complete + return withLink && withLast + case shard.IsErrRemoved(err): + i.OutError = err + return true // stop, return it back + case shard.IsErrObjectExpired(err): + // object is found but should not be returned + i.ObjectExpired = true + return true + default: + i.Engine.reportShardError(sh, "could not get object from shard", err) + return false + } + }) +} + +func (i *getShardIterator) tryGetFromBlobstore() { + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. + i.ShardPrm.SetIgnoreMeta(true) + + i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { + if sh.GetMode().NoMetabase() { + // Already visited. + return false + } + + res, err := sh.Get(i.ShardPrm) + i.Object = res.Object() + return err == nil + }) +} + // Get reads object from local storage by provided address. func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { var getPrm GetPrm