frostfs-node/pkg/local_object_storage/shard/range.go

138 lines
3.9 KiB
Go

package shard
import (
"context"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// RngPrm groups the parameters of GetRange operation.
//
// The zero value requests an empty range at offset zero of an unset address;
// use the Set* methods to fill the parameters in.
type RngPrm struct {
// ln is the length of the requested payload range (see SetRange).
ln uint64
// off is the offset at which the requested payload range starts (see SetRange).
off uint64
// addr is the address of the requested object (see SetAddress).
addr oid.Address
// skipMeta is the flag set by SetIgnoreMeta. GetRange combines it with the
// shard's no-metabase mode before fetching the object data.
skipMeta bool
}
// RngRes groups the resulting values of GetRange operation.
type RngRes struct {
// obj is the object returned by Object; GetRange fills its payload with
// the requested range.
obj *objectSDK.Object
// hasMeta is the flag returned by HasMeta; GetRange takes it from the
// result of the underlying object data fetch.
hasMeta bool
}
// SetAddress is a GetRange option to set the address of the requested object.
//
// Option is required.
func (p *RngPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetRange is a GetRange option to set the range of the requested payload
// data: off is the offset at which the range starts and ln is its length.
func (p *RngPrm) SetRange(off uint64, ln uint64) {
	p.off = off
	p.ln = ln
}
// SetIgnoreMeta is a GetRange option to try to fetch the object from blobstor
// directly, without accessing metabase.
func (p *RngPrm) SetIgnoreMeta(ignore bool) {
p.skipMeta = ignore
}
// Object returns the requested object part.
//
// The payload of the returned instance contains the requested range of the
// original object. The stored pointer is returned as is, without copying.
func (r RngRes) Object() *objectSDK.Object {
return r.obj
}
// HasMeta returns true if info about the object was found in the metabase.
//
// GetRange fills this flag even when it returns an error.
func (r RngRes) HasMeta() bool {
return r.hasMeta
}
// GetRange reads part of an object's payload from the shard.
//
// The part is described by the offset and length set with RngPrm.SetRange and
// is returned as the payload of a freshly created object. The read is done
// under the shard's read lock and is traced as the "Shard.GetRange" span.
//
// The result is populated even when an error is returned, so RngRes.HasMeta
// may be inspected on failure.
//
// Returns any error encountered that
// did not allow to completely read the object part.
//
// Returns an error of type apistatus.ObjectOutOfRange if the requested object range is out of bounds
// (this is what the write-cache read path reports; a range whose end overflows uint64 is rejected too).
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the objectSDK.ErrObjectIsExpired if the object is presented but already expired.
// Returns the ErrShardDisabled if the shard is disabled.
func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
	spanAttrs := trace.WithAttributes(
		attribute.String("shard_id", s.ID().String()),
		attribute.String("address", prm.addr.EncodeToString()),
		attribute.Bool("skip_meta", prm.skipMeta),
		attribute.String("offset", strconv.FormatUint(prm.off, 10)),
		attribute.String("length", strconv.FormatUint(prm.ln, 10)),
	)
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetRange", spanAttrs)
	defer span.End()

	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.Disabled() {
		return RngRes{}, ErrShardDisabled
	}

	// readFromBlobstor asks the blob storage for exactly the requested range.
	readFromBlobstor := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
		rangePrm := common.GetRangePrm{
			Address:   prm.addr,
			StorageID: id,
		}
		rangePrm.Range.SetOffset(prm.off)
		rangePrm.Range.SetLength(prm.ln)

		rangeRes, err := stor.GetRange(ctx, rangePrm)
		if err != nil {
			return nil, err
		}

		part := objectSDK.New()
		part.SetPayload(rangeRes.Data)
		return part, nil
	}

	// readFromWriteCache gets the whole object from the write-cache and cuts
	// the requested range out of its payload.
	readFromWriteCache := func(c writecache.Cache) (*objectSDK.Object, error) {
		cached, err := c.Get(ctx, prm.addr)
		if err != nil {
			return nil, err
		}

		payload := cached.Payload()
		size := uint64(len(payload))
		begin := prm.off
		end := begin + prm.ln

		// begin <= end fails when off+ln wrapped around uint64; the other
		// two comparisons keep both bounds inside the payload.
		fits := begin <= end && begin <= size && end <= size
		if !fits {
			return nil, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
		}

		part := objectSDK.New()
		part.SetPayload(payload[begin:end])
		return part, nil
	}

	bypassMeta := prm.skipMeta || s.info.Mode.NoMetabase()

	var (
		res RngRes
		err error
	)
	res.obj, res.hasMeta, err = s.fetchObjectData(ctx, prm.addr, bypassMeta, readFromBlobstor, readFromWriteCache)

	return res, err
}