diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 8dcb1fca..4b95a32f 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -148,6 +148,13 @@ func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) { if targetIsTomb { continue } + } else { + // garbage object can probably lock some objects, so they should become + // unlocked after its decay + err = freePotentialLocks(tx, *prm.target[i].ContainerID(), *prm.target[i].ObjectID()) + if err != nil { + return fmt.Errorf("free potential locks: %w", err) + } } // consider checking if target is already in graveyard? diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 4c02100c..60a95788 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -103,3 +103,56 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool { return false } + +// releases all records about the objects locked by the locker. +// +// Operation is very resource-intensive, which is caused by the admissibility +// of multiple locks. Also, if we knew what objects are locked, it would be +// possible to speed up the execution. +func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error { + bucketLocked := tx.Bucket(bucketNameLocked) + if bucketLocked != nil { + bucketLockedContainer := bucketLocked.Bucket([]byte(idCnr.String())) + if bucketLockedContainer != nil { + keyLocker := objectKey(&locker) + return bucketLockedContainer.ForEach(func(k, v []byte) error { + keyLockers, err := decodeList(v) + if err != nil { + return fmt.Errorf("decode list of lockers in locked bucket: %w", err) + } + + for i := range keyLockers { + if bytes.Equal(keyLockers[i], keyLocker) { + if len(keyLockers) == 1 { + // locker was all alone + err = bucketLockedContainer.Delete(k) + if err != nil { + return fmt.Errorf("delete locked object record from locked bucket: %w", err) + } + } else { + // exclude locker + keyLockers = append(keyLockers[:i], keyLockers[i+1:]...) + + v, err = encodeList(append(keyLockers[:i], keyLockers[i+1:]...)) + if err != nil { + return fmt.Errorf("encode updated list of lockers: %w", err) + } + + // update the record + err = bucketLockedContainer.Put(k, v) + if err != nil { + return fmt.Errorf("update list of lockers: %w", err) + } + } + + return nil + } + } + + return nil + }) + } + } + + return nil +}