node: Implement Lock\Delete requests for EC object #1147

Merged
fyrchik merged 10 commits from acid-ant/frostfs-node:feature/ec-del-lock into master 2024-05-30 08:13:07 +00:00
30 changed files with 405 additions and 102 deletions

View file

@@ -44,9 +44,12 @@ PROTOGEN_FROSTFS_DIR ?= $(PROTOBUF_DIR)/protogen-$(PROTOGEN_FROSTFS_VERSION)
STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck
STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION)
SOURCES = $(shell find . -type f -name "*.go" -print)
GOPLS_VERSION ?= v0.15.1
GOPLS_DIR ?= $(abspath $(BIN))/gopls
GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION)
GOPLS_TEMP_FILE := $(shell mktemp)
FROSTFS_CONTRACTS_PATH=$(abspath ./../frostfs-contract)
LOCODE_DB_PATH=$(abspath ./.cache/locode_db)
@@ -220,9 +223,12 @@ gopls-run:
@if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \
make gopls-install; \
fi
@if [[ $$(find . -type f -name "*.go" -print | xargs $(GOPLS_VERSION_DIR)/gopls check | tee /dev/tty | wc -l) -ne 0 ]]; then \
$(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE)
fyrchik marked this conversation as resolved Outdated

Was there any problem with the previous implementation (pipe instead of temp file)?

We are unable to use `tee /dev/tty` for security reasons. If we replace it with `... check 2>&1 | tee | wc -l`, there is no output on error.
@if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \
cat $(GOPLS_TEMP_FILE); \
exit 1; \
fi
rm $(GOPLS_TEMP_FILE)
# Run linters in Docker
docker/lint:

View file

@@ -38,9 +38,7 @@ const (
groupTarget = "group"
)
var (
errUnknownTargetType = errors.New("unknown target type")
)
var errUnknownTargetType = errors.New("unknown target type")
var addCmd = &cobra.Command{
Use: "add",

View file

@@ -152,7 +152,11 @@ func printECInfoErr(cmd *cobra.Command, err error) bool {
ok := errors.As(err, &errECInfo)
if ok {
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
toProto, _ := cmd.Flags().GetBool("proto")
if !(toJSON || toProto) {
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
}
printECInfo(cmd, errECInfo.ECInfo())
}

View file

@@ -357,6 +357,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
_, err := internal.HeadObject(cmd.Context(), prmHead)
var errSplit *objectSDK.SplitInfoError
var errEC *objectSDK.ECInfoError
switch {
fyrchik marked this conversation as resolved Outdated

Why have you replaced the switch with an `else if` chain?

Because the previous implementation handled only `Split Info`, and I thought this was more readable. I have reverted to the switch.

IMO it is exactly the opposite -- `else if` is ok once, switch is less verbose for multiple branches.
default:
@@ -366,8 +367,6 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
return nil
case errors.As(err, &errSplit):
common.PrintVerbose(cmd, "Split information received - object is virtual.")
}
splitInfo := errSplit.SplitInfo()
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
@@ -379,6 +378,11 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
}
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
case errors.As(err, &errEC):
common.PrintVerbose(cmd, "Object is erasure-coded.")
return nil
}
return nil
}
fyrchik marked this conversation as resolved Outdated

So we add each chunk to the tombstone/lock? It is a problem, because chunks may be missing (with split it cannot be the case, it means DL, with EC it is ok).

Oh, thanks, that was left over from the previous implementation; removed.

Does the new implementation still pass sanity tests?

I run them each time a sensitive part of the code is changed.
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
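
As an aside, the `switch`/`errors.As` idiom discussed in the thread above can be shown standalone. This is a minimal, self-contained sketch with stand-in error types (not the real `objectSDK` ones):

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for objectSDK.SplitInfoError and objectSDK.ECInfoError.
type splitInfoError struct{}

func (*splitInfoError) Error() string { return "split info" }

type ecInfoError struct{}

func (*ecInfoError) Error() string { return "ec info" }

// classify shows why a tagless switch stays flat with several
// errors.As branches, where an else-if chain grows noisier.
func classify(err error) string {
	var errSplit *splitInfoError
	var errEC *ecInfoError
	switch {
	case err == nil:
		return "physical object"
	case errors.As(err, &errSplit):
		return "virtual (split) object"
	case errors.As(err, &errEC):
		return "erasure-coded object"
	default:
		return "unexpected error"
	}
}

func main() {
	fmt.Println(classify(&ecInfoError{})) // erasure-coded object
}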

View file

@@ -95,6 +95,7 @@ const (
DeleteFormingSplitInfo = "forming split info..."
DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
DeleteMembersSuccessfullyCollected = "members successfully collected"
DeleteECObjectReceived = "erasure-coded object received, form tombstone"
GetRemoteCallFailed = "remote call failed"
GetCanNotAssembleTheObject = "can not assemble the object"
GetTryingToAssembleTheObject = "trying to assemble the object..."
@@ -213,6 +214,7 @@ const (
EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies"
EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check"
EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks"
EngineInterruptGettingLockers = "can't get object's lockers"
EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks"
EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only"
EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode"

View file

@@ -76,13 +76,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
is bool
}
var splitInfo *objectSDK.SplitInfo
var ecInfo *objectSDK.ECInfo
// Removal of a big object is done in multiple stages:
// 1. Remove the parent object. If it is locked or already removed, return immediately.
// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(prm.addr)
existsPrm.Address = prm.addr
resExists, err := sh.Exists(ctx, existsPrm)
if err != nil {
@@ -91,13 +92,18 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
}
var splitErr *objectSDK.SplitInfoError
if !errors.As(err, &splitErr) {
var ecErr *objectSDK.ECInfoError
if errors.As(err, &splitErr) {
splitInfo = splitErr.SplitInfo()
} else if errors.As(err, &ecErr) {
e.deleteChunks(ctx, sh, ecInfo, prm)
return false
} else {
if !client.IsErrObjectNotFound(err) {
e.reportShardError(sh, "could not check object existence", err)
}
return false
}
splitInfo = splitErr.SplitInfo()
} else if !resExists.Exists() {
return false
}
@@ -171,3 +177,31 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
return false
})
}
func (e *StorageEngine) deleteChunks(
ctx context.Context, sh hashedShard, ecInfo *objectSDK.ECInfo, prm DeletePrm,
) {
var inhumePrm shard.InhumePrm
if prm.forceRemoval {
inhumePrm.ForceRemoval()
}
for _, chunk := range ecInfo.Chunks {
var addr oid.Address
addr.SetContainer(prm.addr.Container())
var objID oid.ID
err := objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(sh, "could not delete EC chunk", err)
}
addr.SetObject(objID)
inhumePrm.MarkAsGarbage(addr)
_, err = sh.Inhume(ctx, inhumePrm)
if err != nil {
e.log.Debug(logs.EngineCouldNotInhumeObjectInShard,
zap.Stringer("addr", addr),
zap.String("err", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
continue
fyrchik marked this conversation as resolved Outdated

There is an error which we have ignored. What happens with this yet-to-be-removed chunk?

It will be removed by the `remover` on the next iteration. The behavior is the same as for a complex object, see [deleteChildren()](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/3627b44e92395d2be7eeda9790513021b9f345ca/pkg/local_object_storage/engine/delete.go#L136).
}
}
}

View file

@@ -17,6 +17,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/panjf2000/ants/v2"
@@ -62,7 +63,10 @@ func benchmarkExists(b *testing.B, shardNum int) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ok, err := e.exists(context.Background(), addr)
var shPrm shard.ExistsPrm

The interface is confusing, we have 2 identical types with different meaning. What about accepting `shard.ExistsPrm`?

In this case we need to make the fields of `ExistsPrm` public; are you ok with that?

Implemented in a separate commit.
shPrm.Address = addr
shPrm.ParentAddress = oid.Address{}
ok, _, err := e.exists(context.Background(), shPrm)
if err != nil || ok {
b.Fatalf("%t %v", ok, err)
}
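
For clarity, the resulting call shape after making the `ExistsPrm` fields public looks like this (a sketch against the types in this diff; `existsWithLock` is an illustrative helper, not part of the PR):

package sketch

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// existsWithLock checks object existence on a shard and, for EC chunks,
// also reports whether the parent object is locked.
func existsWithLock(ctx context.Context, sh *shard.Shard, addr, parent oid.Address) (exists, locked bool, err error) {
	var prm shard.ExistsPrm
	prm.Address = addr         // object checked for existence
	prm.ParentAddress = parent // EC parent; zero oid.Address if not applicable

	res, err := sh.Exists(ctx, prm)
	if err != nil {
		return false, false, err
	}
	return res.Exists(), res.Locked(), nil
}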

View file

@@ -8,16 +8,16 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
var shPrm shard.ExistsPrm
shPrm.SetAddress(addr)
// exists returns true in the first return value if the object exists.
// The second return value reports whether the parent object is locked.
func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool, bool, error) {
alreadyRemoved := false
exists := false
locked := false
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Exists(ctx, shPrm)
if err != nil {
if client.IsErrObjectAlreadyRemoved(err) {
@@ -44,13 +44,16 @@ func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, err
if !exists {
exists = res.Exists()
}
if !locked {
locked = res.Locked()
}
return false
})
if alreadyRemoved {
return false, new(apistatus.ObjectAlreadyRemoved)
return false, false, new(apistatus.ObjectAlreadyRemoved)
}
return exists, nil
return exists, locked, nil
}

View file

@@ -142,7 +142,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
}()
if checkExists {
existPrm.SetAddress(addr)
existPrm.Address = addr
exRes, err := sh.Exists(ctx, existPrm)
if err != nil {
if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
@@ -152,7 +152,8 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
}
var siErr *objectSDK.SplitInfoError
if !errors.As(err, &siErr) {
var ecErr *objectSDK.ECInfoError
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
e.reportShardError(sh, "could not check for presents in shard", err)
return
}
@@ -220,6 +221,33 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
return locked, outErr
}
// GetLocked returns lock IDs if the object is locked according to the StorageEngine's state.
func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocked",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
var locked []oid.ID
var outErr error
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
ld, err := h.Shard.GetLocked(ctx, addr)
if err != nil {
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
fyrchik marked this conversation as resolved Outdated

It is a log message, should be a const. Also, why didn't the linter fail? cc @achuprov

Thanks, updated.
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err
}
locked = append(locked, ld...)
return false
})
if len(locked) > 0 {
return locked, nil
}
return locked, outErr
}
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(ctx, addrs)

View file

@@ -78,12 +78,31 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
if checkExists {
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addrLocked)
existsPrm.Address = addrLocked
exRes, err := sh.Exists(ctx, existsPrm)
if err != nil {
var siErr *objectSDK.SplitInfoError
if !errors.As(err, &siErr) {
var eiErr *objectSDK.ECInfoError
if errors.As(err, &eiErr) {
eclocked := []oid.ID{locked}
for _, chunk := range eiErr.ECInfo().Chunks {
var objID oid.ID
err = objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(sh, "could not lock object in shard", err)
return false
}
eclocked = append(eclocked, objID)
}
err = sh.Lock(ctx, idCnr, locker, eclocked)
if err != nil {
e.reportShardError(sh, "could not lock object in shard", err)
return false
}
root = true
return false
} else if !errors.As(err, &siErr) {
if shard.IsErrObjectExpired(err) {
// object is already expired =>
// do not lock it

View file

@@ -80,11 +80,32 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// In #1146 this check was parallelized, however, it became
// much slower on fast machines for 4 shards.
_, err := e.exists(ctx, addr)
var parent oid.Address
if prm.obj.ECHeader() != nil {
parent.SetObject(prm.obj.ECHeader().Parent())
parent.SetContainer(addr.Container())
}
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = parent
existed, locked, err := e.exists(ctx, shPrm)
if err != nil {
return err
}
if !existed && locked {
lockers, err := e.GetLocked(ctx, parent)
if err != nil {
return err

Do we lock an object before we have put it? It seems like a problem, because this lock record can persist indefinitely.

I didn't catch the problem. Here we persist the lock for a chunk before put, because we need to prevent `gc` from removing it. This is the reconstruction scenario, when we need to put a chunk on the node. If there is no lock for the chunk, `gc` will inhume it.

The problem is atomicity: after `lock -> CRASH -> put` we are left with garbage lock records which will (?) be removed eventually. We could do it atomically in `put` instead; this would also ensure we put the info on the same shard.

As a result of the discussion, we need to move `gc` to the storage engine level. Created https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1151 for tracking.
}
for _, locker := range lockers {
err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
if err != nil {
return err
}
}
}
var shRes putToShardRes
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
e.mtx.RLock()
@@ -120,7 +141,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
defer close(exitCh)
var existPrm shard.ExistsPrm
existPrm.SetAddress(addr)
existPrm.Address = addr
exists, err := sh.Exists(ctx, existPrm)
if err != nil {
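
Schematically, the ordering that creates the window discussed in the thread above looks like this (names are illustrative stand-ins, not the real engine API):

package sketch

import "context"

type engine struct{}
type ecChunk struct{}

// Illustrative stand-ins for the engine's lock and put operations.
func (e *engine) lockChunk(ctx context.Context, c ecChunk) error { return nil }
func (e *engine) putChunk(ctx context.Context, c ecChunk) error  { return nil }

// storeECChunk spells out the lock-before-put ordering.
func storeECChunk(ctx context.Context, e *engine, c ecChunk) error {
	// 1. Persist the lock first so GC cannot inhume the chunk before it
	//    lands (the reconstruction scenario from the thread above).
	if err := e.lockChunk(ctx, c); err != nil {
		return err
	}
	// A crash at this point leaves a lock record with no object behind it;
	// cleanup is only eventual, hence the proposal to make lock+put atomic
	// inside put (tracked in issue 1151).
	// 2. Store the chunk itself.
	return e.putChunk(ctx, c)
}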

View file

@@ -115,7 +115,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
found := false
for i := range shards {
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addr)
existsPrm.Address = addr
res, err := shards[i].Exists(ctx, existsPrm)
if err != nil {

View file

@@ -267,8 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
return deleteSingleResult{}, nil
}
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
var ecErr *objectSDK.ECInfoError
if errors.As(err, &siErr) || errors.As(err, &ecErr) {
// if object is virtual (parent) then do nothing, it will be deleted with last child
// if object is erasure-coded it will be deleted with the last chunk presented on the shard
return deleteSingleResult{}, nil
}
@@ -471,6 +473,46 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
name: rootBucketName(cnr, bucketName),
key: objKey,
})
if obj.ECHeader() != nil {
err := delECInfo(tx, cnr, objKey, obj.ECHeader())
if err != nil {
return err
}
}
return nil
}
func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error {
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
if len(val) > 0 {
if bytes.Equal(val, objKey) {
delUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, bucketName),
key: parentID,
})
} else {
val = bytes.Clone(val)
offset := 0
for offset < len(val) {
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
fyrchik marked this conversation as resolved Outdated

`val` is received from `getFromBucket`. Is it taken from bbolt or freshly allocated? Bbolt prohibits changing values in some cases.

According to the docs, `val` should be valid for the life of the transaction. Let's clone it.

This line is more important: `// The returned memory is owned by bbolt and must never be modified; writing to this memory might corrupt the database.`

This line is from the newest version. Looks like we need to update `bbolt`.
val = append(val[:offset], val[offset+objectKeySize:]...)
break
}
offset += objectKeySize
}
err := putUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
key: parentID,
val: val,
})
if err != nil {
return err
}
}
}
return nil
}
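
The bbolt rule quoted in the thread above can be illustrated with a small self-contained helper. The list layout (fixed-size IDs concatenated in one value) mirrors `delECInfo`, but the function itself is hypothetical:

package sketch

import (
	"bytes"

	"go.etcd.io/bbolt"
)

// removeIDFromList drops one fixed-size entry from a list value stored in
// bucket b. The slice returned by Get is owned by bbolt and must never be
// modified, so it is cloned before the in-place append below.
func removeIDFromList(b *bbolt.Bucket, key, id []byte, idSize int) error {
	val := b.Get(key)
	if val == nil {
		return nil
	}
	val = bytes.Clone(val) // never mutate bbolt-owned memory
	for off := 0; off+idSize <= len(val); off += idSize {
		if bytes.Equal(id, val[off:off+idSize]) {
			val = append(val[:off], val[off+idSize:]...)
			break
		}
	}
	return b.Put(key, val)
}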

View file

@@ -21,11 +21,13 @@ import (
// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
addr oid.Address
paddr oid.Address
}
// ExistsRes groups the resulting values of Exists operation.
type ExistsRes struct {
exists bool
locked bool
}
var ErrLackSplitInfo = logicerr.New("no split info on parent object")
@@ -35,11 +37,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetParent is an Exists option to set the object's parent.
func (p *ExistsPrm) SetParent(addr oid.Address) {
p.paddr = addr
}
// Exists returns the fact that the object is in the metabase.
func (p ExistsRes) Exists() bool {
return p.exists
}
// Locked returns the fact that the object is locked.
func (p ExistsRes) Locked() bool {
return p.locked
}
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
// returns true if addr is in primary index or false if it is not.
//
@@ -70,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.exists, err = db.exists(tx, prm.addr, currEpoch)
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
return err
})
@@ -78,15 +90,19 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, metaerr.Wrap(err)
}
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) {
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool
if !parent.Equals(oid.Address{}) {
locked = objectLocked(tx, parent.Container(), parent.Object())
}
// check graveyard and object expiration first
switch objectStatus(tx, addr, currEpoch) {
case 1:
return false, logicerr.Wrap(new(apistatus.ObjectNotFound))
return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
case 2:
return false, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
case 3:
return false, ErrObjectIsExpired
return false, locked, ErrObjectIsExpired
}
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
@@ -96,25 +112,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
// if graveyard is empty, then check if object exists in primary bucket
if inBucket(tx, primaryBucketName(cnr, key), objKey) {
return true, nil
return true, locked, nil
}
// if primary bucket is empty, then check if object exists in parent bucket
if inBucket(tx, parentBucketName(cnr, key), objKey) {
splitInfo, err := getSplitInfo(tx, cnr, objKey)
if err != nil {
return false, err
return false, locked, err
}
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}
// if parent bucket is empty, then check if object exists in ec bucket
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
return false, getECInfoError(tx, cnr, data)
return false, locked, getECInfoError(tx, cnr, data)
}
// if parent bucket is empty, then check if object exists in typed buckets
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
}
// objectStatus returns:

View file

@@ -229,11 +229,17 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
targetKey := addressKey(prm.target[i], buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
if err != nil {
return err
}
}
if prm.tomb != nil {
@@ -272,6 +278,43 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
return db.applyInhumeResToCounters(tx, res)
}
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
) error {
for _, chunk := range ecInfo.Chunks {
chunkBuf := make([]byte, addressKeySize)
var chunkAddr oid.Address
chunkAddr.SetContainer(cnr)
var chunkID oid.ID
err := chunkID.ReadFromV2(chunk.ID)
if err != nil {
return err
}
chunkAddr.SetObject(chunkID)
chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
if err != nil {
return err
}
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
if err != nil {
return err
}
chunkKey := addressKey(chunkAddr, chunkBuf)
if tomb != nil {
_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
if err != nil {
return err
}
}
err = targetBucket.Put(chunkKey, value)
if err != nil {
return err
}
}
return nil
}
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
return err

View file

@@ -175,6 +175,31 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
return false
}
// returns `LOCK` IDs if the specified object is locked in the specified container.
func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
var lockers []oid.ID
bucketLocked := tx.Bucket(bucketNameLocked)
if bucketLocked != nil {
key := make([]byte, cidSize)
idCnr.Encode(key)
bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer != nil {
binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
if err != nil {
return nil, fmt.Errorf("decode list of object lockers: %w", err)
}
for _, binObjID := range binObjIDs {
var id oid.ID
if err = id.Decode(binObjID); err != nil {
return nil, err
}
lockers = append(lockers, id)
}
}
}
return lockers, nil
}
// releases all records about the objects locked by the locker.
// Returns slice of unlocked object ID's or an error.
//
@@ -325,3 +350,36 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
success = err == nil
return res, err
}
// GetLocked returns `LOCK` IDs if the provided object is locked by any `LOCK`.
// An object that is not found is considered non-locked.
//
// Returns only non-logical errors related to underlying database.
func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, err error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("GetLocked", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.GetLocked",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return res, ErrDegradedMode
}
err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
res, err = getLocked(tx, addr.Container(), addr.Object())
return nil
}))
success = err == nil
return res, err
}

View file

@@ -115,7 +115,7 @@ func (db *DB) put(tx *bbolt.Tx,
isParent := si != nil
exists, err := db.exists(tx, objectCore.AddressOf(obj), currEpoch)
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) {

View file

@@ -428,7 +428,7 @@ func (db *DB) selectObjectID(
addr.SetObject(id)
var splitInfoError *objectSDK.SplitInfoError
ok, err := db.exists(tx, addr, currEpoch)
ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
if (err == nil && ok) || errors.As(err, &splitInfoError) {
raw := make([]byte, objectKeySize)
id.Encode(raw)

View file

@@ -13,17 +13,16 @@ import (
// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
addr oid.Address
// Exists option to set object checked for existence.
Address oid.Address
// Exists option to set parent object checked for existence.
ParentAddress oid.Address
}
// ExistsRes groups the resulting values of Exists operation.
type ExistsRes struct {
ex bool
}
// SetAddress is an Exists option to set object checked for existence.
func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
lc bool
}
// Exists returns the fact that the object is in the shard.
@@ -31,6 +30,11 @@ func (p ExistsRes) Exists() bool {
return p.ex
}
// Locked returns the fact that the object is locked.
func (p ExistsRes) Locked() bool {
return p.lc
}
// Exists checks if object is presented in shard.
//
// Returns any error encountered that does not allow to
@@ -43,11 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
attribute.String("address", prm.Address.EncodeToString()),
))
defer span.End()
var exists bool
var locked bool
var err error
s.m.RLock()
@@ -57,21 +62,24 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
return ExistsRes{}, ErrShardDisabled
} else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.addr
p.Address = prm.Address
var res common.ExistsRes
res, err = s.blobStor.Exists(ctx, p)
exists = res.Exists
} else {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.addr)
existsPrm.SetAddress(prm.Address)
existsPrm.SetParent(prm.ParentAddress)
var res meta.ExistsRes
res, err = s.metaBase.Exists(ctx, existsPrm)
exists = res.Exists()
locked = res.Locked()
}
return ExistsRes{
ex: exists,
lc: locked,
}, err
}

View file

@@ -525,7 +525,9 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
}
log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
if len(tssExp) > 0 {
fyrchik marked this conversation as resolved Outdated

To be clear: is this an optimization or a functional change?

It is an optimization: with an empty batch we would do nothing but take a lock on the metabase, because of the `db.boltDB.Update(...)` call.
s.expiredTombstonesCallback(ctx, tssExp)
}
iterPrm.SetOffset(tss[tssLen-1].Address())
tss = tss[:0]

View file

@@ -71,3 +71,20 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
return res.Locked(), nil
}
// GetLocked returns lock IDs of the provided object. An object that is not
// found is considered not locked. Requires a healthy metabase; returns ErrDegradedMode otherwise.
func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocked",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
m := s.GetMode()
if m.NoMetabase() {
return nil, ErrDegradedMode
}
return s.metaBase.GetLocked(ctx, addr)
}

View file

@@ -72,7 +72,7 @@ func TestShardReload(t *testing.T) {
checkHasObjects := func(t *testing.T, exists bool) {
for i := range objects {
var prm ExistsPrm
prm.SetAddress(objects[i].addr)
prm.Address = objects[i].addr
res, err := sh.Exists(context.Background(), prm)
require.NoError(t, err)

View file

@@ -24,9 +24,7 @@ import (
"go.uber.org/zap"
)
var (
errEmptyBodySignature = errors.New("malformed request: empty body signature")
)
var errEmptyBodySignature = errors.New("malformed request: empty body signature")
type cfg struct {
log *logger.Logger

View file

@@ -2,6 +2,7 @@ package deletesvc
import (
"context"
"errors"
"fmt"
"strconv"
@@ -40,7 +41,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
)}
}
func (exec execCtx) isLocal() bool {
func (exec *execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
@@ -64,10 +65,33 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address {
return a
}
func (exec *execCtx) formSplitInfo(ctx context.Context) error {
var err error
exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)
if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
func (exec *execCtx) formExtendedInfo(ctx context.Context) error {
_, err := exec.svc.header.head(ctx, exec)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
switch {
case err == nil:
return nil
case errors.As(err, &errSplitInfo):
exec.splitInfo = errSplitInfo.SplitInfo()
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
if err := exec.collectMembers(ctx); err != nil {
return err
}
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
return nil
case errors.As(err, &errECInfo):
exec.log.Debug(logs.DeleteECObjectReceived)
return nil
}
if !apiclient.IsErrObjectAlreadyRemoved(err) {
// IsErrObjectAlreadyRemoved check is required because splitInfo
// implicitly performs Head request that may return ObjectAlreadyRemoved
// status that is not specified for Delete.

View file

@@ -35,19 +35,9 @@ func (exec *execCtx) formTombstone(ctx context.Context) error {
exec.log.Debug(logs.DeleteFormingSplitInfo)
if err := exec.formSplitInfo(ctx); err != nil {
return fmt.Errorf("form split info: %w", err)
if err := exec.formExtendedInfo(ctx); err != nil {
return fmt.Errorf("form extended info: %w", err)
}
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
if err := exec.collectMembers(ctx); err != nil {
return err
}
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
return exec.initTombstoneObject()
}

View file

@@ -41,7 +41,7 @@ type cfg struct {
header interface {
// must return (nil, nil) for PHY objects
splitInfo(context.Context, *execCtx) (*objectSDK.SplitInfo, error)
head(context.Context, *execCtx) (*objectSDK.Object, error)
children(context.Context, *execCtx) ([]oid.ID, error)

View file

@@ -2,7 +2,6 @@ package deletesvc
import (
"context"
"errors"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@@ -44,19 +43,8 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
return wr.Object(), nil
}
func (w *headSvcWrapper) splitInfo(ctx context.Context, exec *execCtx) (*objectSDK.SplitInfo, error) {
_, err := w.headAddress(ctx, exec, exec.address())
var errSplitInfo *objectSDK.SplitInfoError
switch {
case err == nil:
return nil, nil
case errors.As(err, &errSplitInfo):
return errSplitInfo.SplitInfo(), nil
default:
return nil, err
}
func (w *headSvcWrapper) head(ctx context.Context, exec *execCtx) (*objectSDK.Object, error) {
return w.headAddress(ctx, exec, exec.address())
}
func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) {

View file

@@ -41,12 +41,10 @@ func (r *request) assemble(ctx context.Context) {
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
r.log.Debug(logs.GetAssemblingSplittedObject,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
@@ -55,7 +53,6 @@ func (r *request) assemble(ctx context.Context) {
if err != nil {
r.log.Warn(logs.GetFailedToAssembleSplittedObject,
zap.Error(err),
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)

View file

@@ -39,12 +39,10 @@ func (r *request) assembleEC(ctx context.Context) {
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
r.log.Debug(logs.GetAssemblingECObject,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
@@ -53,7 +51,6 @@ func (r *request) assembleEC(ctx context.Context) {
if err != nil {
r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err),
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)

View file

@@ -28,7 +28,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
)}
}
func (exec execCtx) isLocal() bool {
func (exec *execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}