node: Implement Lock\Delete requests for EC object #1147

Makefile
```diff
@@ -44,9 +44,12 @@ PROTOGEN_FROSTFS_DIR ?= $(PROTOBUF_DIR)/protogen-$(PROTOGEN_FROSTFS_VERSION)
 STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck
 STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION)
 
+SOURCES = $(shell find . -type f -name "*.go" -print)
+
 GOPLS_VERSION ?= v0.15.1
 GOPLS_DIR ?= $(abspath $(BIN))/gopls
 GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION)
+GOPLS_TEMP_FILE := $(shell mktemp)
 
 FROSTFS_CONTRACTS_PATH=$(abspath ./../frostfs-contract)
 LOCODE_DB_PATH=$(abspath ./.cache/locode_db)
```
```diff
@@ -220,9 +223,12 @@ gopls-run:
 	@if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \
 		make gopls-install; \
 	fi
-	@if [[ $$(find . -type f -name "*.go" -print | xargs $(GOPLS_VERSION_DIR)/gopls check | tee /dev/tty | wc -l) -ne 0 ]]; then \
+	$(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE)
+	@if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \
+		cat $(GOPLS_TEMP_FILE); \
 		exit 1; \
 	fi
+	rm $(GOPLS_TEMP_FILE)
 
 # Run linters in Docker
 docker/lint:
```
```diff
@@ -38,9 +38,7 @@ const (
 	groupTarget = "group"
 )
 
-var (
-	errUnknownTargetType = errors.New("unknown target type")
-)
+var errUnknownTargetType = errors.New("unknown target type")
 
 var addCmd = &cobra.Command{
 	Use: "add",
```
```diff
@@ -152,7 +152,11 @@ func printECInfoErr(cmd *cobra.Command, err error) bool {
 	ok := errors.As(err, &errECInfo)
 
 	if ok {
-		cmd.PrintErrln("Object is erasure-encoded, ec information received.")
+		toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
+		toProto, _ := cmd.Flags().GetBool("proto")
+		if !(toJSON || toProto) {
+			cmd.PrintErrln("Object is erasure-encoded, ec information received.")
+		}
 		printECInfo(cmd, errECInfo.ECInfo())
 	}
```
```diff
@@ -357,6 +357,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
 	_, err := internal.HeadObject(cmd.Context(), prmHead)
 
 	var errSplit *objectSDK.SplitInfoError
+	var errEC *objectSDK.ECInfoError
 
 	switch {
```

fyrchik commented:
Why have you replaced the switch with an `else if` chain?

acid-ant commented:
Because the previous implementation was only for `SplitInfo`; I thought it would be more readable. Reverted the switch back.

fyrchik commented:
IMO it is exactly the opposite -- `else if` is fine once, a switch is less verbose for multiple branches.
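For context, the construct under discussion is a `switch` over `errors.As` checks, which the final version of the PR keeps. A minimal standalone sketch of the pattern (the error types below are illustrative stand-ins for `objectSDK.SplitInfoError`/`ECInfoError`, not the SDK types):

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the SDK's SplitInfoError / ECInfoError.
type SplitInfoError struct{}
type ECInfoError struct{}

func (*SplitInfoError) Error() string { return "split info" }
func (*ECInfoError) Error() string    { return "ec info" }

// classify shows why a switch stays flat as more error kinds are added,
// compared to a growing else-if chain.
func classify(err error) string {
	var errSplit *SplitInfoError
	var errEC *ECInfoError
	switch {
	case err == nil:
		return "physical object"
	case errors.As(err, &errSplit):
		return "virtual (split) object"
	case errors.As(err, &errEC):
		return "erasure-coded object"
	default:
		return "unexpected error"
	}
}

func main() {
	fmt.Println(classify(&ECInfoError{})) // prints: erasure-coded object
}
```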
```diff
 	default:
@@ -366,19 +367,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
 		return nil
 	case errors.As(err, &errSplit):
 		common.PrintVerbose(cmd, "Split information received - object is virtual.")
+		splitInfo := errSplit.SplitInfo()
+
+		if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
+			return members
+		}
+
+		if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
+			return members
+		}
+
+		return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
+	case errors.As(err, &errEC):
+		common.PrintVerbose(cmd, "Object is erasure-coded.")
+		return nil
 	}
+	return nil
-
-	splitInfo := errSplit.SplitInfo()
-
-	if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
-		return members
-	}
-
-	if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
-		return members
-	}
-
-	return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
 }
 
 func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
```

fyrchik commented:
So we add each chunk to the tombstone/lock? It is a problem, because chunks may be missing (with split it cannot be the case, it means DL, with EC it is ok).

acid-ant commented:
Oh, thanks, that was from the previous implementation, removed.

fyrchik commented:
Does the new implementation still pass sanity tests?

acid-ant commented:
I run them each time a sensitive part of the code is changed.
```diff
@@ -95,6 +95,7 @@ const (
 	DeleteFormingSplitInfo = "forming split info..."
 	DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
 	DeleteMembersSuccessfullyCollected = "members successfully collected"
+	DeleteECObjectReceived = "erasure-coded object received, form tombstone"
 	GetRemoteCallFailed = "remote call failed"
 	GetCanNotAssembleTheObject = "can not assemble the object"
 	GetTryingToAssembleTheObject = "trying to assemble the object..."
@@ -213,6 +214,7 @@ const (
 	EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies"
 	EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check"
 	EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks"
+	EngineInterruptGettingLockers = "can't get object's lockers"
 	EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks"
 	EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only"
 	EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode"
```
```diff
@@ -76,13 +76,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 		is bool
 	}
 	var splitInfo *objectSDK.SplitInfo
+	var ecInfo *objectSDK.ECInfo
 
 	// Removal of a big object is done in multiple stages:
 	// 1. Remove the parent object. If it is locked or already removed, return immediately.
 	// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
 	e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
 		var existsPrm shard.ExistsPrm
-		existsPrm.SetAddress(prm.addr)
+		existsPrm.Address = prm.addr
 
 		resExists, err := sh.Exists(ctx, existsPrm)
 		if err != nil {
@@ -91,13 +92,18 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 			}
 
 			var splitErr *objectSDK.SplitInfoError
-			if !errors.As(err, &splitErr) {
+			var ecErr *objectSDK.ECInfoError
+			if errors.As(err, &splitErr) {
+				splitInfo = splitErr.SplitInfo()
+			} else if errors.As(err, &ecErr) {
+				e.deleteChunks(ctx, sh, ecInfo, prm)
+				return false
+			} else {
 				if !client.IsErrObjectNotFound(err) {
 					e.reportShardError(sh, "could not check object existence", err)
 				}
 				return false
 			}
-			splitInfo = splitErr.SplitInfo()
 		} else if !resExists.Exists() {
 			return false
 		}
```
```diff
@@ -171,3 +177,31 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
 		return false
 	})
 }
+
+func (e *StorageEngine) deleteChunks(
+	ctx context.Context, sh hashedShard, ecInfo *objectSDK.ECInfo, prm DeletePrm,
+) {
+	var inhumePrm shard.InhumePrm
+	if prm.forceRemoval {
+		inhumePrm.ForceRemoval()
+	}
+	for _, chunk := range ecInfo.Chunks {
+		var addr oid.Address
+		addr.SetContainer(prm.addr.Container())
+		var objID oid.ID
+		err := objID.ReadFromV2(chunk.ID)
+		if err != nil {
+			e.reportShardError(sh, "could not delete EC chunk", err)
+		}
+		addr.SetObject(objID)
+		inhumePrm.MarkAsGarbage(addr)
+		_, err = sh.Inhume(ctx, inhumePrm)
+		if err != nil {
+			e.log.Debug(logs.EngineCouldNotInhumeObjectInShard,
+				zap.Stringer("addr", addr),
+				zap.String("err", err.Error()),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+			continue
+		}
+	}
+}
```

fyrchik commented:
There is an error which we have ignored here. What happens with this yet-to-be-removed chunk?

acid-ant commented:
It will be removed by the `remover` at the next iteration. The behavior is the same as for a complex object, see [deleteChildren()](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/3627b44e92395d2be7eeda9790513021b9f345ca/pkg/local_object_storage/engine/delete.go#L136).
```diff
@@ -17,6 +17,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
 	"git.frostfs.info/TrueCloudLab/hrw"
 	"github.com/panjf2000/ants/v2"
```
```diff
@@ -62,7 +63,10 @@ func benchmarkExists(b *testing.B, shardNum int) {
 	b.ReportAllocs()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		ok, err := e.exists(context.Background(), addr)
+		var shPrm shard.ExistsPrm
+		shPrm.Address = addr
+		shPrm.ParentAddress = oid.Address{}
+		ok, _, err := e.exists(context.Background(), shPrm)
 		if err != nil || ok {
 			b.Fatalf("%t %v", ok, err)
 		}
```

fyrchik commented:
The interface is confusing, we have 2 identical types with different meaning. What about accepting `shard.ExistsPrm`?

acid-ant commented:
In this case we need to make the fields of `ExistsPrm` public, are you ok with that?

acid-ant commented:
Implemented in a separate commit.
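To illustrate the change being discussed, the shard-level parameter struct moves from setter methods to exported fields, so the engine layer can fill it directly and read both results back. A standalone sketch with deliberately simplified types (not the actual frostfs-node definitions):

```go
package main

import "fmt"

// Simplified stand-in for shard.ExistsPrm after the change: exported fields
// instead of SetAddress/SetParent setters.
type ExistsPrm struct {
	Address       string // object checked for existence
	ParentAddress string // optional EC parent used for the lock check
}

// Simplified stand-in for shard.ExistsRes.
type ExistsRes struct {
	Exists bool
	Locked bool
}

// exists mimics the engine-level helper: it accepts the shard-level
// parameter struct as-is instead of rebuilding it from separate arguments.
func exists(prm ExistsPrm, store map[string]bool, locks map[string]bool) ExistsRes {
	return ExistsRes{
		Exists: store[prm.Address],
		Locked: prm.ParentAddress != "" && locks[prm.ParentAddress],
	}
}

func main() {
	store := map[string]bool{"cnr/chunk-1": true}
	locks := map[string]bool{"cnr/parent": true}

	prm := ExistsPrm{Address: "cnr/chunk-2", ParentAddress: "cnr/parent"}
	res := exists(prm, store, locks)
	fmt.Printf("exists=%v locked=%v\n", res.Exists, res.Locked)
}
```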
```diff
@@ -8,16 +8,16 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 )
 
-func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
-	var shPrm shard.ExistsPrm
-	shPrm.SetAddress(addr)
+// exists return in the first value true if object exists.
+// Second return value marks is parent object locked.
+func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool, bool, error) {
 	alreadyRemoved := false
 	exists := false
+	locked := false
 
-	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
+	e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) {
 		res, err := sh.Exists(ctx, shPrm)
 		if err != nil {
 			if client.IsErrObjectAlreadyRemoved(err) {
@@ -44,13 +44,16 @@ func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, err
 		if !exists {
 			exists = res.Exists()
 		}
+		if !locked {
+			locked = res.Locked()
+		}
 
 		return false
 	})
 
 	if alreadyRemoved {
-		return false, new(apistatus.ObjectAlreadyRemoved)
+		return false, false, new(apistatus.ObjectAlreadyRemoved)
 	}
 
-	return exists, nil
+	return exists, locked, nil
 }
```
```diff
@@ -142,7 +142,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 	}()
 
 	if checkExists {
-		existPrm.SetAddress(addr)
+		existPrm.Address = addr
 		exRes, err := sh.Exists(ctx, existPrm)
 		if err != nil {
 			if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
@@ -152,7 +152,8 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 			}
 
 			var siErr *objectSDK.SplitInfoError
-			if !errors.As(err, &siErr) {
+			var ecErr *objectSDK.ECInfoError
+			if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
 				e.reportShardError(sh, "could not check for presents in shard", err)
 				return
 			}
```
```diff
@@ -220,6 +221,33 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
 	return locked, outErr
 }
 
+// GetLocked return lock id's if object is locked according to StorageEngine's state.
+func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocked",
+		trace.WithAttributes(
+			attribute.String("address", addr.EncodeToString()),
+		))
+	defer span.End()
+
+	var locked []oid.ID
+	var outErr error
+
+	e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
+		ld, err := h.Shard.GetLocked(ctx, addr)
+		if err != nil {
+			e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+			outErr = err
+		}
+		locked = append(locked, ld...)
+		return false
+	})
+	if len(locked) > 0 {
+		return locked, nil
+	}
+	return locked, outErr
+}
+
 func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
 	e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
 		sh.HandleExpiredTombstones(ctx, addrs)
```

fyrchik commented:
It is a log message, it should be a const. Also, why didn't the linter fail? cc @achuprov

acid-ant commented:
Thanks, updated.
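The convention referred to above keeps every log message as a named constant in a dedicated logs package (the `EngineInterruptGettingLockers` entry added in the logs hunk earlier), so call sites never pass raw string literals and a linter can enforce it. A tiny self-contained sketch of the idea, with the constant inlined into one file for illustration:

```go
package main

import "go.uber.org/zap"

// In frostfs-node the message lives in internal/logs; it is shown here
// as a local constant only to keep the sketch runnable.
const engineInterruptGettingLockers = "can't get object's lockers"

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()

	// The call site references the constant instead of an inline literal.
	log.Warn(engineInterruptGettingLockers, zap.String("addr", "cnr/obj"))
}
```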
```diff
@@ -78,12 +78,31 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
 
 		if checkExists {
 			var existsPrm shard.ExistsPrm
-			existsPrm.SetAddress(addrLocked)
+			existsPrm.Address = addrLocked
 
 			exRes, err := sh.Exists(ctx, existsPrm)
 			if err != nil {
 				var siErr *objectSDK.SplitInfoError
-				if !errors.As(err, &siErr) {
+				var eiErr *objectSDK.ECInfoError
+				if errors.As(err, &eiErr) {
+					eclocked := []oid.ID{locked}
+					for _, chunk := range eiErr.ECInfo().Chunks {
+						var objID oid.ID
+						err = objID.ReadFromV2(chunk.ID)
+						if err != nil {
+							e.reportShardError(sh, "could not lock object in shard", err)
+							return false
+						}
+						eclocked = append(eclocked, objID)
+					}
+					err = sh.Lock(ctx, idCnr, locker, eclocked)
+					if err != nil {
+						e.reportShardError(sh, "could not lock object in shard", err)
+						return false
+					}
+					root = true
+					return false
+				} else if !errors.As(err, &siErr) {
 					if shard.IsErrObjectExpired(err) {
 						// object is already expired =>
 						// do not lock it
```
```diff
@@ -80,11 +80,32 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
 
 	// In #1146 this check was parallelized, however, it became
 	// much slower on fast machines for 4 shards.
-	_, err := e.exists(ctx, addr)
+	var parent oid.Address
+	if prm.obj.ECHeader() != nil {
+		parent.SetObject(prm.obj.ECHeader().Parent())
+		parent.SetContainer(addr.Container())
+	}
+	var shPrm shard.ExistsPrm
+	shPrm.Address = addr
+	shPrm.ParentAddress = parent
+	existed, locked, err := e.exists(ctx, shPrm)
 	if err != nil {
 		return err
 	}
+
+	if !existed && locked {
+		lockers, err := e.GetLocked(ctx, parent)
+		if err != nil {
+			return err
+		}
+		for _, locker := range lockers {
+			err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
+			if err != nil {
+				return err
+			}
+		}
+	}
 
 	var shRes putToShardRes
 	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 		e.mtx.RLock()
@@ -120,7 +141,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
 	defer close(exitCh)
 
 	var existPrm shard.ExistsPrm
-	existPrm.SetAddress(addr)
+	existPrm.Address = addr
 
 	exists, err := sh.Exists(ctx, existPrm)
 	if err != nil {
```

fyrchik commented:
Do we lock an object before we have put it? It seems like a problem, because this lock record can persist indefinitely.

acid-ant commented:
Didn't catch the problem. Here we are persisting a lock for a chunk before put, because we need to avoid `gc` removing it. This is the reconstruction scenario - when we need to put a chunk on the node. If there is no lock for the chunk, `gc` will inhume it.

fyrchik commented:
The problem is atomicity -- `lock -> CRASH -> put` and we now have some garbage about locks which will (?) be removed eventually. We could do it atomically in `put` instead, this would also ensure we put the info on the same shard.

acid-ant commented:
As a result of the discussion, we need to move `gc` to the storage engine level. Created https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1151 for tracking.
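A schematic sketch of the ordering discussed above, using illustrative in-memory stand-ins rather than the engine API: the lock for a not-yet-stored chunk is recorded first so GC cannot inhume the chunk, and that is exactly what opens the crash window the reviewer points out.

```go
package main

import "fmt"

// Illustrative in-memory stand-ins for the engine's metabase state.
type engine struct {
	objects map[string]bool     // stored chunks
	locks   map[string][]string // chunk -> lockers
}

// putLockedChunk mirrors the flow in the diff: if the chunk does not exist
// yet but its parent is locked, re-apply the parent's lockers to the chunk
// before storing it, so GC does not inhume the chunk in between.
func (e *engine) putLockedChunk(chunk string, parentLockers []string) {
	if !e.objects[chunk] && len(parentLockers) > 0 {
		e.locks[chunk] = append(e.locks[chunk], parentLockers...)
		// A CRASH here would leave a lock record with no object behind it --
		// the atomicity concern raised in the review; doing lock+put in one
		// step (or running GC at the engine level) closes the window.
	}
	e.objects[chunk] = true
}

func main() {
	e := &engine{objects: map[string]bool{}, locks: map[string][]string{}}
	e.putLockedChunk("chunk-1", []string{"lock-A"})
	fmt.Println(e.objects["chunk-1"], e.locks["chunk-1"])
}
```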
```diff
@@ -115,7 +115,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
 		found := false
 		for i := range shards {
 			var existsPrm shard.ExistsPrm
-			existsPrm.SetAddress(addr)
+			existsPrm.Address = addr
 
 			res, err := shards[i].Exists(ctx, existsPrm)
 			if err != nil {
```
```diff
@@ -267,8 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
 		return deleteSingleResult{}, nil
 	}
 	var siErr *objectSDK.SplitInfoError
-	if errors.As(err, &siErr) {
+	var ecErr *objectSDK.ECInfoError
+	if errors.As(err, &siErr) || errors.As(err, &ecErr) {
 		// if object is virtual (parent) then do nothing, it will be deleted with last child
+		// if object is erasure-coded it will be deleted with the last chunk presented on the shard
 		return deleteSingleResult{}, nil
 	}
 
@@ -471,6 +473,46 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
 		name: rootBucketName(cnr, bucketName),
 		key:  objKey,
 	})
+	if obj.ECHeader() != nil {
+		err := delECInfo(tx, cnr, objKey, obj.ECHeader())
+		if err != nil {
+			return err
+		}
+	}
 
 	return nil
 }
 
+func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error {
+	parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
+	bucketName := make([]byte, bucketKeySize)
+
+	val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
+	if len(val) > 0 {
+		if bytes.Equal(val, objKey) {
+			delUniqueIndexItem(tx, namedBucketItem{
+				name: ecInfoBucketName(cnr, bucketName),
+				key:  parentID,
+			})
+		} else {
+			val = bytes.Clone(val)
+			offset := 0
+			for offset < len(val) {
+				if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
+					val = append(val[:offset], val[offset+objectKeySize:]...)
+					break
+				}
+				offset += objectKeySize
+			}
+			err := putUniqueIndexItem(tx, namedBucketItem{
+				name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
+				key:  parentID,
+				val:  val,
+			})
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
```

fyrchik commented:
`val` is received from `getFromBucket`. Is it taken from bbolt or freshly allocated? Bbolt prohibits changing values in some cases.

acid-ant commented:
According to the doc, `val` should be valid for the life of the transaction. Let's clone it.

fyrchik commented:
This line is more important: `// The returned memory is owned by bbolt and must never be modified; writing to this memory might corrupt the database.`

acid-ant commented:
This line is from the newest version. Looks like we need to update `bbolt`.
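As a standalone illustration of the constraint quoted above, the sketch below (bucket and key names are made up for the example) clones a value returned by bbolt's `Get` before editing it and writes the copy back, instead of modifying bbolt-owned memory in place:

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"os"

	bolt "go.etcd.io/bbolt"
)

func main() {
	path := "example.db"
	defer os.Remove(path)

	db, err := bolt.Open(path, 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.Update(func(tx *bolt.Tx) error {
		bkt, err := tx.CreateBucketIfNotExists([]byte("ec_info"))
		if err != nil {
			return err
		}
		if err := bkt.Put([]byte("parent"), []byte("chunk1chunk2")); err != nil {
			return err
		}

		// Get returns memory owned by bbolt: it is only valid for the life
		// of the transaction and must never be modified in place.
		val := bkt.Get([]byte("parent"))

		// Clone first, edit the copy, then write the copy back.
		cp := bytes.Clone(val)
		cp = cp[len("chunk1"):] // e.g. drop the first chunk id
		return bkt.Put([]byte("parent"), cp)
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("value rewritten without touching bbolt-owned memory")
}
```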
```diff
@@ -20,12 +20,14 @@ import (
 
 // ExistsPrm groups the parameters of Exists operation.
 type ExistsPrm struct {
 	addr oid.Address
+	paddr oid.Address
 }
 
 // ExistsRes groups the resulting values of Exists operation.
 type ExistsRes struct {
 	exists bool
+	locked bool
 }
 
 var ErrLackSplitInfo = logicerr.New("no split info on parent object")
@@ -35,11 +37,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
 	p.addr = addr
 }
 
+// SetParent is an Exists option to set objects parent.
+func (p *ExistsPrm) SetParent(addr oid.Address) {
+	p.paddr = addr
+}
+
 // Exists returns the fact that the object is in the metabase.
 func (p ExistsRes) Exists() bool {
 	return p.exists
 }
 
+// Locked returns the fact that the object is locked.
+func (p ExistsRes) Locked() bool {
+	return p.locked
+}
+
 // Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
 // returns true if addr is in primary index or false if it is not.
 //
@@ -70,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 	currEpoch := db.epochState.CurrentEpoch()
 
 	err = db.boltDB.View(func(tx *bbolt.Tx) error {
-		res.exists, err = db.exists(tx, prm.addr, currEpoch)
+		res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
 
 		return err
 	})
@@ -78,15 +90,19 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 	return res, metaerr.Wrap(err)
 }
 
-func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) {
+func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
+	var locked bool
+	if !parent.Equals(oid.Address{}) {
+		locked = objectLocked(tx, parent.Container(), parent.Object())
+	}
 	// check graveyard and object expiration first
 	switch objectStatus(tx, addr, currEpoch) {
 	case 1:
-		return false, logicerr.Wrap(new(apistatus.ObjectNotFound))
+		return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	case 2:
-		return false, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
+		return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
 	case 3:
-		return false, ErrObjectIsExpired
+		return false, locked, ErrObjectIsExpired
 	}
 
 	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
@@ -96,25 +112,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
 
 	// if graveyard is empty, then check if object exists in primary bucket
 	if inBucket(tx, primaryBucketName(cnr, key), objKey) {
-		return true, nil
+		return true, locked, nil
 	}
 
 	// if primary bucket is empty, then check if object exists in parent bucket
 	if inBucket(tx, parentBucketName(cnr, key), objKey) {
 		splitInfo, err := getSplitInfo(tx, cnr, objKey)
 		if err != nil {
-			return false, err
+			return false, locked, err
 		}
 
-		return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
+		return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
 	}
 	// if parent bucket is empty, then check if object exists in ec bucket
 	if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
-		return false, getECInfoError(tx, cnr, data)
+		return false, locked, getECInfoError(tx, cnr, data)
 	}
 
 	// if parent bucket is empty, then check if object exists in typed buckets
-	return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
+	return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
 }
 
 // objectStatus returns:
```
```diff
@@ -229,11 +229,17 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 
 		obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
 		targetKey := addressKey(prm.target[i], buf)
+		var ecErr *objectSDK.ECInfoError
 		if err == nil {
 			err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
 			if err != nil {
 				return err
 			}
+		} else if errors.As(err, &ecErr) {
+			err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
+			if err != nil {
+				return err
+			}
 		}
 
 		if prm.tomb != nil {
@@ -272,6 +278,43 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 	return db.applyInhumeResToCounters(tx, res)
 }
 
+func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
+	garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
+	ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
+) error {
+	for _, chunk := range ecInfo.Chunks {
+		chunkBuf := make([]byte, addressKeySize)
+		var chunkAddr oid.Address
+		chunkAddr.SetContainer(cnr)
+		var chunkID oid.ID
+		err := chunkID.ReadFromV2(chunk.ID)
+		if err != nil {
+			return err
+		}
+		chunkAddr.SetObject(chunkID)
+		chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
+		if err != nil {
+			return err
+		}
+		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
+		if err != nil {
+			return err
+		}
+		chunkKey := addressKey(chunkAddr, chunkBuf)
+		if tomb != nil {
+			_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
+			if err != nil {
+				return err
+			}
+		}
+		err = targetBucket.Put(chunkKey, value)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
 	if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
 		return err
```
```diff
@@ -175,6 +175,31 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
 	return false
 }
 
+// return `LOCK` id's if specified object is locked in the specified container.
+func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
+	var lockers []oid.ID
+	bucketLocked := tx.Bucket(bucketNameLocked)
+	if bucketLocked != nil {
+		key := make([]byte, cidSize)
+		idCnr.Encode(key)
+		bucketLockedContainer := bucketLocked.Bucket(key)
+		if bucketLockedContainer != nil {
+			binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
+			if err != nil {
+				return nil, fmt.Errorf("decode list of object lockers: %w", err)
+			}
+			for _, binObjID := range binObjIDs {
+				var id oid.ID
+				if err = id.Decode(binObjID); err != nil {
+					return nil, err
+				}
+				lockers = append(lockers, id)
+			}
+		}
+	}
+	return lockers, nil
+}
+
 // releases all records about the objects locked by the locker.
 // Returns slice of unlocked object ID's or an error.
 //
@@ -325,3 +350,36 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
 	success = err == nil
 	return res, err
 }
+
+// GetLocked return `LOCK` id's if provided object is locked by any `LOCK`. Not found
+// object is considered as non-locked.
+//
+// Returns only non-logical errors related to underlying database.
+func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, err error) {
+	var (
+		startedAt = time.Now()
+		success   = false
+	)
+	defer func() {
+		db.metrics.AddMethodDuration("GetLocked", time.Since(startedAt), success)
+	}()
+
+	_, span := tracing.StartSpanFromContext(ctx, "metabase.GetLocked",
+		trace.WithAttributes(
+			attribute.String("address", addr.EncodeToString()),
+		))
+	defer span.End()
+
+	db.modeMtx.RLock()
+	defer db.modeMtx.RUnlock()
+
+	if db.mode.NoMetabase() {
+		return res, ErrDegradedMode
+	}
+	err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
+		res, err = getLocked(tx, addr.Container(), addr.Object())
+		return nil
+	}))
+	success = err == nil
+	return res, err
+}
```
```diff
@@ -115,7 +115,7 @@ func (db *DB) put(tx *bbolt.Tx,
 
 	isParent := si != nil
 
-	exists, err := db.exists(tx, objectCore.AddressOf(obj), currEpoch)
+	exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
 
 	var splitInfoError *objectSDK.SplitInfoError
 	if errors.As(err, &splitInfoError) {
@@ -428,7 +428,7 @@ func (db *DB) selectObjectID(
 	addr.SetObject(id)
 
 	var splitInfoError *objectSDK.SplitInfoError
-	ok, err := db.exists(tx, addr, currEpoch)
+	ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
 	if (err == nil && ok) || errors.As(err, &splitInfoError) {
 		raw := make([]byte, objectKeySize)
 		id.Encode(raw)
```
```diff
@@ -13,17 +13,16 @@ import (
 
 // ExistsPrm groups the parameters of Exists operation.
 type ExistsPrm struct {
-	addr oid.Address
+	// Exists option to set object checked for existence.
+	Address oid.Address
+	// Exists option to set parent object checked for existence.
+	ParentAddress oid.Address
 }
 
 // ExistsRes groups the resulting values of Exists operation.
 type ExistsRes struct {
 	ex bool
+	lc bool
 }
-
-// SetAddress is an Exists option to set object checked for existence.
-func (p *ExistsPrm) SetAddress(addr oid.Address) {
-	p.addr = addr
-}
 
 // Exists returns the fact that the object is in the shard.
@@ -31,6 +30,11 @@ func (p ExistsRes) Exists() bool {
 	return p.ex
 }
 
+// Locked returns the fact that the object is locked.
+func (p ExistsRes) Locked() bool {
+	return p.lc
+}
+
 // Exists checks if object is presented in shard.
 //
 // Returns any error encountered that does not allow to
@@ -43,11 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
 		trace.WithAttributes(
 			attribute.String("shard_id", s.ID().String()),
-			attribute.String("address", prm.addr.EncodeToString()),
+			attribute.String("address", prm.Address.EncodeToString()),
 		))
 	defer span.End()
 
 	var exists bool
+	var locked bool
 	var err error
 
 	s.m.RLock()
@@ -57,21 +62,24 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
 		return ExistsRes{}, ErrShardDisabled
 	} else if s.info.Mode.NoMetabase() {
 		var p common.ExistsPrm
-		p.Address = prm.addr
+		p.Address = prm.Address
 
 		var res common.ExistsRes
 		res, err = s.blobStor.Exists(ctx, p)
 		exists = res.Exists
 	} else {
 		var existsPrm meta.ExistsPrm
-		existsPrm.SetAddress(prm.addr)
+		existsPrm.SetAddress(prm.Address)
+		existsPrm.SetParent(prm.ParentAddress)
 
 		var res meta.ExistsRes
 		res, err = s.metaBase.Exists(ctx, existsPrm)
 		exists = res.Exists()
+		locked = res.Locked()
 	}
 
 	return ExistsRes{
 		ex: exists,
+		lc: locked,
 	}, err
 }
```
```diff
@@ -525,7 +525,9 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 		}
 
 		log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
-		s.expiredTombstonesCallback(ctx, tssExp)
+		if len(tssExp) > 0 {
+			s.expiredTombstonesCallback(ctx, tssExp)
+		}
 
 		iterPrm.SetOffset(tss[tssLen-1].Address())
 		tss = tss[:0]
```

fyrchik commented:
To be clear: is this an optimization or a functional change?

acid-ant commented:
It is an optimization - without the guard we do nothing here except take a lock on the metabase, because the callback ends up calling `db.boltDB.Update(...)`.
```diff
@@ -71,3 +71,20 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
 
 	return res.Locked(), nil
 }
+
+// GetLocked return lock id's of the provided object. Not found object is
+// considered as not locked. Requires healthy metabase, returns ErrDegradedMode otherwise.
+func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocked",
+		trace.WithAttributes(
+			attribute.String("shard_id", s.ID().String()),
+			attribute.String("address", addr.EncodeToString()),
+		))
+	defer span.End()
+
+	m := s.GetMode()
+	if m.NoMetabase() {
+		return nil, ErrDegradedMode
+	}
+	return s.metaBase.GetLocked(ctx, addr)
+}
```
```diff
@@ -72,7 +72,7 @@ func TestShardReload(t *testing.T) {
 	checkHasObjects := func(t *testing.T, exists bool) {
 		for i := range objects {
 			var prm ExistsPrm
-			prm.SetAddress(objects[i].addr)
+			prm.Address = objects[i].addr
 
 			res, err := sh.Exists(context.Background(), prm)
 			require.NoError(t, err)
```
```diff
@@ -24,9 +24,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var (
-	errEmptyBodySignature = errors.New("malformed request: empty body signature")
-)
+var errEmptyBodySignature = errors.New("malformed request: empty body signature")
 
 type cfg struct {
 	log *logger.Logger
```
```diff
@@ -2,6 +2,7 @@ package deletesvc
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strconv"
 
@@ -40,7 +41,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
 	)}
 }
 
-func (exec execCtx) isLocal() bool {
+func (exec *execCtx) isLocal() bool {
 	return exec.prm.common.LocalOnly()
 }
 
@@ -64,10 +65,33 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address {
 	return a
 }
 
-func (exec *execCtx) formSplitInfo(ctx context.Context) error {
-	var err error
-	exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)
-	if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
+func (exec *execCtx) formExtendedInfo(ctx context.Context) error {
+	_, err := exec.svc.header.head(ctx, exec)
+
+	var errSplitInfo *objectSDK.SplitInfoError
+	var errECInfo *objectSDK.ECInfoError
+
+	switch {
+	case err == nil:
+		return nil
+	case errors.As(err, &errSplitInfo):
+		exec.splitInfo = errSplitInfo.SplitInfo()
+		exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
+
+		exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
+
+		if err := exec.collectMembers(ctx); err != nil {
+			return err
+		}
+
+		exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
+		return nil
+	case errors.As(err, &errECInfo):
+		exec.log.Debug(logs.DeleteECObjectReceived)
+		return nil
+	}
+
+	if !apiclient.IsErrObjectAlreadyRemoved(err) {
 		// IsErrObjectAlreadyRemoved check is required because splitInfo
 		// implicitly performs Head request that may return ObjectAlreadyRemoved
 		// status that is not specified for Delete.
```
```diff
@@ -35,19 +35,9 @@ func (exec *execCtx) formTombstone(ctx context.Context) error {
 
 	exec.log.Debug(logs.DeleteFormingSplitInfo)
 
-	if err := exec.formSplitInfo(ctx); err != nil {
-		return fmt.Errorf("form split info: %w", err)
+	if err := exec.formExtendedInfo(ctx); err != nil {
+		return fmt.Errorf("form extended info: %w", err)
 	}
 
-	exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
-
-	exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
-
-	if err := exec.collectMembers(ctx); err != nil {
-		return err
-	}
-
-	exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
-
 	return exec.initTombstoneObject()
 }
```
```diff
@@ -41,7 +41,7 @@ type cfg struct {
 
 	header interface {
 		// must return (nil, nil) for PHY objects
-		splitInfo(context.Context, *execCtx) (*objectSDK.SplitInfo, error)
+		head(context.Context, *execCtx) (*objectSDK.Object, error)
 
 		children(context.Context, *execCtx) ([]oid.ID, error)
```
```diff
@@ -2,7 +2,6 @@ package deletesvc
 
 import (
 	"context"
-	"errors"
 
 	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
 	putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@@ -44,19 +43,8 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
 	return wr.Object(), nil
 }
 
-func (w *headSvcWrapper) splitInfo(ctx context.Context, exec *execCtx) (*objectSDK.SplitInfo, error) {
-	_, err := w.headAddress(ctx, exec, exec.address())
-
-	var errSplitInfo *objectSDK.SplitInfoError
-
-	switch {
-	case err == nil:
-		return nil, nil
-	case errors.As(err, &errSplitInfo):
-		return errSplitInfo.SplitInfo(), nil
-	default:
-		return nil, err
-	}
+func (w *headSvcWrapper) head(ctx context.Context, exec *execCtx) (*objectSDK.Object, error) {
+	return w.headAddress(ctx, exec, exec.address())
 }
 
 func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) {
```
```diff
@@ -41,12 +41,10 @@ func (r *request) assemble(ctx context.Context) {
 	assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
 
 	r.log.Debug(logs.GetAssemblingSplittedObject,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
 	defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
@@ -55,7 +53,6 @@ func (r *request) assemble(ctx context.Context) {
 	if err != nil {
 		r.log.Warn(logs.GetFailedToAssembleSplittedObject,
 			zap.Error(err),
-			zap.Stringer("address", r.address()),
 			zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 			zap.Uint64("range_length", r.ctxRange().GetLength()),
 		)
```
```diff
@@ -39,12 +39,10 @@ func (r *request) assembleEC(ctx context.Context) {
 	assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
 
 	r.log.Debug(logs.GetAssemblingECObject,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
 	defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
-		zap.Stringer("address", r.address()),
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)
@@ -53,7 +51,6 @@ func (r *request) assembleEC(ctx context.Context) {
 	if err != nil {
 		r.log.Warn(logs.GetFailedToAssembleECObject,
 			zap.Error(err),
-			zap.Stringer("address", r.address()),
 			zap.Uint64("range_offset", r.ctxRange().GetOffset()),
 			zap.Uint64("range_length", r.ctxRange().GetLength()),
 		)
```
```diff
@@ -28,7 +28,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
 	)}
 }
 
-func (exec execCtx) isLocal() bool {
+func (exec *execCtx) isLocal() bool {
 	return exec.prm.common.LocalOnly()
 }
```
Was there any problem with the previous implementation (a pipe instead of a temp file)?

We are unable to use `tee /dev/tty` for security reasons. If we replace it with `... check 2>&1 | tee | wc -l`, there is no output on error.