[#183] gc: Fix drop expired locked complex objects
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Do not delete bucket keys during iteration Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
9f0bce5c15
commit
ab32067152
1 changed files with 47 additions and 13 deletions
|
@ -14,6 +14,11 @@ import (
|
||||||
|
|
||||||
var bucketNameLocked = []byte{lockedPrefix}
|
var bucketNameLocked = []byte{lockedPrefix}
|
||||||
|
|
||||||
|
type keyValue struct {
|
||||||
|
Key []byte
|
||||||
|
Value []byte
|
||||||
|
}
|
||||||
|
|
||||||
// returns name of the bucket with objects of type LOCK for specified container.
|
// returns name of the bucket with objects of type LOCK for specified container.
|
||||||
func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
|
func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(idCnr, lockersPrefix, key)
|
return bucketName(idCnr, lockersPrefix, key)
|
||||||
|
@ -107,7 +112,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
||||||
|
|
||||||
var unlockedObjects []oid.Address
|
var unlockedObjects []oid.Address
|
||||||
|
|
||||||
return unlockedObjects, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
if err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
for i := range lockers {
|
for i := range lockers {
|
||||||
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
|
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -117,7 +122,10 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return unlockedObjects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// checks if specified object is locked in the specified container.
|
// checks if specified object is locked in the specified container.
|
||||||
|
@ -157,7 +165,8 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
|
||||||
}
|
}
|
||||||
|
|
||||||
keyLocker := objectKey(locker, key)
|
keyLocker := objectKey(locker, key)
|
||||||
return unlockedObjects, bucketLockedContainer.ForEach(func(k, v []byte) error {
|
updates := make([]keyValue, 0)
|
||||||
|
err := bucketLockedContainer.ForEach(func(k, v []byte) error {
|
||||||
keyLockers, err := decodeList(v)
|
keyLockers, err := decodeList(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
||||||
|
@ -166,11 +175,10 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
|
||||||
for i := range keyLockers {
|
for i := range keyLockers {
|
||||||
if bytes.Equal(keyLockers[i], keyLocker) {
|
if bytes.Equal(keyLockers[i], keyLocker) {
|
||||||
if len(keyLockers) == 1 {
|
if len(keyLockers) == 1 {
|
||||||
// locker was all alone
|
updates = append(updates, keyValue{
|
||||||
err = bucketLockedContainer.Delete(k)
|
Key: k,
|
||||||
if err != nil {
|
Value: nil,
|
||||||
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
var id oid.ID
|
var id oid.ID
|
||||||
err = id.Decode(k)
|
err = id.Decode(k)
|
||||||
|
@ -192,11 +200,10 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
|
||||||
return fmt.Errorf("encode updated list of lockers: %w", err)
|
return fmt.Errorf("encode updated list of lockers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the record
|
updates = append(updates, keyValue{
|
||||||
err = bucketLockedContainer.Put(k, v)
|
Key: k,
|
||||||
if err != nil {
|
Value: v,
|
||||||
return fmt.Errorf("update list of lockers: %w", err)
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -205,6 +212,33 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = applyBucketUpdates(bucketLockedContainer, updates); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return unlockedObjects, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyBucketUpdates(bucket *bbolt.Bucket, updates []keyValue) error {
|
||||||
|
for _, update := range updates {
|
||||||
|
if update.Value == nil {
|
||||||
|
err := bucket.Delete(update.Key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err := bucket.Put(update.Key, update.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("update list of lockers: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsLockedPrm groups the parameters of IsLocked operation.
|
// IsLockedPrm groups the parameters of IsLocked operation.
|
||||||
|
|
Loading…
Reference in a new issue