package meta import ( "bytes" "context" "fmt" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) // GetPrm groups the parameters of Get operation. type GetPrm struct { addr oid.Address raw bool } // GetRes groups the resulting values of Get operation. type GetRes struct { hdr *objectSDK.Object } // SetAddress is a Get option to set the address of the requested object. // // Option is required. func (p *GetPrm) SetAddress(addr oid.Address) { p.addr = addr } // SetRaw is a Get option to set raw flag value. If flag is unset, then Get // returns header of virtual object, otherwise it returns SplitInfo of virtual // object. func (p *GetPrm) SetRaw(raw bool) { p.raw = raw } // Header returns the requested object header. func (r GetRes) Header() *objectSDK.Object { return r.hdr } // Get returns object header for specified address. // // Returns an error of type apistatus.ObjectNotFound if object is missing in DB. // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("Get", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.Get", trace.WithAttributes( attribute.String("address", prm.addr.EncodeToString()), attribute.Bool("raw", prm.raw), )) defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return res, ErrDegradedMode } currEpoch := db.epochState.CurrentEpoch() err = db.boltDB.View(func(tx *bbolt.Tx) error { key := make([]byte, addressKeySize) res.hdr, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch) return err }) success = err == nil return res, metaerr.Wrap(err) } func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { if checkStatus { switch objectStatus(tx, addr, currEpoch) { case 1: return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) case 2: return nil, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) case 3: return nil, ErrObjectIsExpired } } key = objectKey(addr.Object(), key) cnr := addr.Container() obj := objectSDK.New() bucketName := make([]byte, bucketKeySize) // check in primary index data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key) if len(data) != 0 { return obj, obj.Unmarshal(bytes.Clone(data)) } data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) if len(data) != 0 { return nil, getECInfoError(tx, cnr, data) } // if not found then check in tombstone index data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) if len(data) != 0 { return obj, obj.Unmarshal(bytes.Clone(data)) } // if not found then check in locker index data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key) if len(data) != 0 { return obj, obj.Unmarshal(bytes.Clone(data)) } // if not found then check if object is a virtual return getVirtualObject(tx, cnr, key, raw) } func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { bkt := tx.Bucket(name) if bkt == nil { return nil } return bkt.Get(key) } func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) { if raw { return nil, getSplitInfoError(tx, cnr, key) } bucketName := make([]byte, bucketKeySize) parentBucket := tx.Bucket(parentBucketName(cnr, bucketName)) if parentBucket == nil { return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) } relativeLst, err := decodeList(parentBucket.Get(key)) if err != nil { return nil, err } if len(relativeLst) == 0 { // this should never happen though return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) } var data []byte for i := 0; i < len(relativeLst) && len(data) == 0; i++ { virtualOID := relativeLst[len(relativeLst)-i-1] data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID) } if len(data) == 0 { // check if any of the relatives is an EC object for _, relative := range relativeLst { data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative) if len(data) > 0 { // we can't return object headers, but can return error, // so assembler can try to assemble complex object return nil, getSplitInfoError(tx, cnr, key) } } } child := objectSDK.New() err = child.Unmarshal(data) if err != nil { return nil, fmt.Errorf("can't unmarshal child with parent: %w", err) } par := child.Parent() if par == nil { // this should never happen though return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) } return par, nil } func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error { splitInfo, err := getSplitInfo(tx, cnr, key) if err == nil { return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } return logicerr.Wrap(new(apistatus.ObjectNotFound)) } func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error { keys, err := decodeList(data) if err != nil { return err } ecInfo := objectSDK.NewECInfo() for _, key := range keys { // check in primary index ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key) if len(ojbData) != 0 { obj := objectSDK.New() if err := obj.Unmarshal(ojbData); err != nil { return err } chunk := objectSDK.ECChunk{} id, _ := obj.ID() chunk.SetID(id) chunk.Index = obj.ECHeader().Index() chunk.Total = obj.ECHeader().Total() ecInfo.AddChunk(chunk) } } return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo)) }