diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index ac8fa9c6f..bccc34273 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -136,3 +136,52 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo }) return } + +func (e *StorageEngine) RemoveOrphanLocks(ctx context.Context, locked oid.Address) error { + _, span := tracing.StartSpanFromContext(ctx, "StorageEngine.RemoveOrphanLocks", + trace.WithAttributes( + attribute.String("address", locked.EncodeToString()), + )) + defer span.End() + + return e.execIfNotBlocked(func() (err error) { + e.iterateOverSortedShards(locked, func(_ int, hs hashedShard) (stop bool) { + var existsPrm shard.ExistsPrm + var existsRes shard.ExistsRes + + existsPrm.Address = locked + + existsRes, err = hs.Shard.Exists(ctx, existsPrm) + if err != nil { + return true + } + if existsRes.Exists() && existsRes.Locked() { + err = hs.Shard.RemoveOrphanLocks(ctx, locked, e.dropOrphanLocks) + return true + } + return + }) + return + }) +} + +func (e *StorageEngine) dropOrphanLocks(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID) { + for _, lock := range locks { + var addr oid.Address + addr.SetContainer(cnt) + addr.SetObject(lock) + + e.iterateOverSortedShards(addr, func(_ int, hs hashedShard) (stop bool) { + var existsPrm shard.ExistsPrm + existsPrm.Address = addr + + existsRes, err := hs.Shard.Exists(ctx, existsPrm) + if err != nil || existsRes.Exists() { + locksToKeep = append(locksToKeep, lock) + return true + } + return false + }) + } + return +} diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 6b78ef392..bbfda425f 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -3,6 +3,8 @@ package meta import ( "bytes" "context" + "crypto/sha256" + "errors" "fmt" "time" @@ -200,6 +202,39 @@ func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) { return lockers, nil } +func setLocked(tx *bbolt.Tx, cnt cid.ID, obj oid.ID, locks []oid.ID) error { + bucketLocked := tx.Bucket(bucketNameLocked) + if bucketLocked == nil { + return errors.New("expected locked bucket") + } + + rawCnt := make([]byte, sha256.Size) + cnt.Encode(rawCnt) + + rawObj := make([]byte, sha256.Size) + obj.Encode(rawObj) + + bucketLockedContainer := bucketLocked.Bucket(rawCnt) + if bucketLockedContainer == nil { + return errors.New("expected locked container bucket") + } + + var rawLocks [][]byte + for _, lock := range locks { + rawLock := make([]byte, sha256.Size) + lock.Encode(rawLock) + + rawLocks = append(rawLocks, rawLock) + } + + rawLockList, err := encodeList(rawLocks) + if err != nil { + return fmt.Errorf("can't decode list of locks: %w", err) + } + + return bucketLockedContainer.Put(rawObj, rawLockList) +} + // releases all records about the objects locked by the locker. // Returns slice of unlocked object ID's or an error. // @@ -383,3 +418,79 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er success = err == nil return res, err } + +func (db *DB) RemoveOrphanLocks( + ctx context.Context, locked oid.Address, + dropOrphanLocks func(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID), +) error { + _, span := tracing.StartSpanFromContext(ctx, "metabase.RemoveOrphanLocks", + trace.WithAttributes( + attribute.String("address", locked.EncodeToString()), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + switch { + case db.mode.NoMetabase(): + return ErrDegradedMode + case db.mode.ReadOnly(): + return ErrReadOnlyMode + default: + } + + cnt := locked.Container() + obj := locked.Object() + + return metaerr.Wrap(db.boltDB.Batch(func(tx *bbolt.Tx) error { + key := make([]byte, bucketKeySize) + bucket := tx.Bucket(primaryBucketName(cnt, key)) + if bucket == nil { + return errors.New("expected primary bucket") + } + + key = make([]byte, objectKeySize) + obj.Encode(key) + rawObject := bucket.Get(key) + if rawObject == nil { + return errors.New("such an object doesn't exists") + } + + var object objectSDK.Object + if err := object.Unmarshal(rawObject); err != nil { + return fmt.Errorf("can't unmarshal an object: %w", err) + } + + if parent, set := object.ParentID(); set { + if err := db.removeOrphanLocks(ctx, tx, cnt, parent, dropOrphanLocks); err != nil { + return err + } + } + return db.removeOrphanLocks(ctx, tx, cnt, obj, dropOrphanLocks) + })) +} + +func (db *DB) removeOrphanLocks( + ctx context.Context, tx *bbolt.Tx, cnt cid.ID, obj oid.ID, + dropOrphanLocks func(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID), +) error { + locks, err := getLocked(tx, cnt, obj) + if err != nil { + return fmt.Errorf("can't get locks: %w", err) + } + if len(locks) == 0 { + return nil + } + + newLocks := dropOrphanLocks(ctx, cnt, locks) + if len(newLocks) == len(locks) { + return nil + } + + if err = setLocked(tx, cnt, obj, newLocks); err != nil { + return fmt.Errorf("can't set locks: %w", err) + } + + return nil +} diff --git a/pkg/local_object_storage/shard/lock.go b/pkg/local_object_storage/shard/lock.go index 4a8d89d63..17904d2bc 100644 --- a/pkg/local_object_storage/shard/lock.go +++ b/pkg/local_object_storage/shard/lock.go @@ -88,3 +88,24 @@ func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, erro } return s.metaBase.GetLocked(ctx, addr) } + +func (s *Shard) RemoveOrphanLocks( + ctx context.Context, locked oid.Address, + dropOrphanLocks func(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID), +) error { + _, span := tracing.StartSpanFromContext(ctx, "Shard.RemoveOrphanLocks", + trace.WithAttributes( + attribute.String("address", locked.EncodeToString()), + )) + defer span.End() + + switch m := s.GetMode(); { + case m.NoMetabase(): + return ErrDegradedMode + case m.ReadOnly(): + return ErrReadOnlyMode + default: + } + + return s.metaBase.RemoveOrphanLocks(ctx, locked, dropOrphanLocks) +}