[#9999] metabase: Fix db engine to pebble in get.go

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-07-02 09:37:52 +03:00
parent bfb0bc3148
commit d3898ebd2c

View file

@ -9,11 +9,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/cockroachdb/pebble"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -78,9 +76,8 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
currEpoch := db.epochState.CurrentEpoch()
err = db.database.View(func(tx *bbolt.Tx) error {
key := make([]byte, addressKeySize)
res.hdr, err = get(tx, prm.addr, key, true, prm.raw, currEpoch)
err = db.snapshot(func(s *pebble.Snapshot) error {
res.hdr, err = get(ctx, s, prm.addr, true, prm.raw, currEpoch)
return err
})
@ -90,7 +87,11 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
func get(ctx context.Context, r pebble.Reader, addr oid.Address, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
if checkStatus {
switch objectStatus(tx, addr, currEpoch) {
st, err := objectStatus(ctx, r, addr, currEpoch)
if err != nil {
return nil, err
}
switch st {
case 1:
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
case 2:
@ -99,78 +100,76 @@ func get(ctx context.Context, r pebble.Reader, addr oid.Address, checkStatus, ra
return nil, ErrObjectIsExpired
}
}
key = objectKey(addr.Object(), key)
cnr := addr.Container()
obj := objectSDK.New()
bucketName := make([]byte, bucketKeySize)
// check in primary index
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key)
if len(data) != 0 {
data, err := valueSafe(r, primaryKey(addr.Container(), addr.Object()))
if err != nil {
return nil, err
}
if data != nil {
return obj, obj.Unmarshal(data)
}
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
if len(data) != 0 {
return nil, getECInfoError(tx, cnr, data)
children, err := selectByPrefixBatch(ctx, r, ecInfoLongKeyPrefix(addr.Container(), addr.Object()), 1) // try to found any child
if err != nil {
return nil, err
}
if len(children) > 0 {
return nil, getECInfoError(ctx, r, addr)
}
// if not found then check in tombstone index
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
if len(data) != 0 {
data, err = valueSafe(r, tombstoneKey(addr.Container(), addr.Object()))
if err != nil {
return nil, err
}
if data != nil {
return obj, obj.Unmarshal(data)
}
// if not found then check in locker index
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
if len(data) != 0 {
data, err = valueSafe(r, lockersKey(addr.Container(), addr.Object()))
if err != nil {
return nil, err
}
if data != nil {
return obj, obj.Unmarshal(data)
}
// if not found then check if object is a virtual
return getVirtualObject(tx, cnr, key, raw)
return getVirtualObject(ctx, r, addr, raw)
}
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
bkt := tx.Bucket(name)
if bkt == nil {
return nil
}
return bkt.Get(key)
}
func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
func getVirtualObject(ctx context.Context, r pebble.Reader, addr oid.Address, raw bool) (*objectSDK.Object, error) {
if raw {
return nil, getSplitInfoError(tx, cnr, key)
return nil, getSplitInfoError(r, addr)
}
bucketName := make([]byte, bucketKeySize)
parentBucket := tx.Bucket(parentBucketName(cnr, bucketName))
if parentBucket == nil {
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
relativeLst, err := decodeList(parentBucket.Get(key))
binObjIDs, err := selectByPrefixBatch(ctx, r, parentKeyLongPrefix(addr.Container(), addr.Object()), 1)
if err != nil {
return nil, err
}
if len(relativeLst) == 0 { // this should never happen though
if len(binObjIDs) == 0 { // this should never happen though
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
// pick last item, for now there is not difference which address to pick
// but later list might be sorted so first or last value can be more
// prioritized to choose
virtualOID := relativeLst[len(relativeLst)-1]
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
child := objectSDK.New()
err = child.Unmarshal(data)
phyObjAddr, err := addressOfTargetFromParentKey(binObjIDs[0])
if err != nil {
return nil, err
}
data, err := valueSafe(r, primaryKey(phyObjAddr.Container(), phyObjAddr.Object()))
if err != nil {
return nil, err
}
if data == nil { // this should never happen though #2
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
child := objectSDK.New()
if err := child.Unmarshal(data); err != nil {
return nil, fmt.Errorf("can't unmarshal child with parent: %w", err)
}
@ -183,8 +182,8 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
return par, nil
}
func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
splitInfo, err := getSplitInfo(tx, cnr, key)
func getSplitInfoError(r pebble.Reader, addr oid.Address) error {
splitInfo, err := getSplitInfo(r, addr)
if err == nil {
return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}