[#825] policer: Do not drop required linking objects
All checks were successful
DCO action / DCO (pull_request) Successful in 2m30s
Vulncheck / Vulncheck (pull_request) Successful in 3m28s
Build / Build Components (1.20) (pull_request) Successful in 4m23s
Build / Build Components (1.21) (pull_request) Successful in 4m27s
Tests and linters / Staticcheck (pull_request) Successful in 6m3s
Tests and linters / Lint (pull_request) Successful in 7m13s
Tests and linters / Tests (1.21) (pull_request) Successful in 14m48s
Tests and linters / Tests (1.20) (pull_request) Successful in 16m1s
Tests and linters / Tests with -race (pull_request) Successful in 15m57s
All checks were successful
DCO action / DCO (pull_request) Successful in 2m30s
Vulncheck / Vulncheck (pull_request) Successful in 3m28s
Build / Build Components (1.20) (pull_request) Successful in 4m23s
Build / Build Components (1.21) (pull_request) Successful in 4m27s
Tests and linters / Staticcheck (pull_request) Successful in 6m3s
Tests and linters / Lint (pull_request) Successful in 7m13s
Tests and linters / Tests (1.21) (pull_request) Successful in 14m48s
Tests and linters / Tests (1.20) (pull_request) Successful in 16m1s
Tests and linters / Tests with -race (pull_request) Successful in 15m57s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
0f45e3d344
commit
f44d3ddc77
3 changed files with 32 additions and 12 deletions
|
@ -1,6 +1,8 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
@ -10,4 +12,9 @@ import (
|
|||
type AddressWithType struct {
|
||||
Address oid.Address
|
||||
Type objectSDK.Type
|
||||
IsLinkingObject bool
|
||||
}
|
||||
|
||||
func (v AddressWithType) String() string {
|
||||
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
||||
}
|
||||
|
|
|
@ -102,6 +102,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
|||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
||||
var bucketName []byte
|
||||
var err error
|
||||
|
||||
c := tx.Cursor()
|
||||
name, _ := c.First()
|
||||
|
@ -140,8 +141,11 @@ loop:
|
|||
bkt := tx.Bucket(name)
|
||||
if bkt != nil {
|
||||
copy(rawAddr, cidRaw)
|
||||
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||
result, count, cursor, threshold)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
bucketName = name
|
||||
if len(result) >= count {
|
||||
|
@ -183,23 +187,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
limit int, // stop listing at `limit` items in result
|
||||
cursor *Cursor, // start from cursor object
|
||||
threshold bool, // ignore cursor and start immediately
|
||||
) ([]objectcore.AddressWithType, []byte, *Cursor) {
|
||||
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
|
||||
if cursor == nil {
|
||||
cursor = new(Cursor)
|
||||
}
|
||||
|
||||
count := len(to)
|
||||
c := bkt.Cursor()
|
||||
k, _ := c.First()
|
||||
k, v := c.First()
|
||||
|
||||
offset := cursor.inBucketOffset
|
||||
|
||||
if !threshold {
|
||||
c.Seek(offset)
|
||||
k, _ = c.Next() // we are looking for objects _after_ the cursor
|
||||
k, v = c.Next() // we are looking for objects _after_ the cursor
|
||||
}
|
||||
|
||||
for ; k != nil; k, _ = c.Next() {
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
if count >= limit {
|
||||
break
|
||||
}
|
||||
|
@ -214,14 +218,23 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
continue
|
||||
}
|
||||
|
||||
var isLinkingObj bool
|
||||
if objType == objectSDK.TypeRegular {
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(v); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
isLinkingObj = isLinkObject(&o)
|
||||
}
|
||||
|
||||
var a oid.Address
|
||||
a.SetContainer(cnt)
|
||||
a.SetObject(obj)
|
||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType})
|
||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
|
||||
count++
|
||||
}
|
||||
|
||||
return to, offset, cursor
|
||||
return to, offset, cursor, nil
|
||||
}
|
||||
|
||||
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
|
||||
|
|
|
@ -93,10 +93,10 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
|||
// Number of copies that are stored on maintenance nodes.
|
||||
var uncheckedCopies int
|
||||
|
||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone {
|
||||
// all nodes of a container must store the `LOCK` and `TOMBSTONE` objects
|
||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
|
||||
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
||||
// for correct object removal protection:
|
||||
// - `LOCK` and `TOMBSTONE` objects are broadcast on their PUT requests;
|
||||
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
||||
// - `LOCK` object removal is a prohibited action in the GC.
|
||||
shortage = uint32(len(nodes))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue