diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 4535cad3f..4904d62ac 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -9,11 +9,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/cockroachdb/pebble" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -78,9 +76,8 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { currEpoch := db.epochState.CurrentEpoch() - err = db.database.View(func(tx *bbolt.Tx) error { - key := make([]byte, addressKeySize) - res.hdr, err = get(tx, prm.addr, key, true, prm.raw, currEpoch) + err = db.snapshot(func(s *pebble.Snapshot) error { + res.hdr, err = get(ctx, s, prm.addr, true, prm.raw, currEpoch) return err }) @@ -90,7 +87,11 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { func get(ctx context.Context, r pebble.Reader, addr oid.Address, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { if checkStatus { - switch objectStatus(tx, addr, currEpoch) { + st, err := objectStatus(ctx, r, addr, currEpoch) + if err != nil { + return nil, err + } + switch st { case 1: return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) case 2: @@ -99,78 +100,76 @@ func get(ctx context.Context, r pebble.Reader, addr oid.Address, checkStatus, ra return nil, ErrObjectIsExpired } } - - key = objectKey(addr.Object(), key) - cnr := addr.Container() obj := objectSDK.New() - bucketName := make([]byte, bucketKeySize) // check in primary index - data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key) - if len(data) != 0 { + data, err := valueSafe(r, primaryKey(addr.Container(), addr.Object())) + if err != nil { + return nil, err + } + if data != nil { return obj, obj.Unmarshal(data) } - data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) - if len(data) != 0 { - return nil, getECInfoError(tx, cnr, data) + children, err := selectByPrefixBatch(ctx, r, ecInfoLongKeyPrefix(addr.Container(), addr.Object()), 1) // try to found any child + if err != nil { + return nil, err + } + if len(children) > 0 { + return nil, getECInfoError(ctx, r, addr) } // if not found then check in tombstone index - data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) - if len(data) != 0 { + data, err = valueSafe(r, tombstoneKey(addr.Container(), addr.Object())) + if err != nil { + return nil, err + } + if data != nil { return obj, obj.Unmarshal(data) } // if not found then check in locker index - data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key) - if len(data) != 0 { + data, err = valueSafe(r, lockersKey(addr.Container(), addr.Object())) + if err != nil { + return nil, err + } + if data != nil { return obj, obj.Unmarshal(data) } // if not found then check if object is a virtual - return getVirtualObject(tx, cnr, key, raw) + return getVirtualObject(ctx, r, addr, raw) } -func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { - bkt := tx.Bucket(name) - if bkt == nil { - return nil - } - - return bkt.Get(key) -} - -func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) { +func getVirtualObject(ctx context.Context, r pebble.Reader, addr oid.Address, raw bool) (*objectSDK.Object, error) { if raw { - return nil, getSplitInfoError(tx, cnr, key) + return nil, getSplitInfoError(r, addr) } - bucketName := make([]byte, bucketKeySize) - parentBucket := tx.Bucket(parentBucketName(cnr, bucketName)) - if parentBucket == nil { - return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - - relativeLst, err := decodeList(parentBucket.Get(key)) + binObjIDs, err := selectByPrefixBatch(ctx, r, parentKeyLongPrefix(addr.Container(), addr.Object()), 1) if err != nil { return nil, err } - if len(relativeLst) == 0 { // this should never happen though + if len(binObjIDs) == 0 { // this should never happen though return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) } - // pick last item, for now there is not difference which address to pick - // but later list might be sorted so first or last value can be more - // prioritized to choose - virtualOID := relativeLst[len(relativeLst)-1] - data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID) - - child := objectSDK.New() - - err = child.Unmarshal(data) + phyObjAddr, err := addressOfTargetFromParentKey(binObjIDs[0]) if err != nil { + return nil, err + } + + data, err := valueSafe(r, primaryKey(phyObjAddr.Container(), phyObjAddr.Object())) + if err != nil { + return nil, err + } + + if data == nil { // this should never happen though #2 + return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + child := objectSDK.New() + if err := child.Unmarshal(data); err != nil { return nil, fmt.Errorf("can't unmarshal child with parent: %w", err) } @@ -183,8 +182,8 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD return par, nil } -func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error { - splitInfo, err := getSplitInfo(tx, cnr, key) +func getSplitInfoError(r pebble.Reader, addr oid.Address) error { + splitInfo, err := getSplitInfo(r, addr) if err == nil { return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) }