node: Implement Lock\Delete
requests for EC object #1147
30 changed files with 405 additions and 102 deletions
8
Makefile
8
Makefile
|
@ -44,9 +44,12 @@ PROTOGEN_FROSTFS_DIR ?= $(PROTOBUF_DIR)/protogen-$(PROTOGEN_FROSTFS_VERSION)
|
||||||
STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck
|
STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck
|
||||||
STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION)
|
STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION)
|
||||||
|
|
||||||
|
SOURCES = $(shell find . -type f -name "*.go" -print)
|
||||||
|
|
||||||
GOPLS_VERSION ?= v0.15.1
|
GOPLS_VERSION ?= v0.15.1
|
||||||
GOPLS_DIR ?= $(abspath $(BIN))/gopls
|
GOPLS_DIR ?= $(abspath $(BIN))/gopls
|
||||||
GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION)
|
GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION)
|
||||||
|
GOPLS_TEMP_FILE := $(shell mktemp)
|
||||||
|
|
||||||
FROSTFS_CONTRACTS_PATH=$(abspath ./../frostfs-contract)
|
FROSTFS_CONTRACTS_PATH=$(abspath ./../frostfs-contract)
|
||||||
LOCODE_DB_PATH=$(abspath ./.cache/locode_db)
|
LOCODE_DB_PATH=$(abspath ./.cache/locode_db)
|
||||||
|
@ -220,9 +223,12 @@ gopls-run:
|
||||||
@if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \
|
@if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \
|
||||||
make gopls-install; \
|
make gopls-install; \
|
||||||
fi
|
fi
|
||||||
@if [[ $$(find . -type f -name "*.go" -print | xargs $(GOPLS_VERSION_DIR)/gopls check | tee /dev/tty | wc -l) -ne 0 ]]; then \
|
$(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE)
|
||||||
|
@if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \
|
||||||
|
cat $(GOPLS_TEMP_FILE); \
|
||||||
exit 1; \
|
exit 1; \
|
||||||
fi
|
fi
|
||||||
|
rm $(GOPLS_TEMP_FILE)
|
||||||
|
|
||||||
# Run linters in Docker
|
# Run linters in Docker
|
||||||
docker/lint:
|
docker/lint:
|
||||||
|
|
|
@ -38,9 +38,7 @@ const (
|
||||||
groupTarget = "group"
|
groupTarget = "group"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var errUnknownTargetType = errors.New("unknown target type")
|
||||||
errUnknownTargetType = errors.New("unknown target type")
|
|
||||||
)
|
|
||||||
|
|
||||||
var addCmd = &cobra.Command{
|
var addCmd = &cobra.Command{
|
||||||
Use: "add",
|
Use: "add",
|
||||||
|
|
|
@ -152,7 +152,11 @@ func printECInfoErr(cmd *cobra.Command, err error) bool {
|
||||||
ok := errors.As(err, &errECInfo)
|
ok := errors.As(err, &errECInfo)
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
|
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
|
||||||
|
toProto, _ := cmd.Flags().GetBool("proto")
|
||||||
|
if !(toJSON || toProto) {
|
||||||
|
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
|
||||||
|
}
|
||||||
printECInfo(cmd, errECInfo.ECInfo())
|
printECInfo(cmd, errECInfo.ECInfo())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -357,6 +357,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
||||||
_, err := internal.HeadObject(cmd.Context(), prmHead)
|
_, err := internal.HeadObject(cmd.Context(), prmHead)
|
||||||
|
|
||||||
var errSplit *objectSDK.SplitInfoError
|
var errSplit *objectSDK.SplitInfoError
|
||||||
|
var errEC *objectSDK.ECInfoError
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
@ -366,19 +367,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
||||||
return nil
|
return nil
|
||||||
case errors.As(err, &errSplit):
|
case errors.As(err, &errSplit):
|
||||||
common.PrintVerbose(cmd, "Split information received - object is virtual.")
|
common.PrintVerbose(cmd, "Split information received - object is virtual.")
|
||||||
|
splitInfo := errSplit.SplitInfo()
|
||||||
|
|
||||||
|
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
|
||||||
|
return members
|
||||||
|
}
|
||||||
|
|
||||||
|
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
|
||||||
|
return members
|
||||||
|
}
|
||||||
|
|
||||||
|
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
|
||||||
|
case errors.As(err, &errEC):
|
||||||
|
common.PrintVerbose(cmd, "Object is erasure-coded.")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
splitInfo := errSplit.SplitInfo()
|
|
||||||
|
|
||||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
|
|
||||||
return members
|
|
||||||
}
|
|
||||||
|
|
||||||
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
|
|
||||||
return members
|
|
||||||
}
|
|
||||||
|
|
||||||
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
|
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
|
||||||
|
|
|
@ -95,6 +95,7 @@ const (
|
||||||
DeleteFormingSplitInfo = "forming split info..."
|
DeleteFormingSplitInfo = "forming split info..."
|
||||||
DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
|
DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..."
|
||||||
DeleteMembersSuccessfullyCollected = "members successfully collected"
|
DeleteMembersSuccessfullyCollected = "members successfully collected"
|
||||||
|
DeleteECObjectReceived = "erasure-coded object received, form tombstone"
|
||||||
GetRemoteCallFailed = "remote call failed"
|
GetRemoteCallFailed = "remote call failed"
|
||||||
GetCanNotAssembleTheObject = "can not assemble the object"
|
GetCanNotAssembleTheObject = "can not assemble the object"
|
||||||
GetTryingToAssembleTheObject = "trying to assemble the object..."
|
GetTryingToAssembleTheObject = "trying to assemble the object..."
|
||||||
|
@ -213,6 +214,7 @@ const (
|
||||||
EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies"
|
EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies"
|
||||||
EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check"
|
EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check"
|
||||||
EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks"
|
EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks"
|
||||||
|
EngineInterruptGettingLockers = "can't get object's lockers"
|
||||||
EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks"
|
EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks"
|
||||||
EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only"
|
EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only"
|
||||||
EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode"
|
EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode"
|
||||||
|
|
|
@ -76,13 +76,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
||||||
is bool
|
is bool
|
||||||
}
|
}
|
||||||
var splitInfo *objectSDK.SplitInfo
|
var splitInfo *objectSDK.SplitInfo
|
||||||
|
var ecInfo *objectSDK.ECInfo
|
||||||
|
|
||||||
// Removal of a big object is done in multiple stages:
|
// Removal of a big object is done in multiple stages:
|
||||||
// 1. Remove the parent object. If it is locked or already removed, return immediately.
|
// 1. Remove the parent object. If it is locked or already removed, return immediately.
|
||||||
// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
|
// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
|
||||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
var existsPrm shard.ExistsPrm
|
var existsPrm shard.ExistsPrm
|
||||||
existsPrm.SetAddress(prm.addr)
|
existsPrm.Address = prm.addr
|
||||||
|
|
||||||
resExists, err := sh.Exists(ctx, existsPrm)
|
resExists, err := sh.Exists(ctx, existsPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -91,13 +92,18 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
||||||
}
|
}
|
||||||
|
|
||||||
var splitErr *objectSDK.SplitInfoError
|
var splitErr *objectSDK.SplitInfoError
|
||||||
if !errors.As(err, &splitErr) {
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
if errors.As(err, &splitErr) {
|
||||||
|
splitInfo = splitErr.SplitInfo()
|
||||||
|
} else if errors.As(err, &ecErr) {
|
||||||
|
e.deleteChunks(ctx, sh, ecInfo, prm)
|
||||||
|
return false
|
||||||
|
} else {
|
||||||
if !client.IsErrObjectNotFound(err) {
|
if !client.IsErrObjectNotFound(err) {
|
||||||
e.reportShardError(sh, "could not check object existence", err)
|
e.reportShardError(sh, "could not check object existence", err)
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
splitInfo = splitErr.SplitInfo()
|
|
||||||
} else if !resExists.Exists() {
|
} else if !resExists.Exists() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -171,3 +177,31 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) deleteChunks(
|
||||||
|
ctx context.Context, sh hashedShard, ecInfo *objectSDK.ECInfo, prm DeletePrm,
|
||||||
|
) {
|
||||||
|
var inhumePrm shard.InhumePrm
|
||||||
|
if prm.forceRemoval {
|
||||||
|
inhumePrm.ForceRemoval()
|
||||||
|
}
|
||||||
|
for _, chunk := range ecInfo.Chunks {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(prm.addr.Container())
|
||||||
|
var objID oid.ID
|
||||||
|
err := objID.ReadFromV2(chunk.ID)
|
||||||
|
if err != nil {
|
||||||
|
e.reportShardError(sh, "could not delete EC chunk", err)
|
||||||
|
}
|
||||||
|
addr.SetObject(objID)
|
||||||
|
inhumePrm.MarkAsGarbage(addr)
|
||||||
|
_, err = sh.Inhume(ctx, inhumePrm)
|
||||||
|
if err != nil {
|
||||||
|
e.log.Debug(logs.EngineCouldNotInhumeObjectInShard,
|
||||||
|
zap.Stringer("addr", addr),
|
||||||
|
zap.String("err", err.Error()),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"git.frostfs.info/TrueCloudLab/hrw"
|
"git.frostfs.info/TrueCloudLab/hrw"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
@ -62,7 +63,10 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
ok, err := e.exists(context.Background(), addr)
|
var shPrm shard.ExistsPrm
|
||||||
|
shPrm.Address = addr
|
||||||
|
shPrm.ParentAddress = oid.Address{}
|
||||||
|
ok, _, err := e.exists(context.Background(), shPrm)
|
||||||
if err != nil || ok {
|
if err != nil || ok {
|
||||||
b.Fatalf("%t %v", ok, err)
|
b.Fatalf("%t %v", ok, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,16 +8,16 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) {
|
// exists return in the first value true if object exists.
|
||||||
var shPrm shard.ExistsPrm
|
// Second return value marks is parent object locked.
|
||||||
shPrm.SetAddress(addr)
|
func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool, bool, error) {
|
||||||
alreadyRemoved := false
|
alreadyRemoved := false
|
||||||
exists := false
|
exists := false
|
||||||
|
locked := false
|
||||||
|
|
||||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) {
|
||||||
res, err := sh.Exists(ctx, shPrm)
|
res, err := sh.Exists(ctx, shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if client.IsErrObjectAlreadyRemoved(err) {
|
if client.IsErrObjectAlreadyRemoved(err) {
|
||||||
|
@ -44,13 +44,16 @@ func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, err
|
||||||
if !exists {
|
if !exists {
|
||||||
exists = res.Exists()
|
exists = res.Exists()
|
||||||
}
|
}
|
||||||
|
if !locked {
|
||||||
|
locked = res.Locked()
|
||||||
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
||||||
if alreadyRemoved {
|
if alreadyRemoved {
|
||||||
return false, new(apistatus.ObjectAlreadyRemoved)
|
return false, false, new(apistatus.ObjectAlreadyRemoved)
|
||||||
}
|
}
|
||||||
|
|
||||||
return exists, nil
|
return exists, locked, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if checkExists {
|
if checkExists {
|
||||||
existPrm.SetAddress(addr)
|
existPrm.Address = addr
|
||||||
exRes, err := sh.Exists(ctx, existPrm)
|
exRes, err := sh.Exists(ctx, existPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
|
if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||||
|
@ -152,7 +152,8 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
||||||
}
|
}
|
||||||
|
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
if !errors.As(err, &siErr) {
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) {
|
||||||
e.reportShardError(sh, "could not check for presents in shard", err)
|
e.reportShardError(sh, "could not check for presents in shard", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -220,6 +221,33 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
|
||||||
return locked, outErr
|
return locked, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocked return lock id's if object is locked according to StorageEngine's state.
|
||||||
|
func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocked",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("address", addr.EncodeToString()),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var locked []oid.ID
|
||||||
|
var outErr error
|
||||||
|
|
||||||
|
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||||
|
ld, err := h.Shard.GetLocked(ctx, addr)
|
||||||
|
if err != nil {
|
||||||
|
e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
outErr = err
|
||||||
|
}
|
||||||
|
locked = append(locked, ld...)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
if len(locked) > 0 {
|
||||||
|
return locked, nil
|
||||||
|
}
|
||||||
|
return locked, outErr
|
||||||
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
|
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
|
||||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||||
sh.HandleExpiredTombstones(ctx, addrs)
|
sh.HandleExpiredTombstones(ctx, addrs)
|
||||||
|
|
|
@ -78,12 +78,31 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
|
||||||
|
|
||||||
if checkExists {
|
if checkExists {
|
||||||
var existsPrm shard.ExistsPrm
|
var existsPrm shard.ExistsPrm
|
||||||
existsPrm.SetAddress(addrLocked)
|
existsPrm.Address = addrLocked
|
||||||
|
|
||||||
exRes, err := sh.Exists(ctx, existsPrm)
|
exRes, err := sh.Exists(ctx, existsPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
if !errors.As(err, &siErr) {
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
if errors.As(err, &eiErr) {
|
||||||
|
eclocked := []oid.ID{locked}
|
||||||
|
for _, chunk := range eiErr.ECInfo().Chunks {
|
||||||
|
var objID oid.ID
|
||||||
|
err = objID.ReadFromV2(chunk.ID)
|
||||||
|
if err != nil {
|
||||||
|
e.reportShardError(sh, "could not lock object in shard", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
eclocked = append(eclocked, objID)
|
||||||
|
}
|
||||||
|
err = sh.Lock(ctx, idCnr, locker, eclocked)
|
||||||
|
if err != nil {
|
||||||
|
e.reportShardError(sh, "could not lock object in shard", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
root = true
|
||||||
|
return false
|
||||||
|
} else if !errors.As(err, &siErr) {
|
||||||
if shard.IsErrObjectExpired(err) {
|
if shard.IsErrObjectExpired(err) {
|
||||||
// object is already expired =>
|
// object is already expired =>
|
||||||
// do not lock it
|
// do not lock it
|
||||||
|
|
|
@ -80,11 +80,32 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
||||||
|
|
||||||
// In #1146 this check was parallelized, however, it became
|
// In #1146 this check was parallelized, however, it became
|
||||||
// much slower on fast machines for 4 shards.
|
// much slower on fast machines for 4 shards.
|
||||||
_, err := e.exists(ctx, addr)
|
var parent oid.Address
|
||||||
|
if prm.obj.ECHeader() != nil {
|
||||||
|
parent.SetObject(prm.obj.ECHeader().Parent())
|
||||||
|
parent.SetContainer(addr.Container())
|
||||||
|
}
|
||||||
|
var shPrm shard.ExistsPrm
|
||||||
|
shPrm.Address = addr
|
||||||
|
shPrm.ParentAddress = parent
|
||||||
|
existed, locked, err := e.exists(ctx, shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !existed && locked {
|
||||||
|
lockers, err := e.GetLocked(ctx, parent)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, locker := range lockers {
|
||||||
|
err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var shRes putToShardRes
|
var shRes putToShardRes
|
||||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
|
@ -120,7 +141,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
|
||||||
defer close(exitCh)
|
defer close(exitCh)
|
||||||
|
|
||||||
var existPrm shard.ExistsPrm
|
var existPrm shard.ExistsPrm
|
||||||
existPrm.SetAddress(addr)
|
existPrm.Address = addr
|
||||||
|
|
||||||
exists, err := sh.Exists(ctx, existPrm)
|
exists, err := sh.Exists(ctx, existPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -115,7 +115,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
|
||||||
found := false
|
found := false
|
||||||
for i := range shards {
|
for i := range shards {
|
||||||
var existsPrm shard.ExistsPrm
|
var existsPrm shard.ExistsPrm
|
||||||
existsPrm.SetAddress(addr)
|
existsPrm.Address = addr
|
||||||
|
|
||||||
res, err := shards[i].Exists(ctx, existsPrm)
|
res, err := shards[i].Exists(ctx, existsPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -267,8 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
return deleteSingleResult{}, nil
|
return deleteSingleResult{}, nil
|
||||||
}
|
}
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
if errors.As(err, &siErr) {
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
if errors.As(err, &siErr) || errors.As(err, &ecErr) {
|
||||||
// if object is virtual (parent) then do nothing, it will be deleted with last child
|
// if object is virtual (parent) then do nothing, it will be deleted with last child
|
||||||
|
// if object is erasure-coded it will be deleted with the last chunk presented on the shard
|
||||||
return deleteSingleResult{}, nil
|
return deleteSingleResult{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -471,6 +473,46 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
||||||
name: rootBucketName(cnr, bucketName),
|
name: rootBucketName(cnr, bucketName),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
})
|
||||||
|
if obj.ECHeader() != nil {
|
||||||
|
err := delECInfo(tx, cnr, objKey, obj.ECHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error {
|
||||||
|
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
|
||||||
|
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
|
||||||
|
if len(val) > 0 {
|
||||||
|
if bytes.Equal(val, objKey) {
|
||||||
|
delUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: ecInfoBucketName(cnr, bucketName),
|
||||||
|
key: parentID,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
val = bytes.Clone(val)
|
||||||
|
offset := 0
|
||||||
|
for offset < len(val) {
|
||||||
|
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
|
||||||
|
val = append(val[:offset], val[offset+objectKeySize:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
offset += objectKeySize
|
||||||
|
}
|
||||||
|
err := putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
|
||||||
|
key: parentID,
|
||||||
|
val: val,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -20,12 +20,14 @@ import (
|
||||||
|
|
||||||
// ExistsPrm groups the parameters of Exists operation.
|
// ExistsPrm groups the parameters of Exists operation.
|
||||||
type ExistsPrm struct {
|
type ExistsPrm struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
|
paddr oid.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExistsRes groups the resulting values of Exists operation.
|
// ExistsRes groups the resulting values of Exists operation.
|
||||||
type ExistsRes struct {
|
type ExistsRes struct {
|
||||||
exists bool
|
exists bool
|
||||||
|
locked bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrLackSplitInfo = logicerr.New("no split info on parent object")
|
var ErrLackSplitInfo = logicerr.New("no split info on parent object")
|
||||||
|
@ -35,11 +37,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetParent is an Exists option to set objects parent.
|
||||||
|
func (p *ExistsPrm) SetParent(addr oid.Address) {
|
||||||
|
p.paddr = addr
|
||||||
|
}
|
||||||
|
|
||||||
// Exists returns the fact that the object is in the metabase.
|
// Exists returns the fact that the object is in the metabase.
|
||||||
func (p ExistsRes) Exists() bool {
|
func (p ExistsRes) Exists() bool {
|
||||||
return p.exists
|
return p.exists
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Locked returns the fact that the object is locked.
|
||||||
|
func (p ExistsRes) Locked() bool {
|
||||||
|
return p.locked
|
||||||
|
}
|
||||||
|
|
||||||
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
|
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
|
||||||
// returns true if addr is in primary index or false if it is not.
|
// returns true if addr is in primary index or false if it is not.
|
||||||
//
|
//
|
||||||
|
@ -70,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.exists, err = db.exists(tx, prm.addr, currEpoch)
|
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -78,15 +90,19 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) {
|
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||||
|
var locked bool
|
||||||
|
if !parent.Equals(oid.Address{}) {
|
||||||
|
locked = objectLocked(tx, parent.Container(), parent.Object())
|
||||||
|
}
|
||||||
// check graveyard and object expiration first
|
// check graveyard and object expiration first
|
||||||
switch objectStatus(tx, addr, currEpoch) {
|
switch objectStatus(tx, addr, currEpoch) {
|
||||||
case 1:
|
case 1:
|
||||||
return false, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
case 2:
|
case 2:
|
||||||
return false, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved))
|
||||||
case 3:
|
case 3:
|
||||||
return false, ErrObjectIsExpired
|
return false, locked, ErrObjectIsExpired
|
||||||
}
|
}
|
||||||
|
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
|
@ -96,25 +112,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
|
||||||
|
|
||||||
// if graveyard is empty, then check if object exists in primary bucket
|
// if graveyard is empty, then check if object exists in primary bucket
|
||||||
if inBucket(tx, primaryBucketName(cnr, key), objKey) {
|
if inBucket(tx, primaryBucketName(cnr, key), objKey) {
|
||||||
return true, nil
|
return true, locked, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// if primary bucket is empty, then check if object exists in parent bucket
|
// if primary bucket is empty, then check if object exists in parent bucket
|
||||||
if inBucket(tx, parentBucketName(cnr, key), objKey) {
|
if inBucket(tx, parentBucketName(cnr, key), objKey) {
|
||||||
splitInfo, err := getSplitInfo(tx, cnr, objKey)
|
splitInfo, err := getSplitInfo(tx, cnr, objKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, locked, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||||
}
|
}
|
||||||
// if parent bucket is empty, then check if object exists in ec bucket
|
// if parent bucket is empty, then check if object exists in ec bucket
|
||||||
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
||||||
return false, getECInfoError(tx, cnr, data)
|
return false, locked, getECInfoError(tx, cnr, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if parent bucket is empty, then check if object exists in typed buckets
|
// if parent bucket is empty, then check if object exists in typed buckets
|
||||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
|
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// objectStatus returns:
|
// objectStatus returns:
|
||||||
|
|
|
@ -229,11 +229,17 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
||||||
|
|
||||||
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
|
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
|
||||||
targetKey := addressKey(prm.target[i], buf)
|
targetKey := addressKey(prm.target[i], buf)
|
||||||
|
var ecErr *objectSDK.ECInfoError
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else if errors.As(err, &ecErr) {
|
||||||
|
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.tomb != nil {
|
if prm.tomb != nil {
|
||||||
|
@ -272,6 +278,43 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
||||||
return db.applyInhumeResToCounters(tx, res)
|
return db.applyInhumeResToCounters(tx, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
|
||||||
|
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
|
||||||
|
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
|
||||||
|
) error {
|
||||||
|
for _, chunk := range ecInfo.Chunks {
|
||||||
|
chunkBuf := make([]byte, addressKeySize)
|
||||||
|
var chunkAddr oid.Address
|
||||||
|
chunkAddr.SetContainer(cnr)
|
||||||
|
var chunkID oid.ID
|
||||||
|
err := chunkID.ReadFromV2(chunk.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
chunkAddr.SetObject(chunkID)
|
||||||
|
chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
chunkKey := addressKey(chunkAddr, chunkBuf)
|
||||||
|
if tomb != nil {
|
||||||
|
_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = targetBucket.Put(chunkKey, value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||||
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
|
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -175,6 +175,31 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// return `LOCK` id's if specified object is locked in the specified container.
|
||||||
|
func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
||||||
|
var lockers []oid.ID
|
||||||
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||||
|
if bucketLocked != nil {
|
||||||
|
key := make([]byte, cidSize)
|
||||||
|
idCnr.Encode(key)
|
||||||
|
bucketLockedContainer := bucketLocked.Bucket(key)
|
||||||
|
if bucketLockedContainer != nil {
|
||||||
|
binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode list of object lockers: %w", err)
|
||||||
|
}
|
||||||
|
for _, binObjID := range binObjIDs {
|
||||||
|
var id oid.ID
|
||||||
|
if err = id.Decode(binObjID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
lockers = append(lockers, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lockers, nil
|
||||||
|
}
|
||||||
|
|
||||||
// releases all records about the objects locked by the locker.
|
// releases all records about the objects locked by the locker.
|
||||||
// Returns slice of unlocked object ID's or an error.
|
// Returns slice of unlocked object ID's or an error.
|
||||||
//
|
//
|
||||||
|
@ -325,3 +350,36 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocked return `LOCK` id's if provided object is locked by any `LOCK`. Not found
|
||||||
|
// object is considered as non-locked.
|
||||||
|
//
|
||||||
|
// Returns only non-logical errors related to underlying database.
|
||||||
|
func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, err error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("GetLocked", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.GetLocked",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("address", addr.EncodeToString()),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return res, ErrDegradedMode
|
||||||
|
}
|
||||||
|
err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
res, err = getLocked(tx, addr.Container(), addr.Object())
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
success = err == nil
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
|
@ -115,7 +115,7 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
|
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
|
|
||||||
exists, err := db.exists(tx, objectCore.AddressOf(obj), currEpoch)
|
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
||||||
|
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
if errors.As(err, &splitInfoError) {
|
if errors.As(err, &splitInfoError) {
|
||||||
|
|
|
@ -428,7 +428,7 @@ func (db *DB) selectObjectID(
|
||||||
addr.SetObject(id)
|
addr.SetObject(id)
|
||||||
|
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
ok, err := db.exists(tx, addr, currEpoch)
|
ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
|
||||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||||
raw := make([]byte, objectKeySize)
|
raw := make([]byte, objectKeySize)
|
||||||
id.Encode(raw)
|
id.Encode(raw)
|
||||||
|
|
|
@ -13,17 +13,16 @@ import (
|
||||||
|
|
||||||
// ExistsPrm groups the parameters of Exists operation.
|
// ExistsPrm groups the parameters of Exists operation.
|
||||||
type ExistsPrm struct {
|
type ExistsPrm struct {
|
||||||
addr oid.Address
|
// Exists option to set object checked for existence.
|
||||||
|
Address oid.Address
|
||||||
|
// Exists option to set parent object checked for existence.
|
||||||
|
ParentAddress oid.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExistsRes groups the resulting values of Exists operation.
|
// ExistsRes groups the resulting values of Exists operation.
|
||||||
type ExistsRes struct {
|
type ExistsRes struct {
|
||||||
ex bool
|
ex bool
|
||||||
}
|
lc bool
|
||||||
|
|
||||||
// SetAddress is an Exists option to set object checked for existence.
|
|
||||||
func (p *ExistsPrm) SetAddress(addr oid.Address) {
|
|
||||||
p.addr = addr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exists returns the fact that the object is in the shard.
|
// Exists returns the fact that the object is in the shard.
|
||||||
|
@ -31,6 +30,11 @@ func (p ExistsRes) Exists() bool {
|
||||||
return p.ex
|
return p.ex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Locked returns the fact that the object is locked.
|
||||||
|
func (p ExistsRes) Locked() bool {
|
||||||
|
return p.lc
|
||||||
|
}
|
||||||
|
|
||||||
// Exists checks if object is presented in shard.
|
// Exists checks if object is presented in shard.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that does not allow to
|
// Returns any error encountered that does not allow to
|
||||||
|
@ -43,11 +47,12 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard_id", s.ID().String()),
|
attribute.String("shard_id", s.ID().String()),
|
||||||
attribute.String("address", prm.addr.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
var exists bool
|
var exists bool
|
||||||
|
var locked bool
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
|
@ -57,21 +62,24 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
|
||||||
return ExistsRes{}, ErrShardDisabled
|
return ExistsRes{}, ErrShardDisabled
|
||||||
} else if s.info.Mode.NoMetabase() {
|
} else if s.info.Mode.NoMetabase() {
|
||||||
var p common.ExistsPrm
|
var p common.ExistsPrm
|
||||||
p.Address = prm.addr
|
p.Address = prm.Address
|
||||||
|
|
||||||
var res common.ExistsRes
|
var res common.ExistsRes
|
||||||
res, err = s.blobStor.Exists(ctx, p)
|
res, err = s.blobStor.Exists(ctx, p)
|
||||||
exists = res.Exists
|
exists = res.Exists
|
||||||
} else {
|
} else {
|
||||||
var existsPrm meta.ExistsPrm
|
var existsPrm meta.ExistsPrm
|
||||||
existsPrm.SetAddress(prm.addr)
|
existsPrm.SetAddress(prm.Address)
|
||||||
|
existsPrm.SetParent(prm.ParentAddress)
|
||||||
|
|
||||||
var res meta.ExistsRes
|
var res meta.ExistsRes
|
||||||
res, err = s.metaBase.Exists(ctx, existsPrm)
|
res, err = s.metaBase.Exists(ctx, existsPrm)
|
||||||
exists = res.Exists()
|
exists = res.Exists()
|
||||||
|
locked = res.Locked()
|
||||||
}
|
}
|
||||||
|
|
||||||
return ExistsRes{
|
return ExistsRes{
|
||||||
ex: exists,
|
ex: exists,
|
||||||
|
lc: locked,
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -525,7 +525,9 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
|
log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
|
||||||
s.expiredTombstonesCallback(ctx, tssExp)
|
if len(tssExp) > 0 {
|
||||||
|
s.expiredTombstonesCallback(ctx, tssExp)
|
||||||
|
}
|
||||||
|
|
||||||
iterPrm.SetOffset(tss[tssLen-1].Address())
|
iterPrm.SetOffset(tss[tssLen-1].Address())
|
||||||
tss = tss[:0]
|
tss = tss[:0]
|
||||||
|
|
|
@ -71,3 +71,20 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) {
|
||||||
|
|
||||||
return res.Locked(), nil
|
return res.Locked(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocked return lock id's of the provided object. Not found object is
|
||||||
|
// considered as not locked. Requires healthy metabase, returns ErrDegradedMode otherwise.
|
||||||
|
func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocked",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.String("address", addr.EncodeToString()),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
m := s.GetMode()
|
||||||
|
if m.NoMetabase() {
|
||||||
|
return nil, ErrDegradedMode
|
||||||
|
}
|
||||||
|
return s.metaBase.GetLocked(ctx, addr)
|
||||||
|
}
|
||||||
|
|
|
@ -72,7 +72,7 @@ func TestShardReload(t *testing.T) {
|
||||||
checkHasObjects := func(t *testing.T, exists bool) {
|
checkHasObjects := func(t *testing.T, exists bool) {
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
var prm ExistsPrm
|
var prm ExistsPrm
|
||||||
prm.SetAddress(objects[i].addr)
|
prm.Address = objects[i].addr
|
||||||
|
|
||||||
res, err := sh.Exists(context.Background(), prm)
|
res, err := sh.Exists(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -24,9 +24,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var errEmptyBodySignature = errors.New("malformed request: empty body signature")
|
||||||
errEmptyBodySignature = errors.New("malformed request: empty body signature")
|
|
||||||
)
|
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
|
@ -2,6 +2,7 @@ package deletesvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
@ -40,7 +41,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
|
||||||
)}
|
)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec execCtx) isLocal() bool {
|
func (exec *execCtx) isLocal() bool {
|
||||||
return exec.prm.common.LocalOnly()
|
return exec.prm.common.LocalOnly()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,10 +65,33 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address {
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) formSplitInfo(ctx context.Context) error {
|
func (exec *execCtx) formExtendedInfo(ctx context.Context) error {
|
||||||
var err error
|
_, err := exec.svc.header.head(ctx, exec)
|
||||||
exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)
|
|
||||||
if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
|
var errECInfo *objectSDK.ECInfoError
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
return nil
|
||||||
|
case errors.As(err, &errSplitInfo):
|
||||||
|
exec.splitInfo = errSplitInfo.SplitInfo()
|
||||||
|
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
|
||||||
|
|
||||||
|
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
|
||||||
|
|
||||||
|
if err := exec.collectMembers(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
|
||||||
|
return nil
|
||||||
|
case errors.As(err, &errECInfo):
|
||||||
|
exec.log.Debug(logs.DeleteECObjectReceived)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !apiclient.IsErrObjectAlreadyRemoved(err) {
|
||||||
// IsErrObjectAlreadyRemoved check is required because splitInfo
|
// IsErrObjectAlreadyRemoved check is required because splitInfo
|
||||||
// implicitly performs Head request that may return ObjectAlreadyRemoved
|
// implicitly performs Head request that may return ObjectAlreadyRemoved
|
||||||
// status that is not specified for Delete.
|
// status that is not specified for Delete.
|
||||||
|
|
|
@ -35,19 +35,9 @@ func (exec *execCtx) formTombstone(ctx context.Context) error {
|
||||||
|
|
||||||
exec.log.Debug(logs.DeleteFormingSplitInfo)
|
exec.log.Debug(logs.DeleteFormingSplitInfo)
|
||||||
|
|
||||||
if err := exec.formSplitInfo(ctx); err != nil {
|
if err := exec.formExtendedInfo(ctx); err != nil {
|
||||||
return fmt.Errorf("form split info: %w", err)
|
return fmt.Errorf("form extended info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
|
|
||||||
|
|
||||||
exec.tombstone.SetSplitID(exec.splitInfo.SplitID())
|
|
||||||
|
|
||||||
if err := exec.collectMembers(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)
|
|
||||||
|
|
||||||
return exec.initTombstoneObject()
|
return exec.initTombstoneObject()
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ type cfg struct {
|
||||||
|
|
||||||
header interface {
|
header interface {
|
||||||
// must return (nil, nil) for PHY objects
|
// must return (nil, nil) for PHY objects
|
||||||
splitInfo(context.Context, *execCtx) (*objectSDK.SplitInfo, error)
|
head(context.Context, *execCtx) (*objectSDK.Object, error)
|
||||||
|
|
||||||
children(context.Context, *execCtx) ([]oid.ID, error)
|
children(context.Context, *execCtx) ([]oid.ID, error)
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package deletesvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
|
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
@ -44,19 +43,8 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
|
||||||
return wr.Object(), nil
|
return wr.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *headSvcWrapper) splitInfo(ctx context.Context, exec *execCtx) (*objectSDK.SplitInfo, error) {
|
func (w *headSvcWrapper) head(ctx context.Context, exec *execCtx) (*objectSDK.Object, error) {
|
||||||
_, err := w.headAddress(ctx, exec, exec.address())
|
return w.headAddress(ctx, exec, exec.address())
|
||||||
|
|
||||||
var errSplitInfo *objectSDK.SplitInfoError
|
|
||||||
|
|
||||||
switch {
|
|
||||||
case err == nil:
|
|
||||||
return nil, nil
|
|
||||||
case errors.As(err, &errSplitInfo):
|
|
||||||
return errSplitInfo.SplitInfo(), nil
|
|
||||||
default:
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) {
|
func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) {
|
||||||
|
|
|
@ -41,12 +41,10 @@ func (r *request) assemble(ctx context.Context) {
|
||||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
||||||
|
|
||||||
r.log.Debug(logs.GetAssemblingSplittedObject,
|
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||||
zap.Stringer("address", r.address()),
|
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
|
defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
|
||||||
zap.Stringer("address", r.address()),
|
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
|
@ -55,7 +53,6 @@ func (r *request) assemble(ctx context.Context) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Warn(logs.GetFailedToAssembleSplittedObject,
|
r.log.Warn(logs.GetFailedToAssembleSplittedObject,
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Stringer("address", r.address()),
|
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
|
|
|
@ -39,12 +39,10 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
||||||
|
|
||||||
r.log.Debug(logs.GetAssemblingECObject,
|
r.log.Debug(logs.GetAssemblingECObject,
|
||||||
zap.Stringer("address", r.address()),
|
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
|
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
|
||||||
zap.Stringer("address", r.address()),
|
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
|
@ -53,7 +51,6 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Warn(logs.GetFailedToAssembleECObject,
|
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Stringer("address", r.address()),
|
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
|
|
|
@ -28,7 +28,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
|
||||||
)}
|
)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec execCtx) isLocal() bool {
|
func (exec *execCtx) isLocal() bool {
|
||||||
return exec.prm.common.LocalOnly()
|
return exec.prm.common.LocalOnly()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue