From ab32067152352c54016de4a256fa83de2d598377 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 28 Mar 2023 18:49:13 +0300 Subject: [PATCH] [#183] gc: Fix drop expired locked complex objects Do not delete bucket keys during iteration Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/lock.go | 60 ++++++++++++++++++----- 1 file changed, 47 insertions(+), 13 deletions(-) diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 6850cf61..2dcc85d4 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -14,6 +14,11 @@ import ( var bucketNameLocked = []byte{lockedPrefix} +type keyValue struct { + Key []byte + Value []byte +} + // returns name of the bucket with objects of type LOCK for specified container. func bucketNameLockers(idCnr cid.ID, key []byte) []byte { return bucketName(idCnr, lockersPrefix, key) @@ -107,7 +112,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { var unlockedObjects []oid.Address - return unlockedObjects, db.boltDB.Update(func(tx *bbolt.Tx) error { + if err := db.boltDB.Update(func(tx *bbolt.Tx) error { for i := range lockers { unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) if err != nil { @@ -117,7 +122,10 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { } return nil - }) + }); err != nil { + return nil, err + } + return unlockedObjects, nil } // checks if specified object is locked in the specified container. @@ -157,7 +165,8 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres } keyLocker := objectKey(locker, key) - return unlockedObjects, bucketLockedContainer.ForEach(func(k, v []byte) error { + updates := make([]keyValue, 0) + err := bucketLockedContainer.ForEach(func(k, v []byte) error { keyLockers, err := decodeList(v) if err != nil { return fmt.Errorf("decode list of lockers in locked bucket: %w", err) @@ -166,11 +175,10 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres for i := range keyLockers { if bytes.Equal(keyLockers[i], keyLocker) { if len(keyLockers) == 1 { - // locker was all alone - err = bucketLockedContainer.Delete(k) - if err != nil { - return fmt.Errorf("delete locked object record from locked bucket: %w", err) - } + updates = append(updates, keyValue{ + Key: k, + Value: nil, + }) var id oid.ID err = id.Decode(k) @@ -192,11 +200,10 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres return fmt.Errorf("encode updated list of lockers: %w", err) } - // update the record - err = bucketLockedContainer.Put(k, v) - if err != nil { - return fmt.Errorf("update list of lockers: %w", err) - } + updates = append(updates, keyValue{ + Key: k, + Value: v, + }) } return nil @@ -205,6 +212,33 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres return nil }) + + if err != nil { + return nil, err + } + + if err = applyBucketUpdates(bucketLockedContainer, updates); err != nil { + return nil, err + } + + return unlockedObjects, nil +} + +func applyBucketUpdates(bucket *bbolt.Bucket, updates []keyValue) error { + for _, update := range updates { + if update.Value == nil { + err := bucket.Delete(update.Key) + if err != nil { + return fmt.Errorf("delete locked object record from locked bucket: %w", err) + } + } else { + err := bucket.Put(update.Key, update.Value) + if err != nil { + return fmt.Errorf("update list of lockers: %w", err) + } + } + } + return nil } // IsLockedPrm groups the parameters of IsLocked operation.