[#9999] metabase: Fix db engine to pebble in lock.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
b12c1f1b23
commit
4bac5644f1
1 changed files with 104 additions and 156 deletions
|
@ -3,7 +3,7 @@ package meta
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
@ -14,23 +14,15 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
var bucketNameLocked = []byte{lockedPrefix}
|
|
||||||
|
|
||||||
type keyValue struct {
|
type keyValue struct {
|
||||||
Key []byte
|
Key []byte
|
||||||
Value []byte
|
Value []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns name of the bucket with objects of type LOCK for specified container.
|
|
||||||
func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
|
|
||||||
return bucketName(idCnr, lockersPrefix, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock marks objects as locked with another object. All objects are from the
|
// Lock marks objects as locked with another object. All objects are from the
|
||||||
// specified container.
|
// specified container.
|
||||||
//
|
//
|
||||||
|
@ -67,66 +59,45 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.
|
||||||
panic("empty locked list")
|
panic("empty locked list")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.lockInternal(locked, cnr, locker)
|
defer db.guard.LockContainerID(cnr)()
|
||||||
|
|
||||||
|
err := db.batch(func(b *pebble.Batch) error {
|
||||||
|
return lockInternal(b, locked, cnr, locker)
|
||||||
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
func lockInternal(b *pebble.Batch, locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
||||||
bucketKeysLocked := make([][]byte, len(locked))
|
t, err := firstIrregularObjectType(b, cnr, locked...)
|
||||||
for i := range locked {
|
if err != nil {
|
||||||
bucketKeysLocked[i] = objectKey(locked[i], make([]byte, objectKeySize))
|
return err
|
||||||
}
|
}
|
||||||
key := make([]byte, cidSize)
|
if t != objectSDK.TypeRegular {
|
||||||
|
|
||||||
return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error {
|
|
||||||
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
|
|
||||||
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
for _, objID := range locked {
|
||||||
|
key := lockedKey(cnr, objID, locker)
|
||||||
cnr.Encode(key)
|
v, err := valueSafe(b, key)
|
||||||
bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("create container bucket for locked objects %v: %w", cnr, err)
|
return err
|
||||||
|
}
|
||||||
|
if v != nil {
|
||||||
|
// already locked by locker
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
keyLocker := objectKey(locker, key)
|
if err := b.Set(key, zeroValue, pebble.Sync); err != nil {
|
||||||
var exLockers [][]byte
|
return err
|
||||||
var updLockers []byte
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for i := range bucketKeysLocked {
|
|
||||||
exLockers, err = decodeList(bucketLockedContainer.Get(bucketKeysLocked[i]))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("decode list of object lockers: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range exLockers {
|
|
||||||
if bytes.Equal(exLockers[i], keyLocker) {
|
|
||||||
continue loop
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
updLockers, err = encodeList(append(exLockers, keyLocker))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("encode list of object lockers: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = bucketLockedContainer.Put(bucketKeysLocked[i], updLockers)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("update list of object lockers: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FreeLockedBy unlocks all objects in DB which are locked by lockers.
|
// FreeLockedBy unlocks all objects in DB which are locked by lockers.
|
||||||
// Returns a slice of unlocked object IDs or an error.
|
// Returns a slice of unlocked object IDs or an error.
|
||||||
func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
func (db *DB) FreeLockedBy(ctx context.Context, lockers []oid.Address) ([]oid.Address, error) {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
|
@ -142,11 +113,17 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
||||||
return nil, ErrDegradedMode
|
return nil, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var containerIDs []cid.ID
|
||||||
|
for _, a := range lockers {
|
||||||
|
containerIDs = append(containerIDs, a.Container())
|
||||||
|
}
|
||||||
|
defer db.guard.LockContainerIDs(containerIDs)()
|
||||||
|
|
||||||
var unlockedObjects []oid.Address
|
var unlockedObjects []oid.Address
|
||||||
|
|
||||||
if err := db.database.Update(func(tx *bbolt.Tx) error {
|
if err := db.batch(func(b *pebble.Batch) error {
|
||||||
for i := range lockers {
|
for i := range lockers {
|
||||||
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
|
unlocked, err := freePotentialLocks(ctx, b, lockers[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -164,6 +141,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
||||||
// checks if specified object is locked in the specified container.
|
// checks if specified object is locked in the specified container.
|
||||||
func objectLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.ID) (bool, error) {
|
func objectLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.ID) (bool, error) {
|
||||||
prefix := lockedKeyLongPrefix(idCnr, idObj)
|
prefix := lockedKeyLongPrefix(idCnr, idObj)
|
||||||
|
|
||||||
items, err := selectByPrefixBatch(ctx, r, prefix, 1)
|
items, err := selectByPrefixBatch(ctx, r, prefix, 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
@ -172,27 +150,27 @@ func objectLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns IDs of the `LOCK` objects that lock the specified object in the specified container.
|
// returns IDs of the `LOCK` objects that lock the specified object in the specified container.
|
||||||
func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
func getLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
||||||
|
prefix := lockedKeyLongPrefix(idCnr, idObj)
|
||||||
|
|
||||||
var lockers []oid.ID
|
var lockers []oid.ID
|
||||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
for {
|
||||||
if bucketLocked != nil {
|
items, err := selectByPrefixBatch(ctx, r, prefix, batchSize)
|
||||||
key := make([]byte, cidSize)
|
|
||||||
idCnr.Encode(key)
|
|
||||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
|
||||||
if bucketLockedContainer != nil {
|
|
||||||
binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("decode list of object lockers: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, binObjID := range binObjIDs {
|
for _, it := range items {
|
||||||
var id oid.ID
|
id, err := lockerObjectIDFromLockedKey(it)
|
||||||
if err = id.Decode(binObjID); err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
lockers = append(lockers, id)
|
lockers = append(lockers, id)
|
||||||
}
|
}
|
||||||
|
if len(items) < batchSize {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return lockers, nil
|
return lockers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,95 +180,65 @@ func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
||||||
// Operation is very resource-intensive, which is caused by the admissibility
|
// Operation is very resource-intensive, which is caused by the admissibility
|
||||||
// of multiple locks. Also, if we knew what objects are locked, it would be
|
// of multiple locks. Also, if we knew what objects are locked, it would be
|
||||||
// possible to speed up the execution.
|
// possible to speed up the execution.
|
||||||
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
|
func freePotentialLocks(ctx context.Context, b *pebble.Batch, locker oid.Address) ([]oid.Address, error) {
|
||||||
var unlockedObjects []oid.Address
|
var unlockedObjects []oid.Address
|
||||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
|
||||||
if bucketLocked == nil {
|
|
||||||
return unlockedObjects, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
key := make([]byte, cidSize)
|
locked, err := lockedObjects(b, locker)
|
||||||
idCnr.Encode(key)
|
|
||||||
|
|
||||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
|
||||||
if bucketLockedContainer == nil {
|
|
||||||
return unlockedObjects, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
keyLocker := objectKey(locker, key)
|
|
||||||
updates := make([]keyValue, 0)
|
|
||||||
err := bucketLockedContainer.ForEach(func(k, v []byte) error {
|
|
||||||
keyLockers, err := decodeList(v)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range keyLockers {
|
for _, lockedObject := range locked {
|
||||||
if bytes.Equal(keyLockers[i], keyLocker) {
|
select {
|
||||||
if len(keyLockers) == 1 {
|
case <-ctx.Done():
|
||||||
updates = append(updates, keyValue{
|
return nil, ctx.Err()
|
||||||
Key: k,
|
default:
|
||||||
Value: nil,
|
}
|
||||||
})
|
|
||||||
|
|
||||||
var id oid.ID
|
if err := b.Delete(lockedKey(locker.Container(), lockedObject, locker.Object()), pebble.Sync); err != nil {
|
||||||
err = id.Decode(k)
|
return nil, err
|
||||||
|
}
|
||||||
|
isLocked, err := objectLocked(ctx, b, locker.Container(), lockedObject)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decode unlocked object id error: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !isLocked { // deleted locker was the last one
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(idCnr)
|
addr.SetContainer(locker.Container())
|
||||||
addr.SetObject(id)
|
addr.SetObject(lockedObject)
|
||||||
|
|
||||||
unlockedObjects = append(unlockedObjects, addr)
|
unlockedObjects = append(unlockedObjects, addr)
|
||||||
} else {
|
|
||||||
// exclude locker
|
|
||||||
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
|
||||||
|
|
||||||
v, err = encodeList(keyLockers)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("encode updated list of lockers: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
updates = append(updates, keyValue{
|
|
||||||
Key: k,
|
|
||||||
Value: v,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = applyBucketUpdates(bucketLockedContainer, updates); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return unlockedObjects, nil
|
return unlockedObjects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyBucketUpdates(bucket *bbolt.Bucket, updates []keyValue) error {
|
func lockedObjects(r pebble.Reader, locker oid.Address) ([]oid.ID, error) {
|
||||||
for _, update := range updates {
|
var lockedByLocker []oid.ID
|
||||||
if update.Value == nil {
|
|
||||||
err := bucket.Delete(update.Key)
|
prefix := lockedKeyShortPrefix(locker.Container())
|
||||||
|
it, err := r.NewIter(&pebble.IterOptions{
|
||||||
|
LowerBound: prefix,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
err := bucket.Put(update.Key, update.Value)
|
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
|
currentLockerObjID, err := lockerObjectIDFromLockedKey(it.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("update list of lockers: %w", err)
|
return nil, errors.Join(err, it.Close())
|
||||||
}
|
}
|
||||||
|
if !currentLockerObjID.Equals(locker.Object()) {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
currentObjectID, err := objectIDFromLockedKey(it.Key())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Join(err, it.Close())
|
||||||
}
|
}
|
||||||
return nil
|
lockedByLocker = append(lockedByLocker, currentObjectID)
|
||||||
|
}
|
||||||
|
return lockedByLocker, it.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsLockedPrm groups the parameters of IsLocked operation.
|
// IsLockedPrm groups the parameters of IsLocked operation.
|
||||||
|
@ -339,9 +287,10 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
|
||||||
if db.mode.NoMetabase() {
|
if db.mode.NoMetabase() {
|
||||||
return res, ErrDegradedMode
|
return res, ErrDegradedMode
|
||||||
}
|
}
|
||||||
err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
err = metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object())
|
var e error
|
||||||
return nil
|
res.locked, e = objectLocked(ctx, s, prm.addr.Container(), prm.addr.Object())
|
||||||
|
return e
|
||||||
}))
|
}))
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, err
|
return res, err
|
||||||
|
@ -372,15 +321,14 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er
|
||||||
if db.mode.NoMetabase() {
|
if db.mode.NoMetabase() {
|
||||||
return res, ErrDegradedMode
|
return res, ErrDegradedMode
|
||||||
}
|
}
|
||||||
err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
err = metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
res, err = getLocked(tx, addr.Container(), addr.Object())
|
res, err = getLocked(ctx, s, addr.Container(), addr.Object())
|
||||||
return nil
|
return nil
|
||||||
}))
|
}))
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// return true if provided object is of LOCK type.
|
|
||||||
func isLockObject(r pebble.Reader, idCnr cid.ID, obj oid.ID) (bool, error) {
|
func isLockObject(r pebble.Reader, idCnr cid.ID, obj oid.ID) (bool, error) {
|
||||||
key := lockersKey(idCnr, obj)
|
key := lockersKey(idCnr, obj)
|
||||||
v, err := valueSafe(r, key)
|
v, err := valueSafe(r, key)
|
||||||
|
|
Loading…
Reference in a new issue