package meta import ( "bytes" "context" "time" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) // ErrEndOfListing is returned from object listing with cursor // when storage can't return any more objects after provided // cursor. Use nil cursor object to start listing again. var ErrEndOfListing = logicerr.New("end of object listing") // Cursor is a type for continuous object listing. type Cursor struct { bucketName []byte inBucketOffset []byte } // ListPrm contains parameters for ListWithCursor operation. type ListPrm struct { count int cursor *Cursor } // SetCount sets maximum amount of addresses that ListWithCursor should return. func (l *ListPrm) SetCount(count uint32) { l.count = int(count) } // SetCursor sets cursor for ListWithCursor operation. For initial request // ignore this param or use nil value. For consecutive requests, use value // from ListRes. func (l *ListPrm) SetCursor(cursor *Cursor) { l.cursor = cursor } // ListRes contains values returned from ListWithCursor operation. type ListRes struct { addrList []objectcore.Info cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. func (l ListRes) AddressList() []objectcore.Info { return l.addrList } // Cursor returns cursor for consecutive listing requests. func (l ListRes) Cursor() *Cursor { return l.cursor } // IterateOverContainersPrm contains parameters for IterateOverContainers operation. type IterateOverContainersPrm struct { // Handler function executed upon containers in db. Handler func(context.Context, objectSDK.Type, cid.ID) error } // IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. type IterateOverObjectsInContainerPrm struct { // ObjectType type of objects to iterate over. ObjectType objectSDK.Type // ContainerID container for objects to iterate over. ContainerID cid.ID // Handler function executed upon objects in db. Handler func(context.Context, *objectcore.Info) error } // CountAliveObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. type CountAliveObjectsInContainerPrm struct { // ObjectType type of objects to iterate over. ObjectType objectSDK.Type // ContainerID container for objects to iterate over. ContainerID cid.ID } // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. // // Returns ErrEndOfListing if there are no more objects to return or count // parameter is set to zero. func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err error) { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("ListWithCursor", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.ListWithCursor", trace.WithAttributes( attribute.Int("count", prm.count), attribute.Bool("has_cursor", prm.cursor != nil), )) defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return res, ErrDegradedMode } result := make([]objectcore.Info, 0, prm.count) err = db.boltDB.View(func(tx *bbolt.Tx) error { res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor) return err }) success = err == nil return res, metaerr.Wrap(err) } func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) { threshold := cursor == nil // threshold is a flag to ignore cursor var bucketName []byte var err error c := tx.Cursor() name, _ := c.First() if !threshold { name, _ = c.Seek(cursor.bucketName) } var containerID cid.ID var offset []byte graveyardBkt := tx.Bucket(graveyardBucketName) garbageBkt := tx.Bucket(garbageBucketName) rawAddr := make([]byte, cidSize, addressKeySize) loop: for ; name != nil; name, _ = c.Next() { cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name) if cidRaw == nil { continue } var objType objectSDK.Type switch prefix { case primaryPrefix: objType = objectSDK.TypeRegular case lockersPrefix: objType = objectSDK.TypeLock case tombstonePrefix: objType = objectSDK.TypeTombstone default: continue } bkt := tx.Bucket(name) if bkt != nil { copy(rawAddr, cidRaw) result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, result, count, cursor, threshold) if err != nil { return nil, nil, err } } bucketName = name if len(result) >= count { break loop } // set threshold flag after first `selectNFromBucket` invocation // first invocation must look for cursor object threshold = true } if offset != nil { // new slice is much faster but less memory efficient // we need to copy, because offset exists during bbolt tx cursor.inBucketOffset = make([]byte, len(offset)) copy(cursor.inBucketOffset, offset) } if len(result) == 0 { return nil, nil, ErrEndOfListing } // new slice is much faster but less memory efficient // we need to copy, because bucketName exists during bbolt tx cursor.bucketName = make([]byte, len(bucketName)) copy(cursor.bucketName, bucketName) return result, cursor, nil } // selectNFromBucket similar to selectAllFromBucket but uses cursor to find // object to start selecting from. Ignores inhumed objects. func selectNFromBucket(bkt *bbolt.Bucket, // main bucket objType objectSDK.Type, // type of the objects stored in the main bucket graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets cidRaw []byte, // container ID prefix, optimization cnt cid.ID, // container ID to []objectcore.Info, // listing result limit int, // stop listing at `limit` items in result cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately ) ([]objectcore.Info, []byte, *Cursor, error) { if cursor == nil { cursor = new(Cursor) } count := len(to) c := bkt.Cursor() k, v := c.First() offset := cursor.inBucketOffset if !threshold { c.Seek(offset) k, v = c.Next() // we are looking for objects _after_ the cursor } for ; k != nil; k, v = c.Next() { if count >= limit { break } var obj oid.ID if err := obj.Decode(k); err != nil { break } offset = k if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { continue } var isLinkingObj bool var ecInfo *objectcore.ECInfo if objType == objectSDK.TypeRegular { var o objectSDK.Object if err := o.Unmarshal(bytes.Clone(v)); err != nil { return nil, nil, nil, err } isLinkingObj = isLinkObject(&o) ecHeader := o.ECHeader() if ecHeader != nil { ecInfo = &objectcore.ECInfo{ ParentID: ecHeader.Parent(), Index: ecHeader.Index(), Total: ecHeader.Total(), } } } var a oid.Address a.SetContainer(cnt) a.SetObject(obj) to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}) count++ } return to, offset, cursor, nil } func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) { if len(name) < bucketKeySize { return nil, 0 } rawID := name[1:bucketKeySize] if err := containerID.Decode(rawID); err != nil { return nil, 0 } return rawID, name[0] } // IterateOverContainers lists physical containers available in metabase starting from first. func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers", trace.WithAttributes( attribute.Bool("has_handler", prm.Handler != nil), )) defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return ErrDegradedMode } err := db.boltDB.View(func(tx *bbolt.Tx) error { return db.iterateOverContainers(ctx, tx, prm) }) success = err == nil return metaerr.Wrap(err) } func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error { var containerID cid.ID for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} { c := tx.Cursor() for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() { cidRaw, _ := parseContainerIDWithPrefix(&containerID, name) if cidRaw == nil { continue } var cnt cid.ID copy(cnt[:], containerID[:]) var objType objectSDK.Type switch prefix[0] { case primaryPrefix: objType = objectSDK.TypeRegular case lockersPrefix: objType = objectSDK.TypeLock case tombstonePrefix: objType = objectSDK.TypeTombstone default: continue } err := prm.Handler(ctx, objType, cnt) if err != nil { return err } } } return nil } // IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first. func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer", trace.WithAttributes( attribute.Bool("has_handler", prm.Handler != nil), )) defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return ErrDegradedMode } err := db.boltDB.View(func(tx *bbolt.Tx) error { return db.iterateOverObjectsInContainer(ctx, tx, prm) }) success = err == nil return metaerr.Wrap(err) } func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error { var prefix byte switch prm.ObjectType { case objectSDK.TypeRegular: prefix = primaryPrefix case objectSDK.TypeLock: prefix = lockersPrefix case objectSDK.TypeTombstone: prefix = tombstonePrefix default: return nil } bucketName := []byte{prefix} bucketName = append(bucketName, prm.ContainerID[:]...) bkt := tx.Bucket(bucketName) if bkt == nil { return nil } graveyardBkt := tx.Bucket(graveyardBucketName) garbageBkt := tx.Bucket(garbageBucketName) c := bkt.Cursor() k, v := c.First() for ; k != nil; k, v = c.Next() { var obj oid.ID if err := obj.Decode(k); err != nil { break } if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 { continue } var isLinkingObj bool var ecInfo *objectcore.ECInfo if prm.ObjectType == objectSDK.TypeRegular { var o objectSDK.Object if err := o.Unmarshal(bytes.Clone(v)); err != nil { return err } isLinkingObj = isLinkObject(&o) ecHeader := o.ECHeader() if ecHeader != nil { ecInfo = &objectcore.ECInfo{ ParentID: ecHeader.Parent(), Index: ecHeader.Index(), Total: ecHeader.Total(), } } } var a oid.Address a.SetContainer(prm.ContainerID) a.SetObject(obj) objInfo := objectcore.Info{Address: a, Type: prm.ObjectType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo} err := prm.Handler(ctx, &objInfo) if err != nil { return err } } return nil } // CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage. func (db *DB) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket") defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return 0, ErrDegradedMode } var prefix byte switch prm.ObjectType { case objectSDK.TypeRegular: prefix = primaryPrefix case objectSDK.TypeLock: prefix = lockersPrefix case objectSDK.TypeTombstone: prefix = tombstonePrefix default: return 0, nil } bucketName := []byte{prefix} bucketName = append(bucketName, prm.ContainerID[:]...) var count uint64 err := db.boltDB.View(func(tx *bbolt.Tx) error { bkt := tx.Bucket(bucketName) if bkt == nil { return nil } graveyardBkt := tx.Bucket(graveyardBucketName) garbageBkt := tx.Bucket(garbageBucketName) c := bkt.Cursor() k, _ := c.First() for ; k != nil; k, _ = c.Next() { if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 { continue } count++ } return nil }) success = err == nil return count, metaerr.Wrap(err) }