diff --git a/pkg/core/object/address.go b/pkg/core/object/address.go index 12e5c89c..a25d853e 100644 --- a/pkg/core/object/address.go +++ b/pkg/core/object/address.go @@ -1,6 +1,8 @@ package object import ( + "fmt" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) @@ -8,6 +10,11 @@ import ( // AddressWithType groups object address with its FrostFS // object type. type AddressWithType struct { - Address oid.Address - Type objectSDK.Type + Address oid.Address + Type objectSDK.Type + IsLinkingObject bool +} + +func (v AddressWithType) String() string { + return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject) } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index 5099a161..f7a31445 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -102,6 +102,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { threshold := cursor == nil // threshold is a flag to ignore cursor var bucketName []byte + var err error c := tx.Cursor() name, _ := c.First() @@ -140,8 +141,11 @@ loop: bkt := tx.Bucket(name) if bkt != nil { copy(rawAddr, cidRaw) - result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, + result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, result, count, cursor, threshold) + if err != nil { + return nil, nil, err + } } bucketName = name if len(result) >= count { @@ -183,23 +187,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket limit int, // stop listing at `limit` items in result cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately -) ([]objectcore.AddressWithType, []byte, *Cursor) { +) ([]objectcore.AddressWithType, []byte, *Cursor, error) { if cursor == nil { cursor = new(Cursor) } count := len(to) c := bkt.Cursor() - k, _ := c.First() + k, v := c.First() offset := cursor.inBucketOffset if !threshold { c.Seek(offset) - k, _ = c.Next() // we are looking for objects _after_ the cursor + k, v = c.Next() // we are looking for objects _after_ the cursor } - for ; k != nil; k, _ = c.Next() { + for ; k != nil; k, v = c.Next() { if count >= limit { break } @@ -214,14 +218,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket continue } + var isLinkingObj bool + if objType == objectSDK.TypeRegular { + var o objectSDK.Object + if err := o.Unmarshal(v); err != nil { + return nil, nil, nil, err + } + isLinkingObj = isLinkObject(&o) + } + var a oid.Address a.SetContainer(cnt) a.SetObject(obj) - to = append(to, objectcore.AddressWithType{Address: a, Type: objType}) + to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj}) count++ } - return to, offset, cursor + return to, offset, cursor, nil } func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) { diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 9e40219d..73f83b2a 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -93,10 +93,10 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi // Number of copies that are stored on maintenance nodes. var uncheckedCopies int - if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone { - // all nodes of a container must store the `LOCK` and `TOMBSTONE` objects + if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject { + // all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects // for correct object removal protection: - // - `LOCK` and `TOMBSTONE` objects are broadcast on their PUT requests; + // - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests; // - `LOCK` object removal is a prohibited action in the GC. shortage = uint32(len(nodes)) }