package meta

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	gio "io"
	"strconv"
	"time"

	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/nspcc-dev/neo-go/pkg/io"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

type (
	// namedBucketItem is a single index record: name is the name of the
	// bucket the record belongs to, key and val are the record itself.
	namedBucketItem struct {
		name, key, val []byte
	}
)

// PutPrm groups the parameters of Put operation.
type PutPrm struct {
	// obj is the object whose header is saved.
	obj *objectSDK.Object

	// id is the storage ID to save for the object; may be nil.
	id []byte

	// indexAttributes enables writing of the attribute (fake bucket tree) indexes.
	indexAttributes bool
}

// PutRes groups the resulting values of Put operation.
type PutRes struct {
	// Inserted is true if the object was inserted as a new record rather
	// than used to update an already existing one.
	Inserted bool
}

// SetObject is a Put option to set object to save.
func (p *PutPrm) SetObject(obj *objectSDK.Object) {
	p.obj = obj
}

// SetStorageID is a Put option to set storage ID to save.
func (p *PutPrm) SetStorageID(id []byte) {
	p.id = id
}

// SetIndexAttributes is a Put option to enable or disable indexing of
// the object attributes on insertion.
func (p *PutPrm) SetIndexAttributes(v bool) {
	p.indexAttributes = v
}

var (
	// ErrUnknownObjectType is returned when the object to save is neither
	// a regular object, nor a tombstone, nor a lock.
	ErrUnknownObjectType = errors.New("unknown object type")
	// ErrIncorrectRootObject is returned when an object with a parent header
	// can be classified neither as a linking nor as a last split object.
	ErrIncorrectRootObject = errors.New("invalid root object")
)

// Put saves object header in metabase. Object payload is expected to be cut.
//
// Returns ErrDegradedMode if the metabase is unavailable in the current mode
// and ErrReadOnlyMode if it is read-only.
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
// Returns the object.ErrObjectIsExpired if the object is present but already expired.
func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	// Report duration and outcome of the call on every exit path.
	defer func() {
		db.metrics.AddMethodDuration("Put", time.Since(startedAt), success)
	}()

	// The context returned together with the span is discarded: the log
	// record below is written with the caller's ctx.
	_, span := tracing.StartSpanFromContext(ctx, "metabase.Put",
		trace.WithAttributes(
			attribute.String("address", objectCore.AddressOf(prm.obj).EncodeToString()),
		))
	defer span.End()

	// The mode is read-locked for the whole operation.
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return res, ErrDegradedMode
	} else if db.mode.ReadOnly() {
		return res, ErrReadOnlyMode
	}

	currEpoch := db.epochState.CurrentEpoch()

	// res is assigned from inside the callback so that the result of db.put
	// is available once Batch returns.
	err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
		var e error
		res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
		return e
	})
	if err == nil {
		success = true
		storagelog.Write(ctx, db.log,
			storagelog.AddressField(objectCore.AddressOf(prm.obj)),
			storagelog.OpField("metabase PUT"))
	}

	return res, metaerr.Wrap(err)
}

// put saves obj within tx. If the object is already known to the metabase
// (including the case when it exists only virtually, as a split parent),
// the existing records are updated; otherwise the object is inserted and
// PutRes.Inserted is set.
//
// si is nil for an ordinary object and non-nil when obj is saved as the parent
// of a split object (see insertObject). It fails if obj has no container ID.
func (db *DB) put(tx *bbolt.Tx,
	obj *objectSDK.Object,
	id []byte,
	si *objectSDK.SplitInfo,
	currEpoch uint64,
	indexAttributes bool,
) (PutRes, error) {
	cnr, ok := obj.ContainerID()
	if !ok {
		return PutRes{}, errors.New("missing container in object")
	}

	// For EC chunks the existence check additionally receives the address
	// of the EC parent; otherwise the zero address is passed.
	var ecParentAddress oid.Address
	if ecHeader := obj.ECHeader(); ecHeader != nil {
		ecParentAddress.SetContainer(cnr)
		ecParentAddress.SetObject(ecHeader.Parent())
	}

	isParent := si != nil

	exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)

	var splitInfoError *objectSDK.SplitInfoError
	if errors.As(err, &splitInfoError) {
		exists = true // object exists, however it is virtual
	} else if err != nil {
		return PutRes{}, err // return any error besides SplitInfoError
	}

	if exists {
		return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
	}

	return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
}

// updateObj updates the records of an object that already exists in the
// metabase: the storage ID of a non-parent object (if id is set), or the
// split info of a parent object. In any other case it does nothing.
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
	// The rightmost child and the split header overlap the parent, so we have
	// to check whether the object exists in order not to overwrite it twice.

	// When storage engine moves objects between different sub-storages,
	// it calls metabase.Put method with new storage ID, thus triggering this code.
	if !isParent && id != nil {
		return setStorageID(tx, objectCore.AddressOf(obj), id, true)
	}

	// When storage already has last object in split hierarchy and there is
	// a linking object to put (or vice versa), we should update split info
	// with object IDs of these objects.
	if isParent {
		return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
	}

	return nil
}

// insertObject writes all records for an object that is not yet present in
// the metabase: the parent object (if obj carries a parent header), unique
// indexes, list indexes, optionally the attribute indexes, and, for
// non-parent objects, the container size estimation and object counters.
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
	// The parent header is saved through a nested put; the !isParent
	// condition keeps a parent from saving its own parent in turn.
	if par := obj.Parent(); par != nil && !isParent { // limit depth by two
		parentSI, err := splitInfoFromObject(obj)
		if err != nil {
			return err
		}

		_, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes)
		if err != nil {
			return err
		}
	}

	err := putUniqueIndexes(tx, obj, si, id)
	if err != nil {
		return fmt.Errorf("can't put unique indexes: %w", err)
	}

	err = updateListIndexes(tx, obj, putListIndexItem)
	if err != nil {
		return fmt.Errorf("can't put list indexes: %w", err)
	}

	if indexAttributes {
		err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
		if err != nil {
			return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
		}
	}

	// update container volume size estimation
	if obj.Type() == objectSDK.TypeRegular && !isParent {
		err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
		if err != nil {
			return err
		}
	}

	// Parent objects are not counted.
	if !isParent {
		if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
			return err
		}
	}

	return nil
}

// putUniqueIndexes writes the unique-key records of obj: the raw header and
// the storage ID (both only for non-parent objects, i.e. when si is nil),
// the expiration epoch records and the split info record.
func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
	isParent := si != nil
	addr := objectCore.AddressOf(obj)
	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))

	// bucketName is a scratch buffer shared by the bucket name helpers below.
	bucketName := make([]byte, bucketKeySize)
	if !isParent {
		err := putRawObjectData(tx, obj, bucketName, addr, objKey)
		if err != nil {
			return err
		}
		if id != nil {
			// override is false: an already stored storage ID is kept.
			if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
				return err
			}
		}
	}

	if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
		return err
	}

	return putSplitInfo(tx, obj, bucketName, addr, si, objKey)
}

// putRawObjectData stores the marshaled header of obj (payload cut) under
// objKey in the bucket selected by the object type. It returns
// ErrUnknownObjectType for types other than regular, tombstone and lock.
func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
	switch obj.Type() {
	case objectSDK.TypeRegular:
		bucketName = primaryBucketName(addr.Container(), bucketName)
	case objectSDK.TypeTombstone:
		bucketName = tombstoneBucketName(addr.Container(), bucketName)
	case objectSDK.TypeLock:
		bucketName = bucketNameLockers(addr.Container(), bucketName)
	default:
		return ErrUnknownObjectType
	}
	rawObject, err := obj.CutPayload().Marshal()
	if err != nil {
		return fmt.Errorf("can't marshal object header: %w", err)
	}
	return putUniqueIndexItem(tx, namedBucketItem{
		name: bucketName,
		key:  objKey,
		val:  rawObject,
	})
}

// putExpirationEpoch stores the expiration epoch of obj, if it has one, in
// two places: a record keyed by (epoch, container, object) with an empty
// marker value, and a per-container record mapping objKey to the epoch
// encoded as a little-endian uint64.
func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
	if expEpoch, ok := hasExpirationEpoch(obj); ok {
		err := putUniqueIndexItem(tx, namedBucketItem{
			name: expEpochToObjectBucketName,
			key:  expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
			val:  zeroValue,
		})
		if err != nil {
			return err
		}
		val := make([]byte, epochSize)
		binary.LittleEndian.PutUint64(val, expEpoch)
		err = putUniqueIndexItem(tx, namedBucketItem{
			name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)),
			key:  objKey,
			val:  val,
		})
		if err != nil {
			return err
		}
	}
	return nil
}

// putSplitInfo updates the root bucket record for obj. Only regular objects
// without a parent header are recorded; for anything else it does nothing.
//
// For an EC chunk the record is keyed by the ID of the EC parent. If the EC
// parent is itself a part of a split object, the record is keyed by the ID of
// the split parent and holds a split info naming the EC parent as the last
// part; if the split parent ID is unknown in that case, nothing is written.
func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
	if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
		if ecHead := obj.ECHeader(); ecHead != nil {
			parentID := ecHead.Parent()
			if ecHead.ParentSplitID() != nil {
				parentSplitParentID := ecHead.ParentSplitParentID()
				if parentSplitParentID == nil {
					return nil
				}

				si = objectSDK.NewSplitInfo()
				si.SetSplitID(ecHead.ParentSplitID())
				si.SetLastPart(ecHead.Parent())
				parentID = *parentSplitParentID
			}
			// objKey is passed as the destination buffer and reassigned to the result.
			objKey = objectKey(parentID, objKey)
		}
		return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si)
	}
	return nil
}

// updateSplitInfoIndex updates the root bucket record of the container cnr
// stored under objKey, combining the stored value with si:
//   - no stored value and nil si: an empty value is stored;
//   - nil si: the stored value is kept;
//   - no stored value: marshaled si is stored;
//   - otherwise the stored split info is merged with si and stored.
func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
	return updateUniqueIndexItem(tx, namedBucketItem{
		name: rootBucketName(cnr, bucketName),
		key:  objKey,
	}, func(old, _ []byte) ([]byte, error) {
		switch {
		case si == nil && old == nil:
			return []byte{}, nil
		case si == nil:
			return old, nil
		case old == nil:
			return si.Marshal()
		default:
			oldSI := objectSDK.NewSplitInfo()
			// NOTE(review): old is cloned before unmarshaling, presumably
			// because it points into memory owned by bbolt - confirm.
			if err := oldSI.Unmarshal(bytes.Clone(old)); err != nil {
				return nil, err
			}
			si = util.MergeSplitInfo(si, oldSI)
			return si.Marshal()
		}
	})
}

// updateIndexItemFunc is a function applying a single index record within tx.
type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error

// updateListIndexes passes to f the list index records of obj: parent ID to
// object, split ID to object and, for EC chunks, EC parent to chunk plus the
// split ID and split parent ID of the EC parent to the EC parent.
// Missing object or container IDs are not reported: zero values are used.
func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
	idObj, _ := obj.ID()
	cnr, _ := obj.ContainerID()
	objKey := objectKey(idObj, make([]byte, objectKeySize))
	// bucketName is a scratch buffer reused for every bucket name below.
	bucketName := make([]byte, bucketKeySize)

	idParent, ok := obj.ParentID()

	// index parent ids
	if ok {
		err := f(tx, namedBucketItem{
			name: parentBucketName(cnr, bucketName),
			key:  objectKey(idParent, make([]byte, objectKeySize)),
			val:  objKey,
		})
		if err != nil {
			return err
		}
	}

	// index split ids
	if obj.SplitID() != nil {
		err := f(tx, namedBucketItem{
			name: splitBucketName(cnr, bucketName),
			key:  obj.SplitID().ToV2(),
			val:  objKey,
		})
		if err != nil {
			return err
		}
	}

	// index EC chunks and the split relations of their EC parent
	if ech := obj.ECHeader(); ech != nil {
		err := f(tx, namedBucketItem{
			name: ecInfoBucketName(cnr, bucketName),
			key:  objectKey(ech.Parent(), make([]byte, objectKeySize)),
			val:  objKey,
		})
		if err != nil {
			return err
		}

		if ech.ParentSplitID() != nil {
			// Shadows the outer objKey: the indexed value is the EC parent, not the chunk.
			objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
			err := f(tx, namedBucketItem{
				name: splitBucketName(cnr, bucketName),
				key:  ech.ParentSplitID().ToV2(),
				val:  objKey,
			})
			if err != nil {
				return err
			}
		}

		if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
			// Shadows the outer objKey: the indexed value is the EC parent, not the chunk.
			objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
			err := f(tx, namedBucketItem{
				name: parentBucketName(cnr, bucketName),
				key:  objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
				val:  objKey,
			})
			if err != nil {
				return err
			}
		}
	}

	return nil
}

// indexedAttributes is the set of attribute keys for which attribute
// indexes are maintained.
var indexedAttributes = map[string]struct{}{
	"S3-Access-Box-CRDT-Name":   {},
	objectSDK.AttributeFilePath: {},
}

// IsAtrributeIndexed returns true if attribute is indexed by metabase.
func IsAtrributeIndexed(attr string) bool {
	_, found := indexedAttributes[attr]
	return found
}

// updateFKBTIndexes passes to f one record per indexed attribute of obj:
// the bucket is derived from the container and attribute key, the record key
// is the attribute value and the record value is the object key.
// For EC chunks the attributes and the ID of the EC parent are used instead
// of the chunk's own.
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
	id, _ := obj.ID()
	cnr, _ := obj.ContainerID()
	objKey := objectKey(id, make([]byte, objectKeySize))

	// key is a scratch buffer for the attribute bucket name.
	key := make([]byte, bucketKeySize)
	var attrs []objectSDK.Attribute
	if obj.ECHeader() != nil {
		attrs = obj.ECHeader().ParentAttributes()
		objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize))
	} else {
		attrs = obj.Attributes()
	}

	// user specified attributes
	for i := range attrs {
		if !IsAtrributeIndexed(attrs[i].Key()) {
			continue
		}
		key = attributeBucketName(cnr, attrs[i].Key(), key)
		err := f(tx, namedBucketItem{
			name: key,
			key:  []byte(attrs[i].Value()),
			val:  objKey,
		})
		if err != nil {
			return err
		}
	}

	return nil
}

// hasExpirationEpoch returns the expiration epoch of obj taken from the
// first attribute with the expiration epoch system key. For EC chunks the
// attributes of the EC parent are inspected. The second result is false if
// there is no such attribute or its value is not a base-10 uint64.
func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
	attributes := obj.Attributes()
	if ech := obj.ECHeader(); ech != nil {
		attributes = ech.ParentAttributes()
	}
	for _, attr := range attributes {
		if attr.Key() == objectV2.SysAttributeExpEpoch {
			expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
			return expEpoch, err == nil
		}
	}
	return 0, false
}

// bucketContainer is anything buckets can be looked up and created in:
// both a transaction and a bucket provide these methods.
type bucketContainer interface {
	Bucket([]byte) *bbolt.Bucket
	CreateBucket([]byte) (*bbolt.Bucket, error)
	CreateBucketIfNotExists([]byte) (*bbolt.Bucket, error)
}

// createBucketLikelyExists returns the bucket with the given name, looking
// it up first and creating it only if the lookup finds nothing.
func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Bucket, error) {
	if bkt := tx.Bucket(name); bkt != nil {
		return bkt, nil
	}
	return tx.CreateBucket(name)
}

// updateUniqueIndexItem stores under item.key in bucket item.name the value
// returned by update, which receives the currently stored value (nil if
// there is none) and item.val. The bucket is created if it does not exist.
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
	bkt, err := createBucketLikelyExists(tx, item.name)
	if err != nil {
		return fmt.Errorf("can't create index %v: %w", item.name, err)
	}

	data, err := update(bkt.Get(item.key), item.val)
	if err != nil {
		return err
	}
	return bkt.Put(item.key, data)
}

// putUniqueIndexItem stores item.val under item.key in bucket item.name,
// replacing any value stored before.
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
}

// putFKBTIndexItem stores item in a two-level ("fake bucket tree") index:
// bucket item.name contains a nested bucket named item.key, in which
// item.val is stored as a key with an empty marker value.
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	bkt, err := createBucketLikelyExists(tx, item.name)
	if err != nil {
		return fmt.Errorf("can't create index %v: %w", item.name, err)
	}

	fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
	if err != nil {
		return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
	}

	return fkbtRoot.Put(item.val, zeroValue)
}

// putListIndexItem appends item.val to the encoded list stored under
// item.key in bucket item.name. The value is appended unconditionally:
// no check for duplicates is made.
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	bkt, err := createBucketLikelyExists(tx, item.name)
	if err != nil {
		return fmt.Errorf("can't create index %v: %w", item.name, err)
	}

	lst, err := decodeList(bkt.Get(item.key))
	if err != nil {
		return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
	}

	lst = append(lst, item.val)

	encodedLst, err := encodeList(lst)
	if err != nil {
		return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
	}

	return bkt.Put(item.key, encodedLst)
}

// encodeList encodes list of bytes into a single blob for list bucket indexes:
// the number of elements followed by every element, each written with the
// variable-length integer and byte-string encoding of the binary writer.
func encodeList(lst [][]byte) ([]byte, error) {
	w := io.NewBufBinWriter()
	w.WriteVarUint(uint64(len(lst)))
	for i := range lst {
		w.WriteVarBytes(lst[i])
	}
	if w.Err != nil {
		return nil, w.Err
	}
	return w.Bytes(), nil
}

// decodeList decodes blob into the list of bytes from list bucket index.
func decodeList(data []byte) (lst [][]byte, err error) { if len(data) == 0 { return nil, nil } var offset uint64 size, n, err := getVarUint(data) if err != nil { return nil, err } offset += uint64(n) lst = make([][]byte, size, size+1) for i := range lst { sz, n, err := getVarUint(data[offset:]) if err != nil { return nil, err } offset += uint64(n) next := offset + sz if uint64(len(data)) < next { return nil, gio.ErrUnexpectedEOF } lst[i] = data[offset:next] offset = next } return lst, nil } func getVarUint(data []byte) (uint64, int, error) { if len(data) == 0 { return 0, 0, gio.ErrUnexpectedEOF } switch b := data[0]; b { case 0xfd: if len(data) < 3 { return 0, 1, gio.ErrUnexpectedEOF } return uint64(binary.LittleEndian.Uint16(data[1:])), 3, nil case 0xfe: if len(data) < 5 { return 0, 1, gio.ErrUnexpectedEOF } return uint64(binary.LittleEndian.Uint32(data[1:])), 5, nil case 0xff: if len(data) < 9 { return 0, 1, gio.ErrUnexpectedEOF } return binary.LittleEndian.Uint64(data[1:]), 9, nil default: return uint64(b), 1, nil } } // setStorageID for existing objects if they were moved from one // storage location to another. func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error { key := make([]byte, bucketKeySize) bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key)) if err != nil { return err } key = objectKey(addr.Object(), key) if override || bkt.Get(key) == nil { return bkt.Put(key, id) } return nil } // updateSpliInfo for existing objects if storage filled with extra information // about last object in split hierarchy or linking object. func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error { objKey := objectKey(addr.Object(), make([]byte, bucketKeySize)) return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from) } // splitInfoFromObject returns split info based on last or linkin object. // Otherwise returns nil, nil. 
func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) { if obj.Parent() == nil { return nil, nil } info := objectSDK.NewSplitInfo() info.SetSplitID(obj.SplitID()) switch { case isLinkObject(obj): id, ok := obj.ID() if !ok { return nil, errors.New("missing object ID") } info.SetLink(id) case isLastObject(obj): id, ok := obj.ID() if !ok { return nil, errors.New("missing object ID") } info.SetLastPart(id) default: return nil, ErrIncorrectRootObject // should never happen } return info, nil } // isLinkObject returns true if object contains parent header and list // of children. func isLinkObject(obj *objectSDK.Object) bool { return len(obj.Children()) > 0 && obj.Parent() != nil } // isLastObject returns true if object contains only parent header without list // of children. func isLastObject(obj *objectSDK.Object) bool { return len(obj.Children()) == 0 && obj.Parent() != nil }