policer: Do not drop required linking objects #854

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/linking_object_replication into master 2023-12-12 11:04:04 +00:00
3 changed files with 32 additions and 12 deletions

View file

@ -1,6 +1,8 @@
package object
import (
"fmt"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -8,6 +10,11 @@ import (
// AddressWithType groups object address with its FrostFS
// object type.
type AddressWithType struct {
Address oid.Address
Type objectSDK.Type
Address oid.Address
Type objectSDK.Type
IsLinkingObject bool
}
func (v AddressWithType) String() string {
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)

To use in printf-methods with %s

To use in `printf`-methods with `%s`
}
fyrchik marked this conversation as resolved Outdated

I think with %s we do not need to call .String() and %t is the same as FormatBool -- is your implementation intentional?

I think with `%s` we do not need to call `.String()` and `%t` is the same as `FormatBool` -- is your implementation intentional?

Fixed

Fixed

View file

@ -102,6 +102,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
var bucketName []byte
var err error
c := tx.Cursor()
name, _ := c.First()
@ -140,8 +141,11 @@ loop:
bkt := tx.Bucket(name)
if bkt != nil {
copy(rawAddr, cidRaw)
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, count, cursor, threshold)
if err != nil {
return nil, nil, err
}
}
bucketName = name
if len(result) >= count {
@ -183,23 +187,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately
) ([]objectcore.AddressWithType, []byte, *Cursor) {
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
if cursor == nil {
cursor = new(Cursor)
}
count := len(to)
c := bkt.Cursor()
k, _ := c.First()
k, v := c.First()
offset := cursor.inBucketOffset
if !threshold {
c.Seek(offset)
k, _ = c.Next() // we are looking for objects _after_ the cursor
k, v = c.Next() // we are looking for objects _after_ the cursor
}
for ; k != nil; k, _ = c.Next() {
for ; k != nil; k, v = c.Next() {
if count >= limit {
break
}
@ -214,14 +218,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
continue
}
var isLinkingObj bool
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
fyrchik marked this conversation as resolved Outdated

Doing this on every select might slow down lot's of things (policer, evacuation, basically any listing)
Don't we have some bucket from where we could take this info?

Doing this on every select might slow down lot's of things (policer, evacuation, basically any listing) Don't we have some bucket from where we could take this info?

List used by evacuation, doctor and policier. Evacuation and doctor are mostly I/O bounded operations, so I don't think there is great impact of unmarshaling. And policier requires unmarshaling: there is no special bucket for linking objects.

Also this is metabase: objects in metabase don't contain payload, only headers etc.

`List` used by evacuation, doctor and policier. Evacuation and doctor are mostly I/O bounded operations, so I don't think there is great impact of unmarshaling. And policier requires unmarshaling: there is no special bucket for linking objects. Also this is metabase: objects in metabase don't contain payload, only headers etc.
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o)
}
var a oid.Address
a.SetContainer(cnt)
a.SetObject(obj)
to = append(to, objectcore.AddressWithType{Address: a, Type: objType})
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
count++
}
return to, offset, cursor
return to, offset, cursor, nil
}
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {

View file

@ -93,10 +93,10 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
// Number of copies that are stored on maintenance nodes.
var uncheckedCopies int
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone {
// all nodes of a container must store the `LOCK` and `TOMBSTONE` objects
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
// for correct object removal protection:
// - `LOCK` and `TOMBSTONE` objects are broadcast on their PUT requests;
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
// - `LOCK` object removal is a prohibited action in the GC.
shortage = uint32(len(nodes))
}