policer: Do not drop required linking objects #854
3 changed files with 32 additions and 12 deletions
|
@ -1,6 +1,8 @@
|
||||||
package object
|
package object
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
@ -8,6 +10,11 @@ import (
|
||||||
// AddressWithType groups object address with its FrostFS
|
// AddressWithType groups object address with its FrostFS
|
||||||
// object type.
|
// object type.
|
||||||
type AddressWithType struct {
|
type AddressWithType struct {
|
||||||
Address oid.Address
|
Address oid.Address
|
||||||
Type objectSDK.Type
|
Type objectSDK.Type
|
||||||
|
IsLinkingObject bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v AddressWithType) String() string {
|
||||||
|
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,6 +102,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
||||||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
||||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
threshold := cursor == nil // threshold is a flag to ignore cursor
|
||||||
var bucketName []byte
|
var bucketName []byte
|
||||||
|
var err error
|
||||||
|
|
||||||
c := tx.Cursor()
|
c := tx.Cursor()
|
||||||
name, _ := c.First()
|
name, _ := c.First()
|
||||||
|
@ -140,8 +141,11 @@ loop:
|
||||||
bkt := tx.Bucket(name)
|
bkt := tx.Bucket(name)
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
copy(rawAddr, cidRaw)
|
copy(rawAddr, cidRaw)
|
||||||
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||||
result, count, cursor, threshold)
|
result, count, cursor, threshold)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
bucketName = name
|
bucketName = name
|
||||||
if len(result) >= count {
|
if len(result) >= count {
|
||||||
|
@ -183,23 +187,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
limit int, // stop listing at `limit` items in result
|
limit int, // stop listing at `limit` items in result
|
||||||
cursor *Cursor, // start from cursor object
|
cursor *Cursor, // start from cursor object
|
||||||
threshold bool, // ignore cursor and start immediately
|
threshold bool, // ignore cursor and start immediately
|
||||||
) ([]objectcore.AddressWithType, []byte, *Cursor) {
|
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
|
||||||
if cursor == nil {
|
if cursor == nil {
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
count := len(to)
|
count := len(to)
|
||||||
c := bkt.Cursor()
|
c := bkt.Cursor()
|
||||||
k, _ := c.First()
|
k, v := c.First()
|
||||||
|
|
||||||
offset := cursor.inBucketOffset
|
offset := cursor.inBucketOffset
|
||||||
|
|
||||||
if !threshold {
|
if !threshold {
|
||||||
c.Seek(offset)
|
c.Seek(offset)
|
||||||
k, _ = c.Next() // we are looking for objects _after_ the cursor
|
k, v = c.Next() // we are looking for objects _after_ the cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
for ; k != nil; k, _ = c.Next() {
|
for ; k != nil; k, v = c.Next() {
|
||||||
if count >= limit {
|
if count >= limit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -214,14 +218,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var isLinkingObj bool
|
||||||
|
if objType == objectSDK.TypeRegular {
|
||||||
|
var o objectSDK.Object
|
||||||
|
if err := o.Unmarshal(v); err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
isLinkingObj = isLinkObject(&o)
|
||||||
|
}
|
||||||
|
|
||||||
var a oid.Address
|
var a oid.Address
|
||||||
a.SetContainer(cnt)
|
a.SetContainer(cnt)
|
||||||
a.SetObject(obj)
|
a.SetObject(obj)
|
||||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType})
|
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
return to, offset, cursor
|
return to, offset, cursor, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
|
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
|
||||||
|
|
|
@ -93,10 +93,10 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
||||||
// Number of copies that are stored on maintenance nodes.
|
// Number of copies that are stored on maintenance nodes.
|
||||||
var uncheckedCopies int
|
var uncheckedCopies int
|
||||||
|
|
||||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone {
|
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
|
||||||
// all nodes of a container must store the `LOCK` and `TOMBSTONE` objects
|
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
||||||
// for correct object removal protection:
|
// for correct object removal protection:
|
||||||
// - `LOCK` and `TOMBSTONE` objects are broadcast on their PUT requests;
|
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
||||||
// - `LOCK` object removal is a prohibited action in the GC.
|
// - `LOCK` object removal is a prohibited action in the GC.
|
||||||
shortage = uint32(len(nodes))
|
shortage = uint32(len(nodes))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue