diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 3f08d5ec4..58e6fa7cd 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -3,7 +3,7 @@ package meta import ( "bytes" "context" - "fmt" + "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -14,23 +14,15 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/cockroachdb/pebble" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) -var bucketNameLocked = []byte{lockedPrefix} - type keyValue struct { Key []byte Value []byte } -// returns name of the bucket with objects of type LOCK for specified container. -func bucketNameLockers(idCnr cid.ID, key []byte) []byte { - return bucketName(idCnr, lockersPrefix, key) -} - // Lock marks objects as locked with another object. All objects are from the // specified container. // @@ -67,66 +59,45 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid. panic("empty locked list") } - err := db.lockInternal(locked, cnr, locker) + defer db.guard.LockContainerID(cnr)() + + err := db.batch(func(b *pebble.Batch) error { + return lockInternal(b, locked, cnr, locker) + }) success = err == nil return err } -func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error { - bucketKeysLocked := make([][]byte, len(locked)) - for i := range locked { - bucketKeysLocked[i] = objectKey(locked[i], make([]byte, objectKeySize)) +func lockInternal(b *pebble.Batch, locked []oid.ID, cnr cid.ID, locker oid.ID) error { + t, err := firstIrregularObjectType(b, cnr, locked...) + if err != nil { + return err + } + if t != objectSDK.TypeRegular { + return logicerr.Wrap(new(apistatus.LockNonRegularObject)) } - key := make([]byte, cidSize) - return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error { - if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular { - return logicerr.Wrap(new(apistatus.LockNonRegularObject)) - } - - bucketLocked := tx.Bucket(bucketNameLocked) - - cnr.Encode(key) - bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key) + for _, objID := range locked { + key := lockedKey(cnr, objID, locker) + v, err := valueSafe(b, key) if err != nil { - return fmt.Errorf("create container bucket for locked objects %v: %w", cnr, err) + return err + } + if v != nil { + // already locked by locker + continue } - keyLocker := objectKey(locker, key) - var exLockers [][]byte - var updLockers []byte - - loop: - for i := range bucketKeysLocked { - exLockers, err = decodeList(bucketLockedContainer.Get(bucketKeysLocked[i])) - if err != nil { - return fmt.Errorf("decode list of object lockers: %w", err) - } - - for i := range exLockers { - if bytes.Equal(exLockers[i], keyLocker) { - continue loop - } - } - - updLockers, err = encodeList(append(exLockers, keyLocker)) - if err != nil { - return fmt.Errorf("encode list of object lockers: %w", err) - } - - err = bucketLockedContainer.Put(bucketKeysLocked[i], updLockers) - if err != nil { - return fmt.Errorf("update list of object lockers: %w", err) - } + if err := b.Set(key, zeroValue, pebble.Sync); err != nil { + return err } - - return nil - })) + } + return nil } // FreeLockedBy unlocks all objects in DB which are locked by lockers. // Returns slice of unlocked object ID's or an error. -func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { +func (db *DB) FreeLockedBy(ctx context.Context, lockers []oid.Address) ([]oid.Address, error) { var ( startedAt = time.Now() success = false @@ -142,11 +113,17 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { return nil, ErrDegradedMode } + var containerIDs []cid.ID + for _, a := range lockers { + containerIDs = append(containerIDs, a.Container()) + } + defer db.guard.LockContainerIDs(containerIDs)() + var unlockedObjects []oid.Address - if err := db.database.Update(func(tx *bbolt.Tx) error { + if err := db.batch(func(b *pebble.Batch) error { for i := range lockers { - unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) + unlocked, err := freePotentialLocks(ctx, b, lockers[i]) if err != nil { return err } @@ -164,6 +141,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { // checks if specified object is locked in the specified container. func objectLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.ID) (bool, error) { prefix := lockedKeyLongPrefix(idCnr, idObj) + items, err := selectByPrefixBatch(ctx, r, prefix, 1) if err != nil { return false, err @@ -172,27 +150,27 @@ func objectLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid. } // return `LOCK` id's if specified object is locked in the specified container. -func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) { +func getLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) { + prefix := lockedKeyLongPrefix(idCnr, idObj) + var lockers []oid.ID - bucketLocked := tx.Bucket(bucketNameLocked) - if bucketLocked != nil { - key := make([]byte, cidSize) - idCnr.Encode(key) - bucketLockedContainer := bucketLocked.Bucket(key) - if bucketLockedContainer != nil { - binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key))) + for { + items, err := selectByPrefixBatch(ctx, r, prefix, batchSize) + if err != nil { + return nil, err + } + for _, it := range items { + id, err := lockerObjectIDFromLockedKey(it) if err != nil { - return nil, fmt.Errorf("decode list of object lockers: %w", err) - } - for _, binObjID := range binObjIDs { - var id oid.ID - if err = id.Decode(binObjID); err != nil { - return nil, err - } - lockers = append(lockers, id) + return nil, err } + lockers = append(lockers, id) + } + if len(items) < batchSize { + break } } + return lockers, nil } @@ -202,95 +180,65 @@ func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) { // Operation is very resource-intensive, which is caused by the admissibility // of multiple locks. Also, if we knew what objects are locked, it would be // possible to speed up the execution. -func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) { +func freePotentialLocks(ctx context.Context, b *pebble.Batch, locker oid.Address) ([]oid.Address, error) { var unlockedObjects []oid.Address - bucketLocked := tx.Bucket(bucketNameLocked) - if bucketLocked == nil { - return unlockedObjects, nil - } - key := make([]byte, cidSize) - idCnr.Encode(key) - - bucketLockedContainer := bucketLocked.Bucket(key) - if bucketLockedContainer == nil { - return unlockedObjects, nil - } - - keyLocker := objectKey(locker, key) - updates := make([]keyValue, 0) - err := bucketLockedContainer.ForEach(func(k, v []byte) error { - keyLockers, err := decodeList(v) - if err != nil { - return fmt.Errorf("decode list of lockers in locked bucket: %w", err) - } - - for i := range keyLockers { - if bytes.Equal(keyLockers[i], keyLocker) { - if len(keyLockers) == 1 { - updates = append(updates, keyValue{ - Key: k, - Value: nil, - }) - - var id oid.ID - err = id.Decode(k) - if err != nil { - return fmt.Errorf("decode unlocked object id error: %w", err) - } - - var addr oid.Address - addr.SetContainer(idCnr) - addr.SetObject(id) - - unlockedObjects = append(unlockedObjects, addr) - } else { - // exclude locker - keyLockers = append(keyLockers[:i], keyLockers[i+1:]...) - - v, err = encodeList(keyLockers) - if err != nil { - return fmt.Errorf("encode updated list of lockers: %w", err) - } - - updates = append(updates, keyValue{ - Key: k, - Value: v, - }) - } - - return nil - } - } - - return nil - }) + locked, err := lockedObjects(b, locker) if err != nil { return nil, err } - if err = applyBucketUpdates(bucketLockedContainer, updates); err != nil { - return nil, err + for _, lockedObject := range locked { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + if err := b.Delete(lockedKey(locker.Container(), lockedObject, locker.Object()), pebble.Sync); err != nil { + return nil, err + } + isLocked, err := objectLocked(ctx, b, locker.Container(), lockedObject) + if err != nil { + return nil, err + } + if !isLocked { // deleted locker was the last one + var addr oid.Address + addr.SetContainer(locker.Container()) + addr.SetObject(lockedObject) + unlockedObjects = append(unlockedObjects, addr) + } } return unlockedObjects, nil } -func applyBucketUpdates(bucket *bbolt.Bucket, updates []keyValue) error { - for _, update := range updates { - if update.Value == nil { - err := bucket.Delete(update.Key) - if err != nil { - return fmt.Errorf("delete locked object record from locked bucket: %w", err) - } - } else { - err := bucket.Put(update.Key, update.Value) - if err != nil { - return fmt.Errorf("update list of lockers: %w", err) - } - } +func lockedObjects(r pebble.Reader, locker oid.Address) ([]oid.ID, error) { + var lockedByLocker []oid.ID + + prefix := lockedKeyShortPrefix(locker.Container()) + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + }) + if err != nil { + return nil, err } - return nil + + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + currentLockerObjID, err := lockerObjectIDFromLockedKey(it.Key()) + if err != nil { + return nil, errors.Join(err, it.Close()) + } + if !currentLockerObjID.Equals(locker.Object()) { + continue + } + currentObjectID, err := objectIDFromLockedKey(it.Key()) + if err != nil { + return nil, errors.Join(err, it.Close()) + } + lockedByLocker = append(lockedByLocker, currentObjectID) + } + return lockedByLocker, it.Close() } // IsLockedPrm groups the parameters of IsLocked operation. @@ -339,9 +287,10 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e if db.mode.NoMetabase() { return res, ErrDegradedMode } - err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { - res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object()) - return nil + err = metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error { + var e error + res.locked, e = objectLocked(ctx, s, prm.addr.Container(), prm.addr.Object()) + return e })) success = err == nil return res, err @@ -372,15 +321,14 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er if db.mode.NoMetabase() { return res, ErrDegradedMode } - err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { - res, err = getLocked(tx, addr.Container(), addr.Object()) + err = metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error { + res, err = getLocked(ctx, s, addr.Container(), addr.Object()) return nil })) success = err == nil return res, err } -// return true if provided object is of LOCK type. func isLockObject(r pebble.Reader, idCnr cid.ID, obj oid.ID) (bool, error) { key := lockersKey(idCnr, obj) v, err := valueSafe(r, key)