2cb7ec9633
In the previous implementation any non-nil error that preceded object fetching from blobstor led to iterating over every storage (in other words, no storage ID information was taken into account). Now storage ID is skipped only if metabase (storage ID source) returns any error. Signed-off-by: Pavel Karpy <p.karpy@yadro.com> Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
146 lines
3.9 KiB
Go
146 lines
3.9 KiB
Go
package shard
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
meta "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// storFetcher is a type to unify object fetching mechanism in `fetchObjectData`
|
|
// method. It represents generalization of `getSmall` and `getBig` methods.
|
|
type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error)
|
|
|
|
// GetPrm groups the parameters of Get operation.
|
|
type GetPrm struct {
|
|
addr oid.Address
|
|
skipMeta bool
|
|
}
|
|
|
|
// GetRes groups the resulting values of Get operation.
|
|
type GetRes struct {
|
|
obj *objectSDK.Object
|
|
hasMeta bool
|
|
}
|
|
|
|
// SetAddress is a Get option to set the address of the requested object.
|
|
//
|
|
// Option is required.
|
|
func (p *GetPrm) SetAddress(addr oid.Address) {
|
|
p.addr = addr
|
|
}
|
|
|
|
// SetIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
|
// without accessing metabase.
|
|
func (p *GetPrm) SetIgnoreMeta(ignore bool) {
|
|
p.skipMeta = ignore
|
|
}
|
|
|
|
// Object returns the requested object.
|
|
func (r GetRes) Object() *objectSDK.Object {
|
|
return r.obj
|
|
}
|
|
|
|
// HasMeta returns true if info about the object was found in the metabase.
|
|
func (r GetRes) HasMeta() bool {
|
|
return r.hasMeta
|
|
}
|
|
|
|
// Get reads an object from shard.
|
|
//
|
|
// Returns any error encountered that
|
|
// did not allow to completely read the object part.
|
|
//
|
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
|
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
|
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
|
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
|
s.m.RLock()
|
|
defer s.m.RUnlock()
|
|
|
|
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
|
|
var getPrm common.GetPrm
|
|
getPrm.Address = prm.addr
|
|
getPrm.StorageID = id
|
|
|
|
res, err := stor.Get(getPrm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res.Object, nil
|
|
}
|
|
|
|
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
|
|
return c.Get(prm.addr)
|
|
}
|
|
|
|
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
|
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc)
|
|
|
|
return GetRes{
|
|
obj: obj,
|
|
hasMeta: hasMeta,
|
|
}, err
|
|
}
|
|
|
|
// fetchObjectData looks through writeCache and blobStor to find object.
|
|
func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
|
|
var (
|
|
mErr error
|
|
mRes meta.ExistsRes
|
|
)
|
|
|
|
var exists bool
|
|
if !skipMeta {
|
|
var mPrm meta.ExistsPrm
|
|
mPrm.SetAddress(addr)
|
|
|
|
mRes, mErr = s.metaBase.Exists(mPrm)
|
|
if mErr != nil && !s.info.Mode.NoMetabase() {
|
|
return nil, false, mErr
|
|
}
|
|
exists = mRes.Exists()
|
|
}
|
|
|
|
if s.hasWriteCache() {
|
|
res, err := wc(s.writeCache)
|
|
if err == nil || IsErrOutOfRange(err) {
|
|
return res, false, err
|
|
}
|
|
|
|
if IsErrNotFound(err) {
|
|
s.log.Debug("object is missing in write-cache")
|
|
} else {
|
|
s.log.Error("failed to fetch object from write-cache", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if skipMeta || mErr != nil {
|
|
res, err := cb(s.blobStor, nil)
|
|
return res, false, err
|
|
}
|
|
|
|
if !exists {
|
|
return nil, false, logicerr.Wrap(apistatus.ObjectNotFound{})
|
|
}
|
|
|
|
var mPrm meta.StorageIDPrm
|
|
mPrm.SetAddress(addr)
|
|
|
|
mExRes, err := s.metaBase.StorageID(mPrm)
|
|
if err != nil {
|
|
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
|
}
|
|
|
|
res, err := cb(s.blobStor, mExRes.StorageID())
|
|
|
|
return res, true, err
|
|
}
|