[#1147] node: Implement Lock\Delete requests for EC object

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-05-27 22:07:43 +03:00 committed by Evgenii Stratonikov
parent 88b8ddd902
commit 6130650bb6
20 changed files with 371 additions and 66 deletions

View file

@ -357,6 +357,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
_, err := internal.HeadObject(cmd.Context(), prmHead) _, err := internal.HeadObject(cmd.Context(), prmHead)
var errSplit *objectSDK.SplitInfoError var errSplit *objectSDK.SplitInfoError
var errEC *objectSDK.ECInfoError
switch { switch {
default: default:
@ -366,8 +367,6 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
return nil return nil
case errors.As(err, &errSplit): case errors.As(err, &errSplit):
common.PrintVerbose(cmd, "Split information received - object is virtual.") common.PrintVerbose(cmd, "Split information received - object is virtual.")
}
splitInfo := errSplit.SplitInfo() splitInfo := errSplit.SplitInfo()
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok { if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
@ -379,6 +378,11 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
} }
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj) return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
case errors.As(err, &errEC):
common.PrintVerbose(cmd, "Object is erasure-coded.")
return nil
}
return nil
} }
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) { func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {

View file

@ -95,6 +95,7 @@ const (
DeleteFormingSplitInfo = "forming split info..." DeleteFormingSplitInfo = "forming split info..."
DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..." DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
DeleteMembersSuccessfullyCollected = "members successfully collected" DeleteMembersSuccessfullyCollected = "members successfully collected"
DeleteECObjectReceived = "erasure-coded object received, form tombstone"
GetRemoteCallFailed = "remote call failed" GetRemoteCallFailed = "remote call failed"
GetCanNotAssembleTheObject = "can not assemble the object" GetCanNotAssembleTheObject = "can not assemble the object"
GetTryingToAssembleTheObject = "trying to assemble the object..." GetTryingToAssembleTheObject = "trying to assemble the object..."
@ -213,6 +214,7 @@ const (
EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies" EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies"
EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check" EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check"
EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks" EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks"
EngineInterruptGettingLockers = "can't get object's lockers"
EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks" EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks"
EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only" EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only"
EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode" EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode"

View file

@ -76,6 +76,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
is bool is bool
} }
var splitInfo *objectSDK.SplitInfo var splitInfo *objectSDK.SplitInfo
var ecInfo *objectSDK.ECInfo
// Removal of a big object is done in multiple stages: // Removal of a big object is done in multiple stages:
// 1. Remove the parent object. If it is locked or already removed, return immediately. // 1. Remove the parent object. If it is locked or already removed, return immediately.
@ -91,13 +92,18 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
} }
var splitErr *objectSDK.SplitInfoError var splitErr *objectSDK.SplitInfoError
if !errors.As(err, &splitErr) { var ecErr *objectSDK.ECInfoError
if errors.As(err, &splitErr) {
splitInfo = splitErr.SplitInfo()
} else if errors.As(err, &ecErr) {
e.deleteChunks(ctx, sh, ecInfo, prm)
return false
} else {
if !client.IsErrObjectNotFound(err) { if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check object existence", err) e.reportShardError(sh, "could not check object existence", err)
} }
return false return false
} }
splitInfo = splitErr.SplitInfo()
} else if !resExists.Exists() { } else if !resExists.Exists() {
return false return false
} }
@ -171,3 +177,31 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
return false return false
}) })
} }
func (e *StorageEngine) deleteChunks(
ctx context.Context, sh hashedShard, ecInfo *objectSDK.ECInfo, prm DeletePrm,
) {
var inhumePrm shard.InhumePrm
if prm.forceRemoval {
inhumePrm.ForceRemoval()
}
for _, chunk := range ecInfo.Chunks {
var addr oid.Address
addr.SetContainer(prm.addr.Container())
var objID oid.ID
err := objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(sh, "could not delete EC chunk", err)
}
addr.SetObject(objID)
inhumePrm.MarkAsGarbage(addr)
_, err = sh.Inhume(ctx, inhumePrm)
if err != nil {
e.log.Debug(logs.EngineCouldNotInhumeObjectInShard,
zap.Stringer("addr", addr),
zap.String("err", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
continue
}
}
}

View file

@ -17,6 +17,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/hrw" "git.frostfs.info/TrueCloudLab/hrw"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
@ -62,7 +63,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
ok, err := e.exists(context.Background(), addr) ok, _, err := e.exists(context.Background(), addr, oid.Address{})
if err != nil || ok { if err != nil || ok {
b.Fatalf("%t %v", ok, err) b.Fatalf("%t %v", ok, err)
} }

View file

@ -11,11 +11,13 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) { func (e *StorageEngine) exists(ctx context.Context, addr oid.Address, parentAddr oid.Address) (bool, bool, error) {
var shPrm shard.ExistsPrm var shPrm shard.ExistsPrm
shPrm.SetAddress(addr) shPrm.SetAddress(addr)
shPrm.SetParentAddress(parentAddr)
alreadyRemoved := false alreadyRemoved := false
exists := false exists := false
locked := false
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Exists(ctx, shPrm) res, err := sh.Exists(ctx, shPrm)
@ -44,13 +46,16 @@ func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, err
if !exists { if !exists {
exists = res.Exists() exists = res.Exists()
} }
if !locked {
locked = res.Locked()
}
return false return false
}) })
if alreadyRemoved { if alreadyRemoved {
return false, new(apistatus.ObjectAlreadyRemoved) return false, false, new(apistatus.ObjectAlreadyRemoved)
} }
return exists, nil return exists, locked, nil
} }

View file

@ -152,7 +152,8 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
} }
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
if !errors.As(err, &siErr) { var ecErr *objectSDK.ECInfoError
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
e.reportShardError(sh, "could not check for presents in shard", err) e.reportShardError(sh, "could not check for presents in shard", err)
return return
} }
@ -220,6 +221,33 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
return locked, outErr return locked, outErr
} }
// GetLocked return lock id's if object is locked according to StorageEngine's state.
func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocked",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
var locked []oid.ID
var outErr error
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
ld, err := h.Shard.GetLocked(ctx, addr)
if err != nil {
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err
}
locked = append(locked, ld...)
return false
})
if len(locked) > 0 {
return locked, nil
}
return locked, outErr
}
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(ctx, addrs) sh.HandleExpiredTombstones(ctx, addrs)

View file

@ -83,7 +83,26 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
exRes, err := sh.Exists(ctx, existsPrm) exRes, err := sh.Exists(ctx, existsPrm)
if err != nil { if err != nil {
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
if !errors.As(err, &siErr) { var eiErr *objectSDK.ECInfoError
if errors.As(err, &eiErr) {
eclocked := []oid.ID{locked}
for _, chunk := range eiErr.ECInfo().Chunks {
var objID oid.ID
err = objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(sh, "could not lock object in shard", err)
return false
}
eclocked = append(eclocked, objID)
}
err = sh.Lock(ctx, idCnr, locker, eclocked)
if err != nil {
e.reportShardError(sh, "could not lock object in shard", err)
return false
}
root = true
return false
} else if !errors.As(err, &siErr) {
if shard.IsErrObjectExpired(err) { if shard.IsErrObjectExpired(err) {
// object is already expired => // object is already expired =>
// do not lock it // do not lock it

View file

@ -80,11 +80,29 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// In #1146 this check was parallelized, however, it became // In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards. // much slower on fast machines for 4 shards.
_, err := e.exists(ctx, addr) var parent oid.Address
if prm.obj.ECHeader() != nil {
parent.SetObject(prm.obj.ECHeader().Parent())
parent.SetContainer(addr.Container())
}
existed, locked, err := e.exists(ctx, addr, parent)
if err != nil { if err != nil {
return err return err
} }
if !existed && locked {
lockers, err := e.GetLocked(ctx, parent)
if err != nil {
return err
}
for _, locker := range lockers {
err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
if err != nil {
return err
}
}
}
var shRes putToShardRes var shRes putToShardRes
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
e.mtx.RLock() e.mtx.RLock()

View file

@ -267,8 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
return deleteSingleResult{}, nil return deleteSingleResult{}, nil
} }
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) { var ecErr *objectSDK.ECInfoError
if errors.As(err, &siErr) || errors.As(err, &ecErr) {
// if object is virtual (parent) then do nothing, it will be deleted with last child // if object is virtual (parent) then do nothing, it will be deleted with last child
// if object is erasure-coded it will be deleted with the last chunk presented on the shard
return deleteSingleResult{}, nil return deleteSingleResult{}, nil
} }
@ -471,6 +473,46 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
name: rootBucketName(cnr, bucketName), name: rootBucketName(cnr, bucketName),
key: objKey, key: objKey,
}) })
if obj.ECHeader() != nil {
err := delECInfo(tx, cnr, objKey, obj.ECHeader())
if err != nil {
return err
}
}
return nil return nil
} }
func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error {
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
if len(val) > 0 {
if bytes.Equal(val, objKey) {
delUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, bucketName),
key: parentID,
})
} else {
val = bytes.Clone(val)
offset := 0
for offset < len(val) {
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
val = append(val[:offset], val[offset+objectKeySize:]...)
break
}
offset += objectKeySize
}
err := putUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
key: parentID,
val: val,
})
if err != nil {
return err
}
}
}
return nil
}

View file

@ -21,11 +21,13 @@ import (
// ExistsPrm groups the parameters of Exists operation. // ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct { type ExistsPrm struct {
addr oid.Address addr oid.Address
paddr oid.Address
} }
// ExistsRes groups the resulting values of Exists operation. // ExistsRes groups the resulting values of Exists operation.
type ExistsRes struct { type ExistsRes struct {
exists bool exists bool
locked bool
} }
var ErrLackSplitInfo = logicerr.New("no split info on parent object") var ErrLackSplitInfo = logicerr.New("no split info on parent object")
@ -35,11 +37,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr p.addr = addr
} }
// SetParent is an Exists option to set objects parent.
func (p *ExistsPrm) SetParent(addr oid.Address) {
p.paddr = addr
}
// Exists returns the fact that the object is in the metabase. // Exists returns the fact that the object is in the metabase.
func (p ExistsRes) Exists() bool { func (p ExistsRes) Exists() bool {
return p.exists return p.exists
} }
// Locked returns the fact that the object is locked.
func (p ExistsRes) Locked() bool {
return p.locked
}
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it // Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
// returns true if addr is in primary index or false if it is not. // returns true if addr is in primary index or false if it is not.
// //
@ -70,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error { err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.exists, err = db.exists(tx, prm.addr, currEpoch) res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
return err return err
}) })
@ -78,15 +90,19 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, metaerr.Wrap(err) return res, metaerr.Wrap(err)
} }
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) { func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool
if !parent.Equals(oid.Address{}) {
locked = objectLocked(tx, parent.Container(), parent.Object())
}
// check graveyard and object expiration first // check graveyard and object expiration first
switch objectStatus(tx, addr, currEpoch) { switch objectStatus(tx, addr, currEpoch) {
case 1: case 1:
return false, logicerr.Wrap(new(apistatus.ObjectNotFound)) return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
case 2: case 2:
return false, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
case 3: case 3:
return false, ErrObjectIsExpired return false, locked, ErrObjectIsExpired
} }
objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
@ -96,25 +112,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
// if graveyard is empty, then check if object exists in primary bucket // if graveyard is empty, then check if object exists in primary bucket
if inBucket(tx, primaryBucketName(cnr, key), objKey) { if inBucket(tx, primaryBucketName(cnr, key), objKey) {
return true, nil return true, locked, nil
} }
// if primary bucket is empty, then check if object exists in parent bucket // if primary bucket is empty, then check if object exists in parent bucket
if inBucket(tx, parentBucketName(cnr, key), objKey) { if inBucket(tx, parentBucketName(cnr, key), objKey) {
splitInfo, err := getSplitInfo(tx, cnr, objKey) splitInfo, err := getSplitInfo(tx, cnr, objKey)
if err != nil { if err != nil {
return false, err return false, locked, err
} }
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
} }
// if parent bucket is empty, then check if object exists in ec bucket // if parent bucket is empty, then check if object exists in ec bucket
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 { if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
return false, getECInfoError(tx, cnr, data) return false, locked, getECInfoError(tx, cnr, data)
} }
// if parent bucket is empty, then check if object exists in typed buckets // if parent bucket is empty, then check if object exists in typed buckets
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
} }
// objectStatus returns: // objectStatus returns:

View file

@ -229,11 +229,17 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch) obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
targetKey := addressKey(prm.target[i], buf) targetKey := addressKey(prm.target[i], buf)
var ecErr *objectSDK.ECInfoError
if err == nil { if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil { if err != nil {
return err return err
} }
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
if err != nil {
return err
}
} }
if prm.tomb != nil { if prm.tomb != nil {
@ -272,6 +278,43 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
return db.applyInhumeResToCounters(tx, res) return db.applyInhumeResToCounters(tx, res)
} }
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
) error {
for _, chunk := range ecInfo.Chunks {
chunkBuf := make([]byte, addressKeySize)
var chunkAddr oid.Address
chunkAddr.SetContainer(cnr)
var chunkID oid.ID
err := chunkID.ReadFromV2(chunk.ID)
if err != nil {
return err
}
chunkAddr.SetObject(chunkID)
chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
if err != nil {
return err
}
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
if err != nil {
return err
}
chunkKey := addressKey(chunkAddr, chunkBuf)
if tomb != nil {
_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
if err != nil {
return err
}
}
err = targetBucket.Put(chunkKey, value)
if err != nil {
return err
}
}
return nil
}
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil { if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
return err return err

View file

@ -175,6 +175,31 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
return false return false
} }
// return `LOCK` id's if specified object is locked in the specified container.
func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
var lockers []oid.ID
bucketLocked := tx.Bucket(bucketNameLocked)
if bucketLocked != nil {
key := make([]byte, cidSize)
idCnr.Encode(key)
bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer != nil {
binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
if err != nil {
return nil, fmt.Errorf("decode list of object lockers: %w", err)
}
for _, binObjID := range binObjIDs {
var id oid.ID
if err = id.Decode(binObjID); err != nil {
return nil, err
}
lockers = append(lockers, id)
}
}
}
return lockers, nil
}
// releases all records about the objects locked by the locker. // releases all records about the objects locked by the locker.
// Returns slice of unlocked object ID's or an error. // Returns slice of unlocked object ID's or an error.
// //
@ -325,3 +350,36 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
success = err == nil success = err == nil
return res, err return res, err
} }
// GetLocked return `LOCK` id's if provided object is locked by any `LOCK`. Not found
// object is considered as non-locked.
//
// Returns only non-logical errors related to underlying database.
func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, err error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("GetLocked", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.GetLocked",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return res, ErrDegradedMode
}
err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
res, err = getLocked(tx, addr.Container(), addr.Object())
return nil
}))
success = err == nil
return res, err
}

View file

@ -115,7 +115,7 @@ func (db *DB) put(tx *bbolt.Tx,
isParent := si != nil isParent := si != nil
exists, err := db.exists(tx, objectCore.AddressOf(obj), currEpoch) exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
var splitInfoError *objectSDK.SplitInfoError var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) { if errors.As(err, &splitInfoError) {

View file

@ -428,7 +428,7 @@ func (db *DB) selectObjectID(
addr.SetObject(id) addr.SetObject(id)
var splitInfoError *objectSDK.SplitInfoError var splitInfoError *objectSDK.SplitInfoError
ok, err := db.exists(tx, addr, currEpoch) ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
if (err == nil && ok) || errors.As(err, &splitInfoError) { if (err == nil && ok) || errors.As(err, &splitInfoError) {
raw := make([]byte, objectKeySize) raw := make([]byte, objectKeySize)
id.Encode(raw) id.Encode(raw)

View file

@ -14,11 +14,13 @@ import (
// ExistsPrm groups the parameters of Exists operation. // ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct { type ExistsPrm struct {
addr oid.Address addr oid.Address
paddr oid.Address
} }
// ExistsRes groups the resulting values of Exists operation. // ExistsRes groups the resulting values of Exists operation.
type ExistsRes struct { type ExistsRes struct {
ex bool ex bool
lc bool
} }
// SetAddress is an Exists option to set object checked for existence. // SetAddress is an Exists option to set object checked for existence.
@ -26,11 +28,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr p.addr = addr
} }
// SetParentAddress is an Exists option to set parent object checked for existence.
func (p *ExistsPrm) SetParentAddress(addr oid.Address) {
p.paddr = addr
}
// Exists returns the fact that the object is in the shard. // Exists returns the fact that the object is in the shard.
func (p ExistsRes) Exists() bool { func (p ExistsRes) Exists() bool {
return p.ex return p.ex
} }
// Locked returns the fact that the object is locked.
func (p ExistsRes) Locked() bool {
return p.lc
}
// Exists checks if object is presented in shard. // Exists checks if object is presented in shard.
// //
// Returns any error encountered that does not allow to // Returns any error encountered that does not allow to
@ -48,6 +60,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
defer span.End() defer span.End()
var exists bool var exists bool
var locked bool
var err error var err error
s.m.RLock() s.m.RLock()
@ -65,13 +78,16 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
} else { } else {
var existsPrm meta.ExistsPrm var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.addr) existsPrm.SetAddress(prm.addr)
existsPrm.SetParent(prm.paddr)
var res meta.ExistsRes var res meta.ExistsRes
res, err = s.metaBase.Exists(ctx, existsPrm) res, err = s.metaBase.Exists(ctx, existsPrm)
exists = res.Exists() exists = res.Exists()
locked = res.Locked()
} }
return ExistsRes{ return ExistsRes{
ex: exists, ex: exists,
lc: locked,
}, err }, err
} }

View file

@ -71,3 +71,20 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
return res.Locked(), nil return res.Locked(), nil
} }
// GetLocked return lock id's of the provided object. Not found object is
// considered as not locked. Requires healthy metabase, returns ErrDegradedMode otherwise.
func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocked",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
m := s.GetMode()
if m.NoMetabase() {
return nil, ErrDegradedMode
}
return s.metaBase.GetLocked(ctx, addr)
}

View file

@ -2,6 +2,7 @@ package deletesvc
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"strconv" "strconv"
@ -64,9 +65,32 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address {
return a return a
} }
func (exec *execCtx) formSplitInfo(ctx context.Context) error { func (exec *execCtx) formExtendedInfo(ctx context.Context) error {
var err error _, err := exec.svc.header.head(ctx, exec)
exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
switch {
case err == nil:
return nil
case errors.As(err, &errSplitInfo):
exec.splitInfo = errSplitInfo.SplitInfo()
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
if err := exec.collectMembers(ctx); err != nil {
return err
}
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
return nil
case errors.As(err, &errECInfo):
exec.log.Debug(logs.DeleteECObjectReceived)
return nil
}
if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) { if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
// IsErrObjectAlreadyRemoved check is required because splitInfo // IsErrObjectAlreadyRemoved check is required because splitInfo
// implicitly performs Head request that may return ObjectAlreadyRemoved // implicitly performs Head request that may return ObjectAlreadyRemoved

View file

@ -35,19 +35,9 @@ func (exec *execCtx) formTombstone(ctx context.Context) error {
exec.log.Debug(logs.DeleteFormingSplitInfo) exec.log.Debug(logs.DeleteFormingSplitInfo)
if err := exec.formSplitInfo(ctx); err != nil { if err := exec.formExtendedInfo(ctx); err != nil {
return fmt.Errorf("form split info: %w", err) return fmt.Errorf("form extended info: %w", err)
} }
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
if err := exec.collectMembers(ctx); err != nil {
return err
}
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
return exec.initTombstoneObject() return exec.initTombstoneObject()
} }

View file

@ -41,7 +41,7 @@ type cfg struct {
header interface { header interface {
// must return (nil, nil) for PHY objects // must return (nil, nil) for PHY objects
splitInfo(context.Context, *execCtx) (*objectSDK.SplitInfo, error) head(context.Context, *execCtx) (*objectSDK.Object, error)
children(context.Context, *execCtx) ([]oid.ID, error) children(context.Context, *execCtx) ([]oid.ID, error)

View file

@ -2,7 +2,6 @@ package deletesvc
import ( import (
"context" "context"
"errors"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@ -44,19 +43,8 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
return wr.Object(), nil return wr.Object(), nil
} }
func (w *headSvcWrapper) splitInfo(ctx context.Context, exec *execCtx) (*objectSDK.SplitInfo, error) { func (w *headSvcWrapper) head(ctx context.Context, exec *execCtx) (*objectSDK.Object, error) {
_, err := w.headAddress(ctx, exec, exec.address()) return w.headAddress(ctx, exec, exec.address())
var errSplitInfo *objectSDK.SplitInfoError
switch {
case err == nil:
return nil, nil
case errors.As(err, &errSplitInfo):
return errSplitInfo.SplitInfo(), nil
default:
return nil, err
}
} }
func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) { func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) {