node: Implement Lock\Delete requests for EC object #1147

Merged
fyrchik merged 10 commits from acid-ant/frostfs-node:feature/ec-del-lock into master 2024-05-30 08:13:07 +00:00
30 changed files with 405 additions and 102 deletions


@@ -44,9 +44,12 @@ PROTOGEN_FROSTFS_DIR ?= $(PROTOBUF_DIR)/protogen-$(PROTOGEN_FROSTFS_VERSION)
 STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck
 STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION)
+SOURCES = $(shell find . -type f -name "*.go" -print)
 GOPLS_VERSION ?= v0.15.1
 GOPLS_DIR ?= $(abspath $(BIN))/gopls
 GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION)
+GOPLS_TEMP_FILE := $(shell mktemp)
 FROSTFS_CONTRACTS_PATH=$(abspath ./../frostfs-contract)
 LOCODE_DB_PATH=$(abspath ./.cache/locode_db)

@@ -220,9 +223,12 @@ gopls-run:
 	@if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \
 		make gopls-install; \
 	fi
-	@if [[ $$(find . -type f -name "*.go" -print | xargs $(GOPLS_VERSION_DIR)/gopls check | tee /dev/tty | wc -l) -ne 0 ]]; then \
+	$(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE)
fyrchik marked this conversation as resolved (outdated)

Was there any problem with the previous implementation (pipe instead of temp file)?

We are unable to use `tee /dev/tty` for security reasons. If we replace it with `... check 2>&1 | tee | wc -l`, there is no output on error.
+	@if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \
+		cat $(GOPLS_TEMP_FILE); \
 		exit 1; \
 	fi
+	rm $(GOPLS_TEMP_FILE)

 # Run linters in Docker
 docker/lint:


@@ -38,9 +38,7 @@ const (
 	groupTarget = "group"
 )

-var (
-	errUnknownTargetType = errors.New("unknown target type")
-)
+var errUnknownTargetType = errors.New("unknown target type")

 var addCmd = &cobra.Command{
 	Use: "add",


@@ -152,7 +152,11 @@ func printECInfoErr(cmd *cobra.Command, err error) bool {
 	ok := errors.As(err, &errECInfo)
 	if ok {
-		cmd.PrintErrln("Object is erasure-encoded, ec information received.")
+		toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
+		toProto, _ := cmd.Flags().GetBool("proto")
+		if !(toJSON || toProto) {
+			cmd.PrintErrln("Object is erasure-encoded, ec information received.")
+		}
 		printECInfo(cmd, errECInfo.ECInfo())
 	}


@@ -357,6 +357,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
 	_, err := internal.HeadObject(cmd.Context(), prmHead)

 	var errSplit *objectSDK.SplitInfoError
+	var errEC *objectSDK.ECInfoError

 	switch {
fyrchik marked this conversation as resolved (outdated)

Why have you replaced switch with `else if` chain?

Because the previous implementation was only for `Split Info`. I thought it would be more readable. Reverted back to switch.

IMO it is exactly the opposite -- `else if` is ok once, switch is less verbose for multiple branches.
 	default:
@@ -366,19 +367,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
 		return nil
 	case errors.As(err, &errSplit):
 		common.PrintVerbose(cmd, "Split information received - object is virtual.")
+		splitInfo := errSplit.SplitInfo()
+		if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
+			return members
+		}
+		if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
+			return members
+		}
+		return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
+	case errors.As(err, &errEC):
+		common.PrintVerbose(cmd, "Object is erasure-coded.")
+		return nil
 	}
-
-	splitInfo := errSplit.SplitInfo()
-	if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
-		return members
-	}
-	if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
-		return members
-	}
-	return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
+	return nil
 }
fyrchik marked this conversation as resolved (outdated)

So we add each chunk to the tombstone/lock? It is a problem, because chunks may be missing (with split that cannot be the case, as it would mean data loss; with EC it is ok).

Oh, thanks, that was left over from the previous implementation; removed.

Does the new implementation still pass sanity tests?

They are run each time a sensitive part of the code is changed.
 func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
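The dispatch above is the pattern this PR applies throughout: a Head request on a non-physical object fails with a typed error that carries either split or erasure-coding details. A minimal sketch of that classification, assuming only the SDK types that appear in this diff (`classifyHeadError` is a hypothetical name):

```go
package objectkind

import (
	"errors"
	"fmt"

	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)

// classifyHeadError inspects the error returned by a Head call and
// reports what kind of object the address points to.
func classifyHeadError(err error) string {
	var errSplit *objectSDK.SplitInfoError
	var errEC *objectSDK.ECInfoError
	switch {
	case err == nil:
		return "physical object"
	case errors.As(err, &errSplit):
		// errSplit.SplitInfo() carries the linking object / split ID
		// needed to collect the members of the virtual object.
		return "virtual (split) object"
	case errors.As(err, &errEC):
		// errEC.ECInfo().Chunks lists the chunks known for the EC parent.
		return fmt.Sprintf("erasure-coded object, %d chunks known", len(errEC.ECInfo().Chunks))
	default:
		return "plain error"
	}
}
```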


@@ -95,6 +95,7 @@ const (
 	DeleteFormingSplitInfo = "forming split info..."
 	DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
 	DeleteMembersSuccessfullyCollected = "members successfully collected"
+	DeleteECObjectReceived = "erasure-coded object received, form tombstone"
 	GetRemoteCallFailed = "remote call failed"
 	GetCanNotAssembleTheObject = "can not assemble the object"
 	GetTryingToAssembleTheObject = "trying to assemble the object..."
@@ -213,6 +214,7 @@ const (
 	EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies"
 	EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check"
 	EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks"
+	EngineInterruptGettingLockers = "can't get object's lockers"
 	EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks"
 	EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only"
 	EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode"


@@ -76,13 +76,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 		is bool
 	}

 	var splitInfo *objectSDK.SplitInfo
+	var ecInfo *objectSDK.ECInfo

 	// Removal of a big object is done in multiple stages:
 	// 1. Remove the parent object. If it is locked or already removed, return immediately.
 	// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
 	e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
 		var existsPrm shard.ExistsPrm
-		existsPrm.SetAddress(prm.addr)
+		existsPrm.Address = prm.addr

 		resExists, err := sh.Exists(ctx, existsPrm)
 		if err != nil {
@@ -91,13 +92,18 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 			}

 			var splitErr *objectSDK.SplitInfoError
-			if !errors.As(err, &splitErr) {
+			var ecErr *objectSDK.ECInfoError
+			if errors.As(err, &splitErr) {
+				splitInfo = splitErr.SplitInfo()
+			} else if errors.As(err, &ecErr) {
+				ecInfo = ecErr.ECInfo()
+				e.deleteChunks(ctx, sh, ecInfo, prm)
+				return false
+			} else {
 				if !client.IsErrObjectNotFound(err) {
 					e.reportShardError(sh, "could not check object existence", err)
 				}
 				return false
 			}
-			splitInfo = splitErr.SplitInfo()
 		} else if !resExists.Exists() {
 			return false
 		}
@@ -171,3 +177,31 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
 		return false
 	})
 }

+func (e *StorageEngine) deleteChunks(
+	ctx context.Context, sh hashedShard, ecInfo *objectSDK.ECInfo, prm DeletePrm,
+) {
+	var inhumePrm shard.InhumePrm
+	if prm.forceRemoval {
+		inhumePrm.ForceRemoval()
+	}
+	for _, chunk := range ecInfo.Chunks {
+		var addr oid.Address
+		addr.SetContainer(prm.addr.Container())
+		var objID oid.ID
+		err := objID.ReadFromV2(chunk.ID)
+		if err != nil {
+			e.reportShardError(sh, "could not delete EC chunk", err)
+		}
+		addr.SetObject(objID)
+		inhumePrm.MarkAsGarbage(addr)
+		_, err = sh.Inhume(ctx, inhumePrm)
+		if err != nil {
+			e.log.Debug(logs.EngineCouldNotInhumeObjectInShard,
+				zap.Stringer("addr", addr),
+				zap.String("err", err.Error()),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+			continue
fyrchik marked this conversation as resolved (outdated)

There is an error which we have ignored. What happens with this yet-to-be-removed chunk?

It will be removed by the `remover` at the next iteration. The behavior is the same as for a complex object, see [deleteChildren()](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/3627b44e92395d2be7eeda9790513021b9f345ca/pkg/local_object_storage/engine/delete.go#L136).
+		}
+	}
+}


@@ -17,6 +17,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
 	"git.frostfs.info/TrueCloudLab/hrw"
 	"github.com/panjf2000/ants/v2"
@@ -62,7 +63,10 @@ func benchmarkExists(b *testing.B, shardNum int) {
 	b.ReportAllocs()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		ok, err := e.exists(context.Background(), addr)
+		var shPrm shard.ExistsPrm

The interface is confusing, we have 2 identical types with different meanings. What about accepting `shard.ExistsPrm`?

In this case we need to make the fields of `ExistsPrm` public, are you ok with that?

Implemented in a separate commit.
+		shPrm.Address = addr
+		shPrm.ParentAddress = oid.Address{}
+		ok, _, err := e.exists(context.Background(), shPrm)
 		if err != nil || ok {
 			b.Fatalf("%t %v", ok, err)
 		}
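With the setter replaced by public fields, a caller now fills `shard.ExistsPrm` directly and learns two things at once. A usage sketch based on the API in this diff (`addr` and `parentAddr` are placeholder variables):

```go
var prm shard.ExistsPrm
prm.Address = addr             // object checked for existence
prm.ParentAddress = parentAddr // zero oid.Address{} when there is no EC parent

res, err := sh.Exists(ctx, prm)
if err == nil && !res.Exists() && res.Locked() {
	// The chunk is not stored yet, but its EC parent is locked:
	// the engine re-applies the parent's lockers before Put.
}
```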


@@ -8,16 +8,16 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 )

-func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
-	var shPrm shard.ExistsPrm
-	shPrm.SetAddress(addr)
+// exists returns true in the first value if the object exists.
+// The second return value reports whether the parent object is locked.
+func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool, bool, error) {
 	alreadyRemoved := false
 	exists := false
+	locked := false

-	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
+	e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) {
 		res, err := sh.Exists(ctx, shPrm)
 		if err != nil {
 			if client.IsErrObjectAlreadyRemoved(err) {
@@ -44,13 +44,16 @@ func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, err
 		if !exists {
 			exists = res.Exists()
 		}
+		if !locked {
+			locked = res.Locked()
+		}

 		return false
 	})

 	if alreadyRemoved {
-		return false, new(apistatus.ObjectAlreadyRemoved)
+		return false, false, new(apistatus.ObjectAlreadyRemoved)
 	}

-	return exists, nil
+	return exists, locked, nil
 }


@@ -142,7 +142,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 	}()

 	if checkExists {
-		existPrm.SetAddress(addr)
+		existPrm.Address = addr
 		exRes, err := sh.Exists(ctx, existPrm)
 		if err != nil {
 			if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
@@ -152,7 +152,8 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 			}

 			var siErr *objectSDK.SplitInfoError
-			if !errors.As(err, &siErr) {
+			var ecErr *objectSDK.ECInfoError
+			if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
 				e.reportShardError(sh, "could not check for presents in shard", err)
 				return
 			}
@@ -220,6 +221,33 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
 	return locked, outErr
 }

+// GetLocked returns `LOCK` IDs if the object is locked according to the StorageEngine's state.
+func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocked",
+		trace.WithAttributes(
+			attribute.String("address", addr.EncodeToString()),
+		))
+	defer span.End()
+	var locked []oid.ID
+	var outErr error
+	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
+		ld, err := h.Shard.GetLocked(ctx, addr)
+		if err != nil {
+			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
fyrchik marked this conversation as resolved (outdated)

It is a log message, it should be a const. Also, why didn't the linter fail? cc @achuprov

Thanks, updated.
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err
}
locked = append(locked, ld...)
return false
})
if len(locked) > 0 {
return locked, nil
}
return locked, outErr
}
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(ctx, addrs) sh.HandleExpiredTombstones(ctx, addrs)


@@ -78,12 +78,31 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 		if checkExists {
 			var existsPrm shard.ExistsPrm
-			existsPrm.SetAddress(addrLocked)
+			existsPrm.Address = addrLocked
 			exRes, err := sh.Exists(ctx, existsPrm)
 			if err != nil {
 				var siErr *objectSDK.SplitInfoError
-				if !errors.As(err, &siErr) {
+				var eiErr *objectSDK.ECInfoError
+				if errors.As(err, &eiErr) {
+					eclocked := []oid.ID{locked}
+					for _, chunk := range eiErr.ECInfo().Chunks {
+						var objID oid.ID
+						err = objID.ReadFromV2(chunk.ID)
+						if err != nil {
+							e.reportShardError(sh, "could not lock object in shard", err)
+							return false
+						}
+						eclocked = append(eclocked, objID)
+					}
+					err = sh.Lock(ctx, idCnr, locker, eclocked)
+					if err != nil {
+						e.reportShardError(sh, "could not lock object in shard", err)
+						return false
+					}
+					root = true
+					return false
+				} else if !errors.As(err, &siErr) {
 					if shard.IsErrObjectExpired(err) {
 						// object is already expired =>
 						// do not lock it


@@ -80,11 +80,32 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
 	// In #1146 this check was parallelized, however, it became
 	// much slower on fast machines for 4 shards.
-	_, err := e.exists(ctx, addr)
+	var parent oid.Address
+	if prm.obj.ECHeader() != nil {
+		parent.SetObject(prm.obj.ECHeader().Parent())
+		parent.SetContainer(addr.Container())
+	}
+	var shPrm shard.ExistsPrm
+	shPrm.Address = addr
+	shPrm.ParentAddress = parent
+	existed, locked, err := e.exists(ctx, shPrm)
 	if err != nil {
 		return err
 	}
+	if !existed && locked {
+		lockers, err := e.GetLocked(ctx, parent)
+		if err != nil {
+			return err

Do we lock an object before we have put it? It seems like a problem, because this lock record can persist indefinitely.

I didn't catch the problem. Here we persist the lock for a chunk before put, because we need to keep `gc` from removing it. This is the reconstruction scenario, when we need to put a chunk on the node. If there is no lock for the chunk, `gc` will inhume it.

The problem is atomicity -- `lock -> CRASH -> put` leaves us with garbage lock records which will (?) be removed eventually. We could do it atomically in `put` instead; this would also ensure we put the info on the same shard.

As a result of the discussion, we need to move `gc` to the storage engine level. Created https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1151 for tracking.
+		}
+		for _, locker := range lockers {
+			err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
+			if err != nil {
+				return err
+			}
+		}
+	}

 	var shRes putToShardRes
 	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 		e.mtx.RLock()
@@ -120,7 +141,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
 	defer close(exitCh)

 	var existPrm shard.ExistsPrm
-	existPrm.SetAddress(addr)
+	existPrm.Address = addr
 	exists, err := sh.Exists(ctx, existPrm)
 	if err != nil {
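The resolved thread above is easier to follow with the new put path condensed into one place. A sketch of the reconstruction flow under the assumption that `obj` is an EC chunk (`parentAddressOf` is a hypothetical helper):

```go
// 1. Derive the EC parent's address from the chunk header.
var shPrm shard.ExistsPrm
shPrm.Address = addr
if obj.ECHeader() != nil {
	shPrm.ParentAddress = parentAddressOf(obj) // hypothetical helper
}

// 2. One existence check answers both questions at once.
existed, locked, err := e.exists(ctx, shPrm)

// 3. A missing chunk with a locked parent is being reconstructed:
//    re-apply every locker of the parent to the chunk itself, so GC
//    does not inhume it between this point and the actual Put.
//    (The thread's concern: lock -> CRASH -> put leaves a stale lock
//    record; moving GC to the engine level is tracked in #1151.)
if err == nil && !existed && locked {
	lockers, _ := e.GetLocked(ctx, shPrm.ParentAddress) // errors elided in this sketch
	for _, locker := range lockers {
		_ = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
	}
}
```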


@@ -115,7 +115,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
 		found := false
 		for i := range shards {
 			var existsPrm shard.ExistsPrm
-			existsPrm.SetAddress(addr)
+			existsPrm.Address = addr
 			res, err := shards[i].Exists(ctx, existsPrm)
 			if err != nil {


@@ -267,8 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
 			return deleteSingleResult{}, nil
 		}

 		var siErr *objectSDK.SplitInfoError
-		if errors.As(err, &siErr) {
+		var ecErr *objectSDK.ECInfoError
+		if errors.As(err, &siErr) || errors.As(err, &ecErr) {
 			// if object is virtual (parent) then do nothing, it will be deleted with last child
+			// if object is erasure-coded it will be deleted with the last chunk presented on the shard
 			return deleteSingleResult{}, nil
 		}

@@ -471,6 +473,46 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
 		name: rootBucketName(cnr, bucketName),
 		key:  objKey,
 	})

+	if obj.ECHeader() != nil {
+		err := delECInfo(tx, cnr, objKey, obj.ECHeader())
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }

+func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error {
+	parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
+	bucketName := make([]byte, bucketKeySize)
+	val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
+	if len(val) > 0 {
+		if bytes.Equal(val, objKey) {
+			delUniqueIndexItem(tx, namedBucketItem{
+				name: ecInfoBucketName(cnr, bucketName),
+				key:  parentID,
+			})
+		} else {
+			val = bytes.Clone(val)
+			offset := 0
+			for offset < len(val) {
+				if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
fyrchik marked this conversation as resolved (outdated)

`val` is received from `getFromBucket`. Is it taken from bbolt or freshly allocated? Bbolt prohibits changing values in some cases.

According to the doc, `val` should be valid for the life of the transaction. Let's clone it.

This line is more important: `// The returned memory is owned by bbolt and must never be modified; writing to this memory might corrupt the database.`

This line is from the newest version. Looks like we need to update `bbolt`.
+					val = append(val[:offset], val[offset+objectKeySize:]...)
+					break
+				}
+				offset += objectKeySize
+			}
+			err := putUniqueIndexItem(tx, namedBucketItem{
+				name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
+				key:  parentID,
+				val:  val,
+			})
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
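The bbolt question in the thread is worth a self-contained illustration: memory returned by `Bucket.Get` is owned by bbolt and must never be modified in place, which is why `delECInfo` clones the chunk list before rewriting it. A minimal sketch using only the public `go.etcd.io/bbolt` API:

```go
package main

import (
	"bytes"
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	db, err := bolt.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("ec-info"))
		if err != nil {
			return err
		}
		if err := b.Put([]byte("parent"), []byte("chunk-1chunk-2")); err != nil {
			return err
		}

		val := b.Get([]byte("parent")) // owned by bbolt: treat as read-only
		// val[0] = 'X'                // modifying in place may corrupt the DB

		val = bytes.Clone(val)         // now we own the memory (Go 1.20+)
		val = val[:len("chunk-1")]     // e.g. drop the second entry
		return b.Put([]byte("parent"), val)
	})
	if err != nil {
		log.Fatal(err)
	}
}
```

bbolt also requires values passed to `Put` to remain valid for the life of the transaction, which is the other half of the "let's clone it" advice in the thread.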


@@ -20,12 +20,14 @@ import (
 // ExistsPrm groups the parameters of Exists operation.
 type ExistsPrm struct {
 	addr oid.Address
+	paddr oid.Address
 }

 // ExistsRes groups the resulting values of Exists operation.
 type ExistsRes struct {
 	exists bool
+	locked bool
 }

 var ErrLackSplitInfo = logicerr.New("no split info on parent object")

@@ -35,11 +37,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
 	p.addr = addr
 }

+// SetParent is an Exists option to set the object's parent.
+func (p *ExistsPrm) SetParent(addr oid.Address) {
+	p.paddr = addr
+}
+
 // Exists returns the fact that the object is in the metabase.
 func (p ExistsRes) Exists() bool {
 	return p.exists
 }

+// Locked returns the fact that the object is locked.
+func (p ExistsRes) Locked() bool {
+	return p.locked
+}
+
 // Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
 // returns true if addr is in primary index or false if it is not.
 //
@@ -70,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 	currEpoch := db.epochState.CurrentEpoch()

 	err = db.boltDB.View(func(tx *bbolt.Tx) error {
-		res.exists, err = db.exists(tx, prm.addr, currEpoch)
+		res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
 		return err
 	})

@@ -78,15 +90,19 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 	return res, metaerr.Wrap(err)
 }

-func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) {
+func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
+	var locked bool
+	if !parent.Equals(oid.Address{}) {
+		locked = objectLocked(tx, parent.Container(), parent.Object())
+	}
 	// check graveyard and object expiration first
 	switch objectStatus(tx, addr, currEpoch) {
 	case 1:
-		return false, logicerr.Wrap(new(apistatus.ObjectNotFound))
+		return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	case 2:
-		return false, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
+		return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
 	case 3:
-		return false, ErrObjectIsExpired
+		return false, locked, ErrObjectIsExpired
 	}

 	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
@@ -96,25 +112,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
 	// if graveyard is empty, then check if object exists in primary bucket
 	if inBucket(tx, primaryBucketName(cnr, key), objKey) {
-		return true, nil
+		return true, locked, nil
 	}

 	// if primary bucket is empty, then check if object exists in parent bucket
 	if inBucket(tx, parentBucketName(cnr, key), objKey) {
 		splitInfo, err := getSplitInfo(tx, cnr, objKey)
 		if err != nil {
-			return false, err
+			return false, locked, err
 		}

-		return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
+		return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
 	}

 	// if parent bucket is empty, then check if object exists in ec bucket
 	if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
-		return false, getECInfoError(tx, cnr, data)
+		return false, locked, getECInfoError(tx, cnr, data)
 	}

 	// if parent bucket is empty, then check if object exists in typed buckets
-	return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
+	return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
 }

 // objectStatus returns:


@@ -229,11 +229,17 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 		obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
 		targetKey := addressKey(prm.target[i], buf)
+		var ecErr *objectSDK.ECInfoError
 		if err == nil {
 			err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
 			if err != nil {
 				return err
 			}
+		} else if errors.As(err, &ecErr) {
+			err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
+			if err != nil {
+				return err
+			}
 		}

 		if prm.tomb != nil {
@@ -272,6 +278,43 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 	return db.applyInhumeResToCounters(tx, res)
 }

+func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
+	garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
+	ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
+) error {
+	for _, chunk := range ecInfo.Chunks {
+		chunkBuf := make([]byte, addressKeySize)
+		var chunkAddr oid.Address
+		chunkAddr.SetContainer(cnr)
+		var chunkID oid.ID
+		err := chunkID.ReadFromV2(chunk.ID)
+		if err != nil {
+			return err
+		}
+		chunkAddr.SetObject(chunkID)
+		chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
+		if err != nil {
+			return err
+		}
+		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
+		if err != nil {
+			return err
+		}
+		chunkKey := addressKey(chunkAddr, chunkBuf)
+		if tomb != nil {
+			_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
+			if err != nil {
+				return err
+			}
+		}
+		err = targetBucket.Put(chunkKey, value)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
 	if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
 		return err


@@ -175,6 +175,31 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
 	return false
 }

+// getLocked returns `LOCK` IDs if the specified object is locked in the specified container.
+func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
+	var lockers []oid.ID
+	bucketLocked := tx.Bucket(bucketNameLocked)
+	if bucketLocked != nil {
+		key := make([]byte, cidSize)
+		idCnr.Encode(key)
+		bucketLockedContainer := bucketLocked.Bucket(key)
+		if bucketLockedContainer != nil {
+			binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
+			if err != nil {
+				return nil, fmt.Errorf("decode list of object lockers: %w", err)
+			}
+			for _, binObjID := range binObjIDs {
+				var id oid.ID
+				if err = id.Decode(binObjID); err != nil {
+					return nil, err
+				}
+				lockers = append(lockers, id)
+			}
+		}
+	}
+	return lockers, nil
+}
+
 // releases all records about the objects locked by the locker.
 // Returns slice of unlocked object ID's or an error.
 //
@@ -325,3 +350,36 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
 	success = err == nil
 	return res, err
 }

+// GetLocked returns `LOCK` IDs if the provided object is locked by any `LOCK`.
+// An object that is not found is considered non-locked.
+//
+// Returns only non-logical errors related to underlying database.
+func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, err error) {
+	var (
+		startedAt = time.Now()
+		success   = false
+	)
+	defer func() {
+		db.metrics.AddMethodDuration("GetLocked", time.Since(startedAt), success)
+	}()
+
+	_, span := tracing.StartSpanFromContext(ctx, "metabase.GetLocked",
+		trace.WithAttributes(
+			attribute.String("address", addr.EncodeToString()),
+		))
+	defer span.End()
+
+	db.modeMtx.RLock()
+	defer db.modeMtx.RUnlock()
+
+	if db.mode.NoMetabase() {
+		return res, ErrDegradedMode
+	}
+
+	err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
+		res, err = getLocked(tx, addr.Container(), addr.Object())
+		return err
+	}))
+	success = err == nil
+	return res, err
+}
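Read together with `getLocked` above, the lock index this method walks has a two-level layout. A sketch of the structure as the code implies it; bucket and helper names are the ones used above, while the drawing itself is an inference:

```go
// bucketNameLocked                     // top-level bucket
// └── <container ID, cidSize bytes>    // nested bucket per container
//     └── <object key> → encoded list  // value: list of LOCK object IDs,
//                                      // decoded by decodeList()
//
// GetLocked(addr) therefore answers "which LOCK objects currently pin
// addr?" - exactly what the engine's put path needs in order to
// re-apply those lockers to a freshly reconstructed EC chunk.
```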


@@ -115,7 +115,7 @@ func (db *DB) put(tx *bbolt.Tx,
 	isParent := si != nil

-	exists, err := db.exists(tx, objectCore.AddressOf(obj), currEpoch)
+	exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)

 	var splitInfoError *objectSDK.SplitInfoError
 	if errors.As(err, &splitInfoError) {


@@ -428,7 +428,7 @@ func (db *DB) selectObjectID(
 		addr.SetObject(id)

 		var splitInfoError *objectSDK.SplitInfoError
-		ok, err := db.exists(tx, addr, currEpoch)
+		ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
 		if (err == nil && ok) || errors.As(err, &splitInfoError) {
 			raw := make([]byte, objectKeySize)
 			id.Encode(raw)


@@ -13,17 +13,16 @@ import (
 // ExistsPrm groups the parameters of Exists operation.
 type ExistsPrm struct {
-	addr oid.Address
+	// Exists option to set object checked for existence.
+	Address oid.Address
+	// Exists option to set parent object checked for existence.
+	ParentAddress oid.Address
 }

 // ExistsRes groups the resulting values of Exists operation.
 type ExistsRes struct {
 	ex bool
+	lc bool
 }

-// SetAddress is an Exists option to set object checked for existence.
-func (p *ExistsPrm) SetAddress(addr oid.Address) {
-	p.addr = addr
-}
-
 // Exists returns the fact that the object is in the shard.
@@ -31,6 +30,11 @@ func (p ExistsRes) Exists() bool {
 	return p.ex
 }

+// Locked returns the fact that the object is locked.
+func (p ExistsRes) Locked() bool {
+	return p.lc
+}
+
 // Exists checks if object is presented in shard.
 //
 // Returns any error encountered that does not allow to
@@ -43,11 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
 		trace.WithAttributes(
 			attribute.String("shard_id", s.ID().String()),
-			attribute.String("address", prm.addr.EncodeToString()),
+			attribute.String("address", prm.Address.EncodeToString()),
 		))
 	defer span.End()

 	var exists bool
+	var locked bool
 	var err error

 	s.m.RLock()
@@ -57,21 +62,24 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
 		return ExistsRes{}, ErrShardDisabled
 	} else if s.info.Mode.NoMetabase() {
 		var p common.ExistsPrm
-		p.Address = prm.addr
+		p.Address = prm.Address

 		var res common.ExistsRes
 		res, err = s.blobStor.Exists(ctx, p)
 		exists = res.Exists
 	} else {
 		var existsPrm meta.ExistsPrm
-		existsPrm.SetAddress(prm.addr)
+		existsPrm.SetAddress(prm.Address)
+		existsPrm.SetParent(prm.ParentAddress)

 		var res meta.ExistsRes
 		res, err = s.metaBase.Exists(ctx, existsPrm)
 		exists = res.Exists()
+		locked = res.Locked()
 	}

 	return ExistsRes{
 		ex: exists,
+		lc: locked,
 	}, err
 }


@@ -525,7 +525,9 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 		}

 		log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
-		s.expiredTombstonesCallback(ctx, tssExp)
+		if len(tssExp) > 0 {
fyrchik marked this conversation as resolved (outdated)

To be clear: is this an optimization or a functional change?

It is an optimization - otherwise we do nothing but take a lock on the metabase, because the callback calls `db.boltDB.Update(...)`.
+			s.expiredTombstonesCallback(ctx, tssExp)
+		}

 		iterPrm.SetOffset(tss[tssLen-1].Address())
 		tss = tss[:0]


@@ -71,3 +71,20 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
 	return res.Locked(), nil
 }

+// GetLocked returns lock IDs of the provided object. An object that is not
+// found is considered not locked. Requires a healthy metabase, returns
+// ErrDegradedMode otherwise.
+func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocked",
+		trace.WithAttributes(
+			attribute.String("shard_id", s.ID().String()),
+			attribute.String("address", addr.EncodeToString()),
+		))
+	defer span.End()
+	m := s.GetMode()
+	if m.NoMetabase() {
+		return nil, ErrDegradedMode
+	}
+	return s.metaBase.GetLocked(ctx, addr)
+}


@@ -72,7 +72,7 @@ func TestShardReload(t *testing.T) {
 	checkHasObjects := func(t *testing.T, exists bool) {
 		for i := range objects {
 			var prm ExistsPrm
-			prm.SetAddress(objects[i].addr)
+			prm.Address = objects[i].addr

 			res, err := sh.Exists(context.Background(), prm)
 			require.NoError(t, err)


@@ -24,9 +24,7 @@ import (
 	"go.uber.org/zap"
 )

-var (
-	errEmptyBodySignature = errors.New("malformed request: empty body signature")
-)
+var errEmptyBodySignature = errors.New("malformed request: empty body signature")

 type cfg struct {
 	log *logger.Logger


@@ -2,6 +2,7 @@ package deletesvc

 import (
 	"context"
+	"errors"
 	"fmt"
 	"strconv"

@@ -40,7 +41,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
 	)}
 }

-func (exec execCtx) isLocal() bool {
+func (exec *execCtx) isLocal() bool {
 	return exec.prm.common.LocalOnly()
 }

@@ -64,10 +65,33 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address {
 	return a
 }

-func (exec *execCtx) formSplitInfo(ctx context.Context) error {
-	var err error
-	exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)
-	if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
+func (exec *execCtx) formExtendedInfo(ctx context.Context) error {
+	_, err := exec.svc.header.head(ctx, exec)
+
+	var errSplitInfo *objectSDK.SplitInfoError
+	var errECInfo *objectSDK.ECInfoError
+	switch {
+	case err == nil:
+		return nil
+	case errors.As(err, &errSplitInfo):
+		exec.splitInfo = errSplitInfo.SplitInfo()
+		exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
+
+		exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
+
+		if err := exec.collectMembers(ctx); err != nil {
+			return err
+		}
+
+		exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
+		return nil
+	case errors.As(err, &errECInfo):
+		exec.log.Debug(logs.DeleteECObjectReceived)
+		return nil
+	}
+
+	if !apiclient.IsErrObjectAlreadyRemoved(err) {
 		// IsErrObjectAlreadyRemoved check is required because splitInfo
 		// implicitly performs Head request that may return ObjectAlreadyRemoved
 		// status that is not specified for Delete.


@@ -35,19 +35,9 @@ func (exec *execCtx) formTombstone(ctx context.Context) error {
 	exec.log.Debug(logs.DeleteFormingSplitInfo)

-	if err := exec.formSplitInfo(ctx); err != nil {
-		return fmt.Errorf("form split info: %w", err)
+	if err := exec.formExtendedInfo(ctx); err != nil {
+		return fmt.Errorf("form extended info: %w", err)
 	}

-	exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
-
-	exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
-
-	if err := exec.collectMembers(ctx); err != nil {
-		return err
-	}
-
-	exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
-
 	return exec.initTombstoneObject()
 }


@@ -41,7 +41,7 @@ type cfg struct {
 	header interface {
 		// must return (nil, nil) for PHY objects
-		splitInfo(context.Context, *execCtx) (*objectSDK.SplitInfo, error)
+		head(context.Context, *execCtx) (*objectSDK.Object, error)

 		children(context.Context, *execCtx) ([]oid.ID, error)


@@ -2,7 +2,6 @@ package deletesvc

 import (
 	"context"
-	"errors"

 	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
 	putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@@ -44,19 +43,8 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
 	return wr.Object(), nil
 }

-func (w *headSvcWrapper) splitInfo(ctx context.Context, exec *execCtx) (*objectSDK.SplitInfo, error) {
-	_, err := w.headAddress(ctx, exec, exec.address())
-
-	var errSplitInfo *objectSDK.SplitInfoError
-
-	switch {
-	case err == nil:
-		return nil, nil
-	case errors.As(err, &errSplitInfo):
-		return errSplitInfo.SplitInfo(), nil
-	default:
-		return nil, err
-	}
+func (w *headSvcWrapper) head(ctx context.Context, exec *execCtx) (*objectSDK.Object, error) {
+	return w.headAddress(ctx, exec, exec.address())
 }

 func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) {


@@ -41,12 +41,10 @@ func (r *request) assemble(ctx context.Context) {
 	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)

 	r.log.Debug(logs.GetAssemblingSplittedObject,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
 	defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
@@ -55,7 +53,6 @@ func (r *request) assemble(ctx context.Context) {
 	if err != nil {
 		r.log.Warn(logs.GetFailedToAssembleSplittedObject,
 			zap.Error(err),
-			zap.Stringer("address", r.address()),
 			zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 			zap.Uint64("range_length", r.ctxRange().GetLength()),
 		)


@@ -39,12 +39,10 @@ func (r *request) assembleEC(ctx context.Context) {
 	assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)

 	r.log.Debug(logs.GetAssemblingECObject,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
 	defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
@@ -53,7 +51,6 @@ func (r *request) assembleEC(ctx context.Context) {
 	if err != nil {
 		r.log.Warn(logs.GetFailedToAssembleECObject,
 			zap.Error(err),
-			zap.Stringer("address", r.address()),
 			zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 			zap.Uint64("range_length", r.ctxRange().GetLength()),
 		)


@@ -28,7 +28,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
 	)}
 }

-func (exec execCtx) isLocal() bool {
+func (exec *execCtx) isLocal() bool {
 	return exec.prm.common.LocalOnly()
 }