Anton Nikiforov
108e4e07be
All checks were successful
DCO action / DCO (pull_request) Successful in 1m31s
Vulncheck / Vulncheck (pull_request) Successful in 1m32s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m12s
Tests and linters / Run gofumpt (pull_request) Successful in 2m21s
Build / Build Components (pull_request) Successful in 2m32s
Tests and linters / gopls check (pull_request) Successful in 2m36s
Tests and linters / Staticcheck (pull_request) Successful in 2m55s
Tests and linters / Tests with -race (pull_request) Successful in 3m26s
Tests and linters / Lint (pull_request) Successful in 3m31s
Tests and linters / Tests (pull_request) Successful in 3m40s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
192 lines
5.7 KiB
Go
192 lines
5.7 KiB
Go
package shard
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// storFetcher is a type to unify object fetching mechanism in `fetchObjectData`
|
|
// method. It represents generalization of `getSmall` and `getBig` methods.
|
|
type storFetcher = func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error)
|
|
|
|
// GetPrm groups the parameters of Get operation.
|
|
type GetPrm struct {
|
|
addr oid.Address
|
|
skipMeta bool
|
|
skipEvacCheck bool
|
|
}
|
|
|
|
// GetRes groups the resulting values of Get operation.
|
|
type GetRes struct {
|
|
obj *objectSDK.Object
|
|
hasMeta bool
|
|
}
|
|
|
|
// SetAddress is a Get option to set the address of the requested object.
|
|
//
|
|
// Option is required.
|
|
func (p *GetPrm) SetAddress(addr oid.Address) {
|
|
p.addr = addr
|
|
}
|
|
|
|
// SetIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
|
// without accessing metabase.
|
|
func (p *GetPrm) SetIgnoreMeta(ignore bool) {
|
|
p.skipMeta = ignore
|
|
}
|
|
|
|
// SkipEvacCheck is a Get option which instruct to skip check is evacuation in progress.
|
|
func (p *GetPrm) SkipEvacCheck(val bool) {
|
|
p.skipEvacCheck = val
|
|
}
|
|
|
|
// Object returns the requested object.
|
|
func (r GetRes) Object() *objectSDK.Object {
|
|
return r.obj
|
|
}
|
|
|
|
// HasMeta returns true if info about the object was found in the metabase.
|
|
func (r GetRes) HasMeta() bool {
|
|
return r.hasMeta
|
|
}
|
|
|
|
// Get reads an object from shard.
|
|
//
|
|
// Returns any error encountered that
|
|
// did not allow to completely read the object part.
|
|
//
|
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
|
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
|
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
|
// Returns the ErrShardDisabled if the shard is disabled.
|
|
func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Get",
|
|
trace.WithAttributes(
|
|
attribute.String("shard_id", s.ID().String()),
|
|
attribute.String("address", prm.addr.EncodeToString()),
|
|
attribute.Bool("skip_meta", prm.skipMeta),
|
|
))
|
|
defer span.End()
|
|
|
|
s.m.RLock()
|
|
defer s.m.RUnlock()
|
|
|
|
if s.info.Mode.Disabled() {
|
|
return GetRes{}, ErrShardDisabled
|
|
}
|
|
|
|
if s.info.EvacuationInProgress && !prm.skipEvacCheck {
|
|
return GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
|
}
|
|
|
|
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
|
|
var getPrm common.GetPrm
|
|
getPrm.Address = prm.addr
|
|
getPrm.StorageID = id
|
|
|
|
res, err := stor.Get(ctx, getPrm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res.Object, nil
|
|
}
|
|
|
|
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
|
|
return c.Get(ctx, prm.addr)
|
|
}
|
|
|
|
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
|
|
obj, hasMeta, err := s.fetchObjectData(ctx, prm.addr, skipMeta, cb, wc)
|
|
|
|
return GetRes{
|
|
obj: obj,
|
|
hasMeta: hasMeta,
|
|
}, err
|
|
}
|
|
|
|
// emptyStorageID is an empty (zero-length, non-nil) storageID that indicates
// that an object is big (and is stored in an FSTree, not in a blobovnicza).
var emptyStorageID = []byte{}
|
|
|
|
// fetchObjectData looks through writeCache and blobStor to find object.
|
|
func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
|
|
var (
|
|
mErr error
|
|
mRes meta.ExistsRes
|
|
)
|
|
|
|
if !skipMeta {
|
|
var mPrm meta.ExistsPrm
|
|
mPrm.SetAddress(addr)
|
|
mRes, mErr = s.metaBase.Exists(ctx, mPrm)
|
|
if mErr != nil && !s.info.Mode.NoMetabase() {
|
|
return nil, false, mErr
|
|
}
|
|
|
|
if !mRes.Exists() {
|
|
return nil, false, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
|
}
|
|
} else {
|
|
s.log.Warn(logs.ShardFetchingObjectWithoutMeta, zap.Stringer("addr", addr))
|
|
}
|
|
|
|
if s.hasWriteCache() {
|
|
res, err := wc(s.writeCache)
|
|
if err == nil || IsErrOutOfRange(err) {
|
|
return res, false, err
|
|
}
|
|
if client.IsErrObjectNotFound(err) {
|
|
s.log.Debug(logs.ShardObjectIsMissingInWritecache,
|
|
zap.Stringer("addr", addr),
|
|
zap.Bool("skip_meta", skipMeta),
|
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
|
} else {
|
|
s.log.Error(logs.ShardFailedToFetchObjectFromWritecache,
|
|
zap.Error(err),
|
|
zap.Stringer("addr", addr),
|
|
zap.Bool("skip_meta", skipMeta),
|
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
|
}
|
|
}
|
|
if skipMeta || mErr != nil {
|
|
res, err := cb(s.blobStor, nil)
|
|
return res, false, err
|
|
}
|
|
|
|
var mPrm meta.StorageIDPrm
|
|
mPrm.SetAddress(addr)
|
|
|
|
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
|
|
if err != nil {
|
|
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
|
}
|
|
|
|
storageID := mExRes.StorageID()
|
|
if storageID == nil {
|
|
// `nil` storageID returned without any error
|
|
// means that object is big, `cb` expects an
|
|
// empty (but non-nil) storageID in such cases
|
|
storageID = emptyStorageID
|
|
}
|
|
|
|
res, err := cb(s.blobStor, storageID)
|
|
|
|
return res, true, err
|
|
}
|