policer: Do not drop required linking objects #854

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/linking_object_replication into master 2023-12-12 11:04:04 +00:00
3 changed files with 32 additions and 12 deletions

View file

@ -1,6 +1,8 @@
package object package object
import ( import (
"fmt"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
@ -8,6 +10,11 @@ import (
// AddressWithType groups object address with its FrostFS // AddressWithType groups object address with its FrostFS
// object type. // object type.
type AddressWithType struct { type AddressWithType struct {
Address oid.Address Address oid.Address
Type objectSDK.Type Type objectSDK.Type
IsLinkingObject bool
}
func (v AddressWithType) String() string {
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
} }

View file

@ -102,6 +102,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor threshold := cursor == nil // threshold is a flag to ignore cursor
var bucketName []byte var bucketName []byte
var err error
c := tx.Cursor() c := tx.Cursor()
name, _ := c.First() name, _ := c.First()
@ -140,8 +141,11 @@ loop:
bkt := tx.Bucket(name) bkt := tx.Bucket(name)
if bkt != nil { if bkt != nil {
copy(rawAddr, cidRaw) copy(rawAddr, cidRaw)
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, count, cursor, threshold) result, count, cursor, threshold)
if err != nil {
return nil, nil, err
}
} }
bucketName = name bucketName = name
if len(result) >= count { if len(result) >= count {
@ -183,23 +187,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
limit int, // stop listing at `limit` items in result limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately threshold bool, // ignore cursor and start immediately
) ([]objectcore.AddressWithType, []byte, *Cursor) { ) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
if cursor == nil { if cursor == nil {
cursor = new(Cursor) cursor = new(Cursor)
} }
count := len(to) count := len(to)
c := bkt.Cursor() c := bkt.Cursor()
k, _ := c.First() k, v := c.First()
offset := cursor.inBucketOffset offset := cursor.inBucketOffset
if !threshold { if !threshold {
c.Seek(offset) c.Seek(offset)
k, _ = c.Next() // we are looking for objects _after_ the cursor k, v = c.Next() // we are looking for objects _after_ the cursor
} }
for ; k != nil; k, _ = c.Next() { for ; k != nil; k, v = c.Next() {
if count >= limit { if count >= limit {
break break
} }
@ -214,14 +218,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
continue continue
} }
var isLinkingObj bool
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o)
}
var a oid.Address var a oid.Address
a.SetContainer(cnt) a.SetContainer(cnt)
a.SetObject(obj) a.SetObject(obj)
to = append(to, objectcore.AddressWithType{Address: a, Type: objType}) to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
count++ count++
} }
return to, offset, cursor return to, offset, cursor, nil
} }
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) { func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {

View file

@ -93,10 +93,10 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
// Number of copies that are stored on maintenance nodes. // Number of copies that are stored on maintenance nodes.
var uncheckedCopies int var uncheckedCopies int
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone { if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
// all nodes of a container must store the `LOCK` and `TOMBSTONE` objects // all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
// for correct object removal protection: // for correct object removal protection:
// - `LOCK` and `TOMBSTONE` objects are broadcast on their PUT requests; // - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
// - `LOCK` object removal is a prohibited action in the GC. // - `LOCK` object removal is a prohibited action in the GC.
shortage = uint32(len(nodes)) shortage = uint32(len(nodes))
} }