[#1445] local_object_storage: Add RemoveOrphanLocks
method
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
85bc336216
commit
4e316367f1
3 changed files with 181 additions and 0 deletions
|
@ -136,3 +136,52 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
|||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) RemoveOrphanLocks(ctx context.Context, locked oid.Address) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "StorageEngine.RemoveOrphanLocks",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", locked.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
return e.execIfNotBlocked(func() (err error) {
|
||||
e.iterateOverSortedShards(locked, func(_ int, hs hashedShard) (stop bool) {
|
||||
var existsPrm shard.ExistsPrm
|
||||
var existsRes shard.ExistsRes
|
||||
|
||||
existsPrm.Address = locked
|
||||
|
||||
existsRes, err = hs.Shard.Exists(ctx, existsPrm)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
if existsRes.Exists() && existsRes.Locked() {
|
||||
err = hs.Shard.RemoveOrphanLocks(ctx, locked, e.dropOrphanLocks)
|
||||
return true
|
||||
}
|
||||
return
|
||||
})
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
func (e *StorageEngine) dropOrphanLocks(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID) {
|
||||
for _, lock := range locks {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnt)
|
||||
addr.SetObject(lock)
|
||||
|
||||
e.iterateOverSortedShards(addr, func(_ int, hs hashedShard) (stop bool) {
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.Address = addr
|
||||
|
||||
existsRes, err := hs.Shard.Exists(ctx, existsPrm)
|
||||
if err != nil || existsRes.Exists() {
|
||||
locksToKeep = append(locksToKeep, lock)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -3,6 +3,8 @@ package meta
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
@ -200,6 +202,39 @@ func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
|||
return lockers, nil
|
||||
}
|
||||
|
||||
func setLocked(tx *bbolt.Tx, cnt cid.ID, obj oid.ID, locks []oid.ID) error {
|
||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||
if bucketLocked == nil {
|
||||
return errors.New("expected locked bucket")
|
||||
}
|
||||
|
||||
rawCnt := make([]byte, sha256.Size)
|
||||
cnt.Encode(rawCnt)
|
||||
|
||||
rawObj := make([]byte, sha256.Size)
|
||||
obj.Encode(rawObj)
|
||||
|
||||
bucketLockedContainer := bucketLocked.Bucket(rawCnt)
|
||||
if bucketLockedContainer == nil {
|
||||
return errors.New("expected locked container bucket")
|
||||
}
|
||||
|
||||
var rawLocks [][]byte
|
||||
for _, lock := range locks {
|
||||
rawLock := make([]byte, sha256.Size)
|
||||
lock.Encode(rawLock)
|
||||
|
||||
rawLocks = append(rawLocks, rawLock)
|
||||
}
|
||||
|
||||
rawLockList, err := encodeList(rawLocks)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't decode list of locks: %w", err)
|
||||
}
|
||||
|
||||
return bucketLockedContainer.Put(rawObj, rawLockList)
|
||||
}
|
||||
|
||||
// releases all records about the objects locked by the locker.
|
||||
// Returns slice of unlocked object ID's or an error.
|
||||
//
|
||||
|
@ -383,3 +418,79 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er
|
|||
success = err == nil
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (db *DB) RemoveOrphanLocks(
|
||||
ctx context.Context, locked oid.Address,
|
||||
dropOrphanLocks func(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID),
|
||||
) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.RemoveOrphanLocks",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", locked.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
switch {
|
||||
case db.mode.NoMetabase():
|
||||
return ErrDegradedMode
|
||||
case db.mode.ReadOnly():
|
||||
return ErrReadOnlyMode
|
||||
default:
|
||||
}
|
||||
|
||||
cnt := locked.Container()
|
||||
obj := locked.Object()
|
||||
|
||||
return metaerr.Wrap(db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
key := make([]byte, bucketKeySize)
|
||||
bucket := tx.Bucket(primaryBucketName(cnt, key))
|
||||
if bucket == nil {
|
||||
return errors.New("expected primary bucket")
|
||||
}
|
||||
|
||||
key = make([]byte, objectKeySize)
|
||||
obj.Encode(key)
|
||||
rawObject := bucket.Get(key)
|
||||
if rawObject == nil {
|
||||
return errors.New("such an object doesn't exists")
|
||||
}
|
||||
|
||||
var object objectSDK.Object
|
||||
if err := object.Unmarshal(rawObject); err != nil {
|
||||
return fmt.Errorf("can't unmarshal an object: %w", err)
|
||||
}
|
||||
|
||||
if parent, set := object.ParentID(); set {
|
||||
if err := db.removeOrphanLocks(ctx, tx, cnt, parent, dropOrphanLocks); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return db.removeOrphanLocks(ctx, tx, cnt, obj, dropOrphanLocks)
|
||||
}))
|
||||
}
|
||||
|
||||
func (db *DB) removeOrphanLocks(
|
||||
ctx context.Context, tx *bbolt.Tx, cnt cid.ID, obj oid.ID,
|
||||
dropOrphanLocks func(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID),
|
||||
) error {
|
||||
locks, err := getLocked(tx, cnt, obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't get locks: %w", err)
|
||||
}
|
||||
if len(locks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
newLocks := dropOrphanLocks(ctx, cnt, locks)
|
||||
if len(newLocks) == len(locks) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = setLocked(tx, cnt, obj, newLocks); err != nil {
|
||||
return fmt.Errorf("can't set locks: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -88,3 +88,24 @@ func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, erro
|
|||
}
|
||||
return s.metaBase.GetLocked(ctx, addr)
|
||||
}
|
||||
|
||||
func (s *Shard) RemoveOrphanLocks(
|
||||
ctx context.Context, locked oid.Address,
|
||||
dropOrphanLocks func(ctx context.Context, cnt cid.ID, locks []oid.ID) (locksToKeep []oid.ID),
|
||||
) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Shard.RemoveOrphanLocks",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", locked.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
switch m := s.GetMode(); {
|
||||
case m.NoMetabase():
|
||||
return ErrDegradedMode
|
||||
case m.ReadOnly():
|
||||
return ErrReadOnlyMode
|
||||
default:
|
||||
}
|
||||
|
||||
return s.metaBase.RemoveOrphanLocks(ctx, locked, dropOrphanLocks)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue