forked from TrueCloudLab/frostfs-node
327 lines
8.4 KiB
Go
327 lines
8.4 KiB
Go
package meta
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
var bucketNameLocked = []byte{lockedPrefix}
|
|
|
|
type keyValue struct {
|
|
Key []byte
|
|
Value []byte
|
|
}
|
|
|
|
// returns name of the bucket with objects of type LOCK for specified container.
|
|
func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
|
|
return bucketName(idCnr, lockersPrefix, key)
|
|
}
|
|
|
|
// Lock marks objects as locked with another object. All objects are from the
|
|
// specified container.
|
|
//
|
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
|
//
|
|
// Locked list should be unique. Panics if it is empty.
|
|
func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("Lock", time.Since(startedAt), success)
|
|
}()
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.Lock",
|
|
trace.WithAttributes(
|
|
attribute.String("container_id", cnr.EncodeToString()),
|
|
attribute.String("locker", locker.EncodeToString()),
|
|
attribute.Int("locked_count", len(locked)),
|
|
))
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return ErrDegradedMode
|
|
} else if db.mode.ReadOnly() {
|
|
return ErrReadOnlyMode
|
|
}
|
|
|
|
if len(locked) == 0 {
|
|
panic("empty locked list")
|
|
}
|
|
|
|
err := db.lockInternal(locked, cnr, locker)
|
|
success = err == nil
|
|
return err
|
|
}
|
|
|
|
func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
|
bucketKeysLocked := make([][]byte, len(locked))
|
|
for i := range locked {
|
|
bucketKeysLocked[i] = objectKey(locked[i], make([]byte, objectKeySize))
|
|
}
|
|
key := make([]byte, cidSize)
|
|
|
|
return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
|
|
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
|
}
|
|
|
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
|
|
|
cnr.Encode(key)
|
|
bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key)
|
|
if err != nil {
|
|
return fmt.Errorf("create container bucket for locked objects %v: %w", cnr, err)
|
|
}
|
|
|
|
keyLocker := objectKey(locker, key)
|
|
var exLockers [][]byte
|
|
var updLockers []byte
|
|
|
|
loop:
|
|
for i := range bucketKeysLocked {
|
|
exLockers, err = decodeList(bucketLockedContainer.Get(bucketKeysLocked[i]))
|
|
if err != nil {
|
|
return fmt.Errorf("decode list of object lockers: %w", err)
|
|
}
|
|
|
|
for i := range exLockers {
|
|
if bytes.Equal(exLockers[i], keyLocker) {
|
|
continue loop
|
|
}
|
|
}
|
|
|
|
updLockers, err = encodeList(append(exLockers, keyLocker))
|
|
if err != nil {
|
|
return fmt.Errorf("encode list of object lockers: %w", err)
|
|
}
|
|
|
|
err = bucketLockedContainer.Put(bucketKeysLocked[i], updLockers)
|
|
if err != nil {
|
|
return fmt.Errorf("update list of object lockers: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}))
|
|
}
|
|
|
|
// FreeLockedBy unlocks all objects in DB which are locked by lockers.
|
|
// Returns slice of unlocked object ID's or an error.
|
|
func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("FreeLockedBy", time.Since(startedAt), success)
|
|
}()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return nil, ErrDegradedMode
|
|
}
|
|
|
|
var unlockedObjects []oid.Address
|
|
|
|
if err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
for i := range lockers {
|
|
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
unlockedObjects = append(unlockedObjects, unlocked...)
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, metaerr.Wrap(err)
|
|
}
|
|
success = true
|
|
return unlockedObjects, nil
|
|
}
|
|
|
|
// checks if specified object is locked in the specified container.
|
|
func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
|
if bucketLocked != nil {
|
|
key := make([]byte, cidSize)
|
|
idCnr.Encode(key)
|
|
bucketLockedContainer := bucketLocked.Bucket(key)
|
|
if bucketLockedContainer != nil {
|
|
return bucketLockedContainer.Get(objectKey(idObj, key)) != nil
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// releases all records about the objects locked by the locker.
|
|
// Returns slice of unlocked object ID's or an error.
|
|
//
|
|
// Operation is very resource-intensive, which is caused by the admissibility
|
|
// of multiple locks. Also, if we knew what objects are locked, it would be
|
|
// possible to speed up the execution.
|
|
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
|
|
var unlockedObjects []oid.Address
|
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
|
if bucketLocked == nil {
|
|
return unlockedObjects, nil
|
|
}
|
|
|
|
key := make([]byte, cidSize)
|
|
idCnr.Encode(key)
|
|
|
|
bucketLockedContainer := bucketLocked.Bucket(key)
|
|
if bucketLockedContainer == nil {
|
|
return unlockedObjects, nil
|
|
}
|
|
|
|
keyLocker := objectKey(locker, key)
|
|
updates := make([]keyValue, 0)
|
|
err := bucketLockedContainer.ForEach(func(k, v []byte) error {
|
|
keyLockers, err := decodeList(v)
|
|
if err != nil {
|
|
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
|
}
|
|
|
|
for i := range keyLockers {
|
|
if bytes.Equal(keyLockers[i], keyLocker) {
|
|
if len(keyLockers) == 1 {
|
|
updates = append(updates, keyValue{
|
|
Key: k,
|
|
Value: nil,
|
|
})
|
|
|
|
var id oid.ID
|
|
err = id.Decode(k)
|
|
if err != nil {
|
|
return fmt.Errorf("decode unlocked object id error: %w", err)
|
|
}
|
|
|
|
var addr oid.Address
|
|
addr.SetContainer(idCnr)
|
|
addr.SetObject(id)
|
|
|
|
unlockedObjects = append(unlockedObjects, addr)
|
|
} else {
|
|
// exclude locker
|
|
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
|
|
|
v, err = encodeList(keyLockers)
|
|
if err != nil {
|
|
return fmt.Errorf("encode updated list of lockers: %w", err)
|
|
}
|
|
|
|
updates = append(updates, keyValue{
|
|
Key: k,
|
|
Value: v,
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = applyBucketUpdates(bucketLockedContainer, updates); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return unlockedObjects, nil
|
|
}
|
|
|
|
func applyBucketUpdates(bucket *bbolt.Bucket, updates []keyValue) error {
|
|
for _, update := range updates {
|
|
if update.Value == nil {
|
|
err := bucket.Delete(update.Key)
|
|
if err != nil {
|
|
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
|
}
|
|
} else {
|
|
err := bucket.Put(update.Key, update.Value)
|
|
if err != nil {
|
|
return fmt.Errorf("update list of lockers: %w", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsLockedPrm groups the parameters of IsLocked operation.
|
|
type IsLockedPrm struct {
|
|
addr oid.Address
|
|
}
|
|
|
|
// SetAddress sets object address that will be checked for lock relations.
|
|
func (i *IsLockedPrm) SetAddress(addr oid.Address) {
|
|
i.addr = addr
|
|
}
|
|
|
|
// IsLockedRes groups the resulting values of IsLocked operation.
|
|
type IsLockedRes struct {
|
|
locked bool
|
|
}
|
|
|
|
// Locked describes the requested object status according to the metabase
|
|
// current state.
|
|
func (i IsLockedRes) Locked() bool {
|
|
return i.locked
|
|
}
|
|
|
|
// IsLocked checks is the provided object is locked by any `LOCK`. Not found
|
|
// object is considered as non-locked.
|
|
//
|
|
// Returns only non-logical errors related to underlying database.
|
|
func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, err error) {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("IsLocked", time.Since(startedAt), success)
|
|
}()
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.IsLocked",
|
|
trace.WithAttributes(
|
|
attribute.String("address", prm.addr.EncodeToString()),
|
|
))
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return res, ErrDegradedMode
|
|
}
|
|
err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
|
res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object())
|
|
return nil
|
|
}))
|
|
success = err == nil
|
|
return res, err
|
|
}
|