policer: Do not drop required linking objects #854
3 changed files with 32 additions and 12 deletions
|
@ -1,6 +1,8 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
@ -10,4 +12,9 @@ import (
|
|||
type AddressWithType struct {
|
||||
Address oid.Address
|
||||
Type objectSDK.Type
|
||||
IsLinkingObject bool
|
||||
}
|
||||
|
||||
func (v AddressWithType) String() string {
|
||||
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
||||
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
I think with I think with `%s` we do not need to call `.String()` and `%t` is the same as `FormatBool` -- is your implementation intentional?
dstepanov-yadro
commented
Fixed Fixed
|
||||
|
|
|
@ -102,6 +102,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
|||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
||||
var bucketName []byte
|
||||
var err error
|
||||
|
||||
c := tx.Cursor()
|
||||
name, _ := c.First()
|
||||
|
@ -140,8 +141,11 @@ loop:
|
|||
bkt := tx.Bucket(name)
|
||||
if bkt != nil {
|
||||
copy(rawAddr, cidRaw)
|
||||
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||
result, count, cursor, threshold)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
bucketName = name
|
||||
if len(result) >= count {
|
||||
|
@ -183,23 +187,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
limit int, // stop listing at `limit` items in result
|
||||
cursor *Cursor, // start from cursor object
|
||||
threshold bool, // ignore cursor and start immediately
|
||||
) ([]objectcore.AddressWithType, []byte, *Cursor) {
|
||||
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
|
||||
if cursor == nil {
|
||||
cursor = new(Cursor)
|
||||
}
|
||||
|
||||
count := len(to)
|
||||
c := bkt.Cursor()
|
||||
k, _ := c.First()
|
||||
k, v := c.First()
|
||||
|
||||
offset := cursor.inBucketOffset
|
||||
|
||||
if !threshold {
|
||||
c.Seek(offset)
|
||||
k, _ = c.Next() // we are looking for objects _after_ the cursor
|
||||
k, v = c.Next() // we are looking for objects _after_ the cursor
|
||||
}
|
||||
|
||||
for ; k != nil; k, _ = c.Next() {
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
if count >= limit {
|
||||
break
|
||||
}
|
||||
|
@ -214,14 +218,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
continue
|
||||
}
|
||||
|
||||
var isLinkingObj bool
|
||||
if objType == objectSDK.TypeRegular {
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(v); err != nil {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Doing this on every select might slow down lot's of things (policer, evacuation, basically any listing) Doing this on every select might slow down lot's of things (policer, evacuation, basically any listing)
Don't we have some bucket from where we could take this info?
dstepanov-yadro
commented
Also this is metabase: objects in metabase don't contain payload, only headers etc. `List` used by evacuation, doctor and policier. Evacuation and doctor are mostly I/O bounded operations, so I don't think there is great impact of unmarshaling. And policier requires unmarshaling: there is no special bucket for linking objects.
Also this is metabase: objects in metabase don't contain payload, only headers etc.
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
isLinkingObj = isLinkObject(&o)
|
||||
}
|
||||
|
||||
var a oid.Address
|
||||
a.SetContainer(cnt)
|
||||
a.SetObject(obj)
|
||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType})
|
||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
|
||||
count++
|
||||
}
|
||||
|
||||
return to, offset, cursor
|
||||
return to, offset, cursor, nil
|
||||
}
|
||||
|
||||
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
|
||||
|
|
|
@ -93,10 +93,10 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
|||
// Number of copies that are stored on maintenance nodes.
|
||||
var uncheckedCopies int
|
||||
|
||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone {
|
||||
// all nodes of a container must store the `LOCK` and `TOMBSTONE` objects
|
||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
|
||||
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
||||
// for correct object removal protection:
|
||||
// - `LOCK` and `TOMBSTONE` objects are broadcast on their PUT requests;
|
||||
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
||||
// - `LOCK` object removal is a prohibited action in the GC.
|
||||
shortage = uint32(len(nodes))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue
To use in
printf
-methods with%s