node: Implement Lock\Delete
requests for EC object #1147
8
Makefile
|
@ -44,9 +44,12 @@ PROTOGEN_FROSTFS_DIR ?= $(PROTOBUF_DIR)/protogen-$(PROTOGEN_FROSTFS_VERSION)
|
|||
STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck
|
||||
STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION)
|
||||
|
||||
SOURCES = $(shell find . -type f -name "*.go" -print)
|
||||
|
||||
GOPLS_VERSION ?= v0.15.1
|
||||
GOPLS_DIR ?= $(abspath $(BIN))/gopls
|
||||
GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION)
|
||||
GOPLS_TEMP_FILE := $(shell mktemp)
|
||||
|
||||
FROSTFS_CONTRACTS_PATH=$(abspath ./../frostfs-contract)
|
||||
LOCODE_DB_PATH=$(abspath ./.cache/locode_db)
|
||||
|
@ -220,9 +223,12 @@ gopls-run:
|
|||
@if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \
|
||||
make gopls-install; \
|
||||
fi
|
||||
@if [[ $$(find . -type f -name "*.go" -print | xargs $(GOPLS_VERSION_DIR)/gopls check | tee /dev/tty | wc -l) -ne 0 ]]; then \
|
||||
$(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE)
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
@if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \
|
||||
cat $(GOPLS_TEMP_FILE); \
|
||||
exit 1; \
|
||||
fi
|
||||
rm $(GOPLS_TEMP_FILE)
|
||||
|
||||
# Run linters in Docker
|
||||
docker/lint:
|
||||
|
|
|
@ -38,9 +38,7 @@ const (
|
|||
groupTarget = "group"
|
||||
)
|
||||
|
||||
var (
|
||||
errUnknownTargetType = errors.New("unknown target type")
|
||||
)
|
||||
var errUnknownTargetType = errors.New("unknown target type")
|
||||
|
||||
var addCmd = &cobra.Command{
|
||||
Use: "add",
|
||||
|
|
|
@ -152,7 +152,11 @@ func printECInfoErr(cmd *cobra.Command, err error) bool {
|
|||
ok := errors.As(err, &errECInfo)
|
||||
|
||||
if ok {
|
||||
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
|
||||
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
|
||||
toProto, _ := cmd.Flags().GetBool("proto")
|
||||
if !(toJSON || toProto) {
|
||||
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
|
||||
}
|
||||
printECInfo(cmd, errECInfo.ECInfo())
|
||||
}
|
||||
|
||||
|
|
|
@ -357,6 +357,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
_, err := internal.HeadObject(cmd.Context(), prmHead)
|
||||
|
||||
var errSplit *objectSDK.SplitInfoError
|
||||
var errEC *objectSDK.ECInfoError
|
||||
|
||||
switch {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why have you replaced switch with Why have you replaced switch with `else if` chain?
acid-ant
commented
Because previous implementation was only for Because previous implementation was only for `Split Info`. Thought it should be more readable. Reverted switch back.
fyrchik
commented
IMO it is exactly the opposite -- IMO it is exactly the opposite -- `else if` is ok once, switch is less verbose for multiple branches.
|
||||
default:
|
||||
|
@ -366,19 +367,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
return nil
|
||||
case errors.As(err, &errSplit):
|
||||
common.PrintVerbose(cmd, "Split information received - object is virtual.")
|
||||
splitInfo := errSplit.SplitInfo()
|
||||
|
||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
|
||||
return members
|
||||
}
|
||||
|
||||
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
|
||||
return members
|
||||
}
|
||||
|
||||
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
|
||||
case errors.As(err, &errEC):
|
||||
common.PrintVerbose(cmd, "Object is erasure-coded.")
|
||||
return nil
|
||||
}
|
||||
|
||||
splitInfo := errSplit.SplitInfo()
|
||||
|
||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
|
||||
return members
|
||||
}
|
||||
|
||||
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
|
||||
return members
|
||||
}
|
||||
|
||||
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
|
||||
return nil
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
So we add each chunk to the tombstone/lock? It is a problem, because chunks may be missing (with split it cannot be the case, it means DL, with EC it is ok). So we add each chunk to the tombstone/lock? It is a problem, because chunks may be missing (with split it cannot be the case, it means DL, with EC it is ok).
acid-ant
commented
Oh, thanks, that was from previous implementation, removed. Oh, thanks, that was from previous implementation, removed.
fyrchik
commented
Does the new implementation still pass sanity tests? Does the new implementation still pass sanity tests?
acid-ant
commented
Execute each time when changed sensitive part of the code. Execute each time when changed sensitive part of the code.
|
||||
|
||||
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
|
||||
|
|
|
@ -95,6 +95,7 @@ const (
|
|||
DeleteFormingSplitInfo = "forming split info..."
|
||||
DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
|
||||
DeleteMembersSuccessfullyCollected = "members successfully collected"
|
||||
DeleteECObjectReceived = "erasure-coded object received, form tombstone"
|
||||
GetRemoteCallFailed = "remote call failed"
|
||||
GetCanNotAssembleTheObject = "can not assemble the object"
|
||||
GetTryingToAssembleTheObject = "trying to assemble the object..."
|
||||
|
@ -213,6 +214,7 @@ const (
|
|||
EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies"
|
||||
EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check"
|
||||
EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks"
|
||||
EngineInterruptGettingLockers = "can't get object's lockers"
|
||||
EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks"
|
||||
EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only"
|
||||
EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode"
|
||||
|
|
|
@ -76,13 +76,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
|||
is bool
|
||||
}
|
||||
var splitInfo *objectSDK.SplitInfo
|
||||
var ecInfo *objectSDK.ECInfo
|
||||
|
||||
// Removal of a big object is done in multiple stages:
|
||||
// 1. Remove the parent object. If it is locked or already removed, return immediately.
|
||||
// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.SetAddress(prm.addr)
|
||||
existsPrm.Address = prm.addr
|
||||
|
||||
resExists, err := sh.Exists(ctx, existsPrm)
|
||||
if err != nil {
|
||||
|
@ -91,13 +92,18 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
|||
}
|
||||
|
||||
var splitErr *objectSDK.SplitInfoError
|
||||
if !errors.As(err, &splitErr) {
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
if errors.As(err, &splitErr) {
|
||||
splitInfo = splitErr.SplitInfo()
|
||||
} else if errors.As(err, &ecErr) {
|
||||
e.deleteChunks(ctx, sh, ecInfo, prm)
|
||||
return false
|
||||
} else {
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
e.reportShardError(sh, "could not check object existence", err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
splitInfo = splitErr.SplitInfo()
|
||||
} else if !resExists.Exists() {
|
||||
return false
|
||||
}
|
||||
|
@ -171,3 +177,31 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
|
|||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func (e *StorageEngine) deleteChunks(
|
||||
ctx context.Context, sh hashedShard, ecInfo *objectSDK.ECInfo, prm DeletePrm,
|
||||
) {
|
||||
var inhumePrm shard.InhumePrm
|
||||
if prm.forceRemoval {
|
||||
inhumePrm.ForceRemoval()
|
||||
}
|
||||
for _, chunk := range ecInfo.Chunks {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(prm.addr.Container())
|
||||
var objID oid.ID
|
||||
err := objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not delete EC chunk", err)
|
||||
}
|
||||
addr.SetObject(objID)
|
||||
inhumePrm.MarkAsGarbage(addr)
|
||||
_, err = sh.Inhume(ctx, inhumePrm)
|
||||
if err != nil {
|
||||
e.log.Debug(logs.EngineCouldNotInhumeObjectInShard,
|
||||
zap.Stringer("addr", addr),
|
||||
zap.String("err", err.Error()),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
continue
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
There is an error which we have ignored. What happens with this yet-to-be-removed chunk? There is an error which we have ignored. What happens with this yet-to-be-removed chunk?
acid-ant
commented
It will be removed by It will be removed by `remover` at next iteration. The behavior is the same as for complex object, see [deleteChildren()](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/3627b44e92395d2be7eeda9790513021b9f345ca/pkg/local_object_storage/engine/delete.go#L136).
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/hrw"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -62,7 +63,10 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
|||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ok, err := e.exists(context.Background(), addr)
|
||||
var shPrm shard.ExistsPrm
|
||||
fyrchik
commented
The interface is confusing, we have 2 identical types with different meaning. The interface is confusing, we have 2 identical types with different meaning.
What about accepting `shard.ExistsPrm`?
acid-ant
commented
In this case we need to make fields of In this case we need to make fields of `ExistsPrm` public, are you ok?
acid-ant
commented
Implemented in a separate commit. Implemented in a separate commit.
|
||||
shPrm.Address = addr
|
||||
shPrm.ParentAddress = oid.Address{}
|
||||
ok, _, err := e.exists(context.Background(), shPrm)
|
||||
if err != nil || ok {
|
||||
b.Fatalf("%t %v", ok, err)
|
||||
}
|
||||
|
|
|
@ -8,16 +8,16 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
|
||||
var shPrm shard.ExistsPrm
|
||||
shPrm.SetAddress(addr)
|
||||
// exists return in the first value true if object exists.
|
||||
// Second return value marks is parent object locked.
|
||||
func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool, bool, error) {
|
||||
alreadyRemoved := false
|
||||
exists := false
|
||||
locked := false
|
||||
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) {
|
||||
res, err := sh.Exists(ctx, shPrm)
|
||||
if err != nil {
|
||||
if client.IsErrObjectAlreadyRemoved(err) {
|
||||
|
@ -44,13 +44,16 @@ func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, err
|
|||
if !exists {
|
||||
exists = res.Exists()
|
||||
}
|
||||
if !locked {
|
||||
locked = res.Locked()
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
|
||||
if alreadyRemoved {
|
||||
return false, new(apistatus.ObjectAlreadyRemoved)
|
||||
return false, false, new(apistatus.ObjectAlreadyRemoved)
|
||||
}
|
||||
|
||||
return exists, nil
|
||||
return exists, locked, nil
|
||||
}
|
||||
|
|
|
@ -142,7 +142,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
|||
}()
|
||||
|
||||
if checkExists {
|
||||
existPrm.SetAddress(addr)
|
||||
existPrm.Address = addr
|
||||
exRes, err := sh.Exists(ctx, existPrm)
|
||||
if err != nil {
|
||||
if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||
|
@ -152,7 +152,8 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
|||
}
|
||||
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if !errors.As(err, &siErr) {
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
|
||||
e.reportShardError(sh, "could not check for presents in shard", err)
|
||||
return
|
||||
}
|
||||
|
@ -220,6 +221,33 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
|
|||
return locked, outErr
|
||||
}
|
||||
|
||||
// GetLocked return lock id's if object is locked according to StorageEngine's state.
|
||||
func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocked",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var locked []oid.ID
|
||||
var outErr error
|
||||
|
||||
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||
ld, err := h.Shard.GetLocked(ctx, addr)
|
||||
if err != nil {
|
||||
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It is a log message, should be a const. Also, why didn't the linter fail? cc @achuprov It is a log message, should be a const. Also, why didn't the linter fail? cc @achuprov
acid-ant
commented
Thanks, updated. Thanks, updated.
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
outErr = err
|
||||
}
|
||||
locked = append(locked, ld...)
|
||||
return false
|
||||
})
|
||||
if len(locked) > 0 {
|
||||
return locked, nil
|
||||
}
|
||||
return locked, outErr
|
||||
}
|
||||
|
||||
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
sh.HandleExpiredTombstones(ctx, addrs)
|
||||
|
|
|
@ -78,12 +78,31 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
|||
|
||||
if checkExists {
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.SetAddress(addrLocked)
|
||||
existsPrm.Address = addrLocked
|
||||
|
||||
exRes, err := sh.Exists(ctx, existsPrm)
|
||||
if err != nil {
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if !errors.As(err, &siErr) {
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
if errors.As(err, &eiErr) {
|
||||
eclocked := []oid.ID{locked}
|
||||
for _, chunk := range eiErr.ECInfo().Chunks {
|
||||
var objID oid.ID
|
||||
err = objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not lock object in shard", err)
|
||||
return false
|
||||
}
|
||||
eclocked = append(eclocked, objID)
|
||||
}
|
||||
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not lock object in shard", err)
|
||||
return false
|
||||
}
|
||||
root = true
|
||||
return false
|
||||
} else if !errors.As(err, &siErr) {
|
||||
if shard.IsErrObjectExpired(err) {
|
||||
// object is already expired =>
|
||||
// do not lock it
|
||||
|
|
|
@ -80,11 +80,32 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
|
||||
// In #1146 this check was parallelized, however, it became
|
||||
// much slower on fast machines for 4 shards.
|
||||
_, err := e.exists(ctx, addr)
|
||||
var parent oid.Address
|
||||
if prm.obj.ECHeader() != nil {
|
||||
parent.SetObject(prm.obj.ECHeader().Parent())
|
||||
parent.SetContainer(addr.Container())
|
||||
}
|
||||
var shPrm shard.ExistsPrm
|
||||
shPrm.Address = addr
|
||||
shPrm.ParentAddress = parent
|
||||
existed, locked, err := e.exists(ctx, shPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !existed && locked {
|
||||
lockers, err := e.GetLocked(ctx, parent)
|
||||
if err != nil {
|
||||
return err
|
||||
fyrchik
commented
Do we lock an object before we have put it? It seems like a problem, because this lock record can persist indefinitely. Do we lock an object before we have put it? It seems like a problem, because this lock record can persist indefinitely.
acid-ant
commented
Didn't catch the problem. Here we are persisting lock for a chunk before put, because we need to avoid Didn't catch the problem. Here we are persisting lock for a chunk before put, because we need to avoid `gc` removing it. This is reconstruction scenario - when we need to put chunk on the node. If there is no lock for a chunk, `gc` will inhume it.
fyrchik
commented
The problem is atomicity -- The problem is atomicity -- `lock -> CRASH -> put` and we now have some garbage about locks which will (?) be removed eventually.
We could do it atomically in `put` instead, this would also ensure we put info on the same shard.
acid-ant
commented
As a result of discussion, we need to move As a result of discussion, we need to move `gc` on a storage engine level. Created https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1151 for tracking.
|
||||
}
|
||||
for _, locker := range lockers {
|
||||
err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var shRes putToShardRes
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
e.mtx.RLock()
|
||||
|
@ -120,7 +141,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
|
|||
defer close(exitCh)
|
||||
|
||||
var existPrm shard.ExistsPrm
|
||||
existPrm.SetAddress(addr)
|
||||
existPrm.Address = addr
|
||||
|
||||
exists, err := sh.Exists(ctx, existPrm)
|
||||
if err != nil {
|
||||
|
|
|
@ -115,7 +115,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
|
|||
found := false
|
||||
for i := range shards {
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.SetAddress(addr)
|
||||
existsPrm.Address = addr
|
||||
|
||||
res, err := shards[i].Exists(ctx, existsPrm)
|
||||
if err != nil {
|
||||
|
|
|
@ -267,8 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
return deleteSingleResult{}, nil
|
||||
}
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if errors.As(err, &siErr) {
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
if errors.As(err, &siErr) || errors.As(err, &ecErr) {
|
||||
// if object is virtual (parent) then do nothing, it will be deleted with last child
|
||||
// if object is erasure-coded it will be deleted with the last chunk presented on the shard
|
||||
return deleteSingleResult{}, nil
|
||||
}
|
||||
|
||||
|
@ -471,6 +473,46 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
|||
name: rootBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
})
|
||||
if obj.ECHeader() != nil {
|
||||
err := delECInfo(tx, cnr, objKey, obj.ECHeader())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error {
|
||||
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
|
||||
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
|
||||
if len(val) > 0 {
|
||||
if bytes.Equal(val, objKey) {
|
||||
delUniqueIndexItem(tx, namedBucketItem{
|
||||
name: ecInfoBucketName(cnr, bucketName),
|
||||
key: parentID,
|
||||
})
|
||||
} else {
|
||||
val = bytes.Clone(val)
|
||||
offset := 0
|
||||
for offset < len(val) {
|
||||
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`val` is received from `getFromBucket`. Is it taken from bbolt or freshly allocated? Bbolt prohibits changing values in some cases.
acid-ant
commented
According to doc, According to doc, `val` should be valid for the life of the transaction. Let's clone it.
fyrchik
commented
This line is more important This line is more important `// The returned memory is owned by bbolt and must never be modified; writing to this memory might corrupt the database.`
acid-ant
commented
This line is from the newest version. Looks like we need to update This line is from the newest version. Looks like we need to update `bbolt`.
|
||||
val = append(val[:offset], val[offset+objectKeySize:]...)
|
||||
break
|
||||
}
|
||||
offset += objectKeySize
|
||||
}
|
||||
err := putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
|
||||
key: parentID,
|
||||
val: val,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -20,12 +20,14 @@ import (
|
|||
|
||||
// ExistsPrm groups the parameters of Exists operation.
|
||||
type ExistsPrm struct {
|
||||
addr oid.Address
|
||||
addr oid.Address
|
||||
paddr oid.Address
|
||||
}
|
||||
|
||||
// ExistsRes groups the resulting values of Exists operation.
|
||||
type ExistsRes struct {
|
||||
exists bool
|
||||
locked bool
|
||||
}
|
||||
|
||||
var ErrLackSplitInfo = logicerr.New("no split info on parent object")
|
||||
|
@ -35,11 +37,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
|||
p.addr = addr
|
||||
}
|
||||
|
||||
// SetParent is an Exists option to set objects parent.
|
||||
func (p *ExistsPrm) SetParent(addr oid.Address) {
|
||||
p.paddr = addr
|
||||
}
|
||||
|
||||
// Exists returns the fact that the object is in the metabase.
|
||||
func (p ExistsRes) Exists() bool {
|
||||
return p.exists
|
||||
}
|
||||
|
||||
// Locked returns the fact that the object is locked.
|
||||
func (p ExistsRes) Locked() bool {
|
||||
return p.locked
|
||||
}
|
||||
|
||||
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
|
||||
// returns true if addr is in primary index or false if it is not.
|
||||
//
|
||||
|
@ -70,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.exists, err = db.exists(tx, prm.addr, currEpoch)
|
||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
|
@ -78,15 +90,19 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) {
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
var locked bool
|
||||
if !parent.Equals(oid.Address{}) {
|
||||
locked = objectLocked(tx, parent.Container(), parent.Object())
|
||||
}
|
||||
// check graveyard and object expiration first
|
||||
switch objectStatus(tx, addr, currEpoch) {
|
||||
case 1:
|
||||
return false, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
case 2:
|
||||
return false, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||
case 3:
|
||||
return false, ErrObjectIsExpired
|
||||
return false, locked, ErrObjectIsExpired
|
||||
}
|
||||
|
||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||
|
@ -96,25 +112,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
|
|||
|
||||
// if graveyard is empty, then check if object exists in primary bucket
|
||||
if inBucket(tx, primaryBucketName(cnr, key), objKey) {
|
||||
return true, nil
|
||||
return true, locked, nil
|
||||
}
|
||||
|
||||
// if primary bucket is empty, then check if object exists in parent bucket
|
||||
if inBucket(tx, parentBucketName(cnr, key), objKey) {
|
||||
splitInfo, err := getSplitInfo(tx, cnr, objKey)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return false, locked, err
|
||||
}
|
||||
|
||||
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||
}
|
||||
// if parent bucket is empty, then check if object exists in ec bucket
|
||||
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
||||
return false, getECInfoError(tx, cnr, data)
|
||||
return false, locked, getECInfoError(tx, cnr, data)
|
||||
}
|
||||
|
||||
// if parent bucket is empty, then check if object exists in typed buckets
|
||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
|
||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
|
||||
}
|
||||
|
||||
// objectStatus returns:
|
||||
|
|
|
@ -229,11 +229,17 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
|||
|
||||
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
|
||||
targetKey := addressKey(prm.target[i], buf)
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
if err == nil {
|
||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if errors.As(err, &ecErr) {
|
||||
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if prm.tomb != nil {
|
||||
|
@ -272,6 +278,43 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
|||
return db.applyInhumeResToCounters(tx, res)
|
||||
}
|
||||
|
||||
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
|
||||
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
|
||||
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
|
||||
) error {
|
||||
for _, chunk := range ecInfo.Chunks {
|
||||
chunkBuf := make([]byte, addressKeySize)
|
||||
var chunkAddr oid.Address
|
||||
chunkAddr.SetContainer(cnr)
|
||||
var chunkID oid.ID
|
||||
err := chunkID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunkAddr.SetObject(chunkID)
|
||||
chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunkKey := addressKey(chunkAddr, chunkBuf)
|
||||
if tomb != nil {
|
||||
_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = targetBucket.Put(chunkKey, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
|
||||
return err
|
||||
|
|
|
@ -175,6 +175,31 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// return `LOCK` id's if specified object is locked in the specified container.
|
||||
func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
||||
var lockers []oid.ID
|
||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||
if bucketLocked != nil {
|
||||
key := make([]byte, cidSize)
|
||||
idCnr.Encode(key)
|
||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
||||
if bucketLockedContainer != nil {
|
||||
binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode list of object lockers: %w", err)
|
||||
}
|
||||
for _, binObjID := range binObjIDs {
|
||||
var id oid.ID
|
||||
if err = id.Decode(binObjID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockers = append(lockers, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
return lockers, nil
|
||||
}
|
||||
|
||||
// releases all records about the objects locked by the locker.
|
||||
// Returns slice of unlocked object ID's or an error.
|
||||
//
|
||||
|
@ -325,3 +350,36 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
|
|||
success = err == nil
|
||||
return res, err
|
||||
}
|
||||
|
||||
// GetLocked return `LOCK` id's if provided object is locked by any `LOCK`. Not found
|
||||
// object is considered as non-locked.
|
||||
//
|
||||
// Returns only non-logical errors related to underlying database.
|
||||
func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, err error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("GetLocked", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.GetLocked",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res, err = getLocked(tx, addr.Container(), addr.Object())
|
||||
return nil
|
||||
}))
|
||||
success = err == nil
|
||||
return res, err
|
||||
}
|
||||
|
|
|
@ -115,7 +115,7 @@ func (db *DB) put(tx *bbolt.Tx,
|
|||
|
||||
isParent := si != nil
|
||||
|
||||
exists, err := db.exists(tx, objectCore.AddressOf(obj), currEpoch)
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
||||
|
||||
var splitInfoError *objectSDK.SplitInfoError
|
||||
if errors.As(err, &splitInfoError) {
|
||||
|
|
|
@ -428,7 +428,7 @@ func (db *DB) selectObjectID(
|
|||
addr.SetObject(id)
|
||||
|
||||
var splitInfoError *objectSDK.SplitInfoError
|
||||
ok, err := db.exists(tx, addr, currEpoch)
|
||||
ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
|
||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||
raw := make([]byte, objectKeySize)
|
||||
id.Encode(raw)
|
||||
|
|
|
@ -13,17 +13,16 @@ import (
|
|||
|
||||
// ExistsPrm groups the parameters of Exists operation.
|
||||
type ExistsPrm struct {
|
||||
addr oid.Address
|
||||
// Exists option to set object checked for existence.
|
||||
Address oid.Address
|
||||
// Exists option to set parent object checked for existence.
|
||||
ParentAddress oid.Address
|
||||
}
|
||||
|
||||
// ExistsRes groups the resulting values of Exists operation.
|
||||
type ExistsRes struct {
|
||||
ex bool
|
||||
}
|
||||
|
||||
// SetAddress is an Exists option to set object checked for existence.
|
||||
func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
||||
p.addr = addr
|
||||
lc bool
|
||||
}
|
||||
|
||||
// Exists returns the fact that the object is in the shard.
|
||||
|
@ -31,6 +30,11 @@ func (p ExistsRes) Exists() bool {
|
|||
return p.ex
|
||||
}
|
||||
|
||||
// Locked returns the fact that the object is locked.
|
||||
func (p ExistsRes) Locked() bool {
|
||||
return p.lc
|
||||
}
|
||||
|
||||
// Exists checks if object is presented in shard.
|
||||
//
|
||||
// Returns any error encountered that does not allow to
|
||||
|
@ -43,11 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
|||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.String("address", prm.addr.EncodeToString()),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var exists bool
|
||||
var locked bool
|
||||
var err error
|
||||
|
||||
s.m.RLock()
|
||||
|
@ -57,21 +62,24 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
|||
return ExistsRes{}, ErrShardDisabled
|
||||
} else if s.info.Mode.NoMetabase() {
|
||||
var p common.ExistsPrm
|
||||
p.Address = prm.addr
|
||||
p.Address = prm.Address
|
||||
|
||||
var res common.ExistsRes
|
||||
res, err = s.blobStor.Exists(ctx, p)
|
||||
exists = res.Exists
|
||||
} else {
|
||||
var existsPrm meta.ExistsPrm
|
||||
existsPrm.SetAddress(prm.addr)
|
||||
existsPrm.SetAddress(prm.Address)
|
||||
existsPrm.SetParent(prm.ParentAddress)
|
||||
|
||||
var res meta.ExistsRes
|
||||
res, err = s.metaBase.Exists(ctx, existsPrm)
|
||||
exists = res.Exists()
|
||||
locked = res.Locked()
|
||||
}
|
||||
|
||||
return ExistsRes{
|
||||
ex: exists,
|
||||
lc: locked,
|
||||
}, err
|
||||
}
|
||||
|
|
|
@ -525,7 +525,9 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
|||
}
|
||||
|
||||
log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
|
||||
s.expiredTombstonesCallback(ctx, tssExp)
|
||||
if len(tssExp) > 0 {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
To be clear: is this an optimization or a functional change? To be clear: is this an optimization or a functional change?
acid-ant
commented
It is an optimization - here we do nothing but getting lock on metabase, because call It is an optimization - here we do nothing but getting lock on metabase, because call `db.boltDB.Update(...)`.
|
||||
s.expiredTombstonesCallback(ctx, tssExp)
|
||||
}
|
||||
|
||||
iterPrm.SetOffset(tss[tssLen-1].Address())
|
||||
tss = tss[:0]
|
||||
|
|
|
@ -71,3 +71,20 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
|
|||
|
||||
return res.Locked(), nil
|
||||
}
|
||||
|
||||
// GetLocked return lock id's of the provided object. Not found object is
|
||||
// considered as not locked. Requires healthy metabase, returns ErrDegradedMode otherwise.
|
||||
func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocked",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.String("address", addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
m := s.GetMode()
|
||||
if m.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
return s.metaBase.GetLocked(ctx, addr)
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ func TestShardReload(t *testing.T) {
|
|||
checkHasObjects := func(t *testing.T, exists bool) {
|
||||
for i := range objects {
|
||||
var prm ExistsPrm
|
||||
prm.SetAddress(objects[i].addr)
|
||||
prm.Address = objects[i].addr
|
||||
|
||||
res, err := sh.Exists(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -24,9 +24,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
errEmptyBodySignature = errors.New("malformed request: empty body signature")
|
||||
)
|
||||
var errEmptyBodySignature = errors.New("malformed request: empty body signature")
|
||||
|
||||
type cfg struct {
|
||||
log *logger.Logger
|
||||
|
|
|
@ -2,6 +2,7 @@ package deletesvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
|
@ -40,7 +41,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
|
|||
)}
|
||||
}
|
||||
|
||||
func (exec execCtx) isLocal() bool {
|
||||
func (exec *execCtx) isLocal() bool {
|
||||
return exec.prm.common.LocalOnly()
|
||||
}
|
||||
|
||||
|
@ -64,10 +65,33 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address {
|
|||
return a
|
||||
}
|
||||
|
||||
func (exec *execCtx) formSplitInfo(ctx context.Context) error {
|
||||
var err error
|
||||
exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)
|
||||
if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
|
||||
func (exec *execCtx) formExtendedInfo(ctx context.Context) error {
|
||||
_, err := exec.svc.header.head(ctx, exec)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case errors.As(err, &errSplitInfo):
|
||||
exec.splitInfo = errSplitInfo.SplitInfo()
|
||||
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
|
||||
|
||||
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
|
||||
|
||||
if err := exec.collectMembers(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
|
||||
return nil
|
||||
case errors.As(err, &errECInfo):
|
||||
exec.log.Debug(logs.DeleteECObjectReceived)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !apiclient.IsErrObjectAlreadyRemoved(err) {
|
||||
// IsErrObjectAlreadyRemoved check is required because splitInfo
|
||||
// implicitly performs Head request that may return ObjectAlreadyRemoved
|
||||
// status that is not specified for Delete.
|
||||
|
|
|
@ -35,19 +35,9 @@ func (exec *execCtx) formTombstone(ctx context.Context) error {
|
|||
|
||||
exec.log.Debug(logs.DeleteFormingSplitInfo)
|
||||
|
||||
if err := exec.formSplitInfo(ctx); err != nil {
|
||||
return fmt.Errorf("form split info: %w", err)
|
||||
if err := exec.formExtendedInfo(ctx); err != nil {
|
||||
return fmt.Errorf("form extended info: %w", err)
|
||||
}
|
||||
|
||||
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
|
||||
|
||||
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
|
||||
|
||||
if err := exec.collectMembers(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
|
||||
|
||||
return exec.initTombstoneObject()
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ type cfg struct {
|
|||
|
||||
header interface {
|
||||
// must return (nil, nil) for PHY objects
|
||||
splitInfo(context.Context, *execCtx) (*objectSDK.SplitInfo, error)
|
||||
head(context.Context, *execCtx) (*objectSDK.Object, error)
|
||||
|
||||
children(context.Context, *execCtx) ([]oid.ID, error)
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ package deletesvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
|
@ -44,19 +43,8 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
|
|||
return wr.Object(), nil
|
||||
}
|
||||
|
||||
func (w *headSvcWrapper) splitInfo(ctx context.Context, exec *execCtx) (*objectSDK.SplitInfo, error) {
|
||||
_, err := w.headAddress(ctx, exec, exec.address())
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil, nil
|
||||
case errors.As(err, &errSplitInfo):
|
||||
return errSplitInfo.SplitInfo(), nil
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
func (w *headSvcWrapper) head(ctx context.Context, exec *execCtx) (*objectSDK.Object, error) {
|
||||
return w.headAddress(ctx, exec, exec.address())
|
||||
}
|
||||
|
||||
func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) {
|
||||
|
|
|
@ -41,12 +41,10 @@ func (r *request) assemble(ctx context.Context) {
|
|||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
@ -55,7 +53,6 @@ func (r *request) assemble(ctx context.Context) {
|
|||
if err != nil {
|
||||
r.log.Warn(logs.GetFailedToAssembleSplittedObject,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
|
|
@ -39,12 +39,10 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
@ -53,7 +51,6 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
if err != nil {
|
||||
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
|
|
@ -28,7 +28,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
|
|||
)}
|
||||
}
|
||||
|
||||
func (exec execCtx) isLocal() bool {
|
||||
func (exec *execCtx) isLocal() bool {
|
||||
return exec.prm.common.LocalOnly()
|
||||
}
|
||||
|
||||
|
|
Was there any problem with the previous implementation (pipe instead of temp file)?
We are unable to use
tee /dev/tty
for security reason. If replace it for... check 2>&1 | tee | wc -l
there are no output for error.