From aa4e732eb46f0c0aa3eeda226befe2f50d986dba Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 27 May 2024 15:57:02 +0300 Subject: [PATCH 01/10] [#1147] Fix gofumpt issue Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/ape_manager/add_chain.go | 4 +--- pkg/services/apemanager/executor.go | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cmd/frostfs-cli/modules/ape_manager/add_chain.go b/cmd/frostfs-cli/modules/ape_manager/add_chain.go index 7e6614392..dea7fb36e 100644 --- a/cmd/frostfs-cli/modules/ape_manager/add_chain.go +++ b/cmd/frostfs-cli/modules/ape_manager/add_chain.go @@ -38,9 +38,7 @@ const ( groupTarget = "group" ) -var ( - errUnknownTargetType = errors.New("unknown target type") -) +var errUnknownTargetType = errors.New("unknown target type") var addCmd = &cobra.Command{ Use: "add", diff --git a/pkg/services/apemanager/executor.go b/pkg/services/apemanager/executor.go index bad43b5fa..9cfc97473 100644 --- a/pkg/services/apemanager/executor.go +++ b/pkg/services/apemanager/executor.go @@ -24,9 +24,7 @@ import ( "go.uber.org/zap" ) -var ( - errEmptyBodySignature = errors.New("malformed request: empty body signature") -) +var errEmptyBodySignature = errors.New("malformed request: empty body signature") type cfg struct { log *logger.Logger -- 2.45.2 From 699fa87ec01160e5f6cffac232b0db603aefcfc1 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 27 May 2024 15:57:48 +0300 Subject: [PATCH 02/10] [#1147] log: Remove redundant `address` field from log Filled when logger created for `request` object from package `getsvc`. Signed-off-by: Anton Nikiforov --- pkg/services/object/get/assemble.go | 3 --- pkg/services/object/get/assembleec.go | 3 --- 2 files changed, 6 deletions(-) diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 3f4a02c02..66c4580b0 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -41,12 +41,10 @@ func (r *request) assemble(ctx context.Context) { assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r) r.log.Debug(logs.GetAssemblingSplittedObject, - zap.Stringer("address", r.address()), zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_length", r.ctxRange().GetLength()), ) defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted, - zap.Stringer("address", r.address()), zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_length", r.ctxRange().GetLength()), ) @@ -55,7 +53,6 @@ func (r *request) assemble(ctx context.Context) { if err != nil { r.log.Warn(logs.GetFailedToAssembleSplittedObject, zap.Error(err), - zap.Stringer("address", r.address()), zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_length", r.ctxRange().GetLength()), ) diff --git a/pkg/services/object/get/assembleec.go b/pkg/services/object/get/assembleec.go index 58641c975..5c999929a 100644 --- a/pkg/services/object/get/assembleec.go +++ b/pkg/services/object/get/assembleec.go @@ -39,12 +39,10 @@ func (r *request) assembleEC(ctx context.Context) { assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log) r.log.Debug(logs.GetAssemblingECObject, - zap.Stringer("address", r.address()), zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_length", r.ctxRange().GetLength()), ) defer r.log.Debug(logs.GetAssemblingECObjectCompleted, - zap.Stringer("address", r.address()), zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_length", r.ctxRange().GetLength()), ) @@ -53,7 +51,6 @@ func (r *request) assembleEC(ctx context.Context) { if err != nil { r.log.Warn(logs.GetFailedToAssembleECObject, zap.Error(err), - zap.Stringer("address", r.address()), zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_length", r.ctxRange().GetLength()), ) -- 2.45.2 From f4ffda70b0bab8a0509147e0baeadde0fa308a85 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 27 May 2024 16:03:22 +0300 Subject: [PATCH 03/10] [#1147] object: Use methods on pointer for `deletesvc.execCtx` Signed-off-by: Anton Nikiforov --- pkg/services/object/delete/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/object/delete/exec.go b/pkg/services/object/delete/exec.go index d48f8ab7b..d119328a6 100644 --- a/pkg/services/object/delete/exec.go +++ b/pkg/services/object/delete/exec.go @@ -40,7 +40,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) { )} } -func (exec execCtx) isLocal() bool { +func (exec *execCtx) isLocal() bool { return exec.prm.common.LocalOnly() } -- 2.45.2 From e122b35fe925b3ec460e3156f3bf8967802c3961 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 27 May 2024 16:05:20 +0300 Subject: [PATCH 04/10] [#1147] object: Use methods on pointer for `searchsvc.execCtx` Signed-off-by: Anton Nikiforov --- pkg/services/object/search/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 2e0d48773..4a2c04ecd 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -28,7 +28,7 @@ func (exec *execCtx) setLogger(l *logger.Logger) { )} } -func (exec execCtx) isLocal() bool { +func (exec *execCtx) isLocal() bool { return exec.prm.common.LocalOnly() } -- 2.45.2 From 4c943eb00d3f5b55b358f4b58baed5910b6b7d35 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 27 May 2024 16:11:57 +0300 Subject: [PATCH 05/10] [#1147] gc: Execute callback for expired tombstones when they exists Signed-off-by: Anton Nikiforov --- pkg/local_object_storage/shard/gc.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index ef8e97d34..d605746e8 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -525,7 +525,9 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { } log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp))) - s.expiredTombstonesCallback(ctx, tssExp) + if len(tssExp) > 0 { + s.expiredTombstonesCallback(ctx, tssExp) + } iterPrm.SetOffset(tss[tssLen-1].Address()) tss = tss[:0] -- 2.45.2 From 8f9f20698bf6cec33e3854ed8c418af032d27211 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 27 May 2024 21:31:59 +0300 Subject: [PATCH 06/10] [#1147] cli: Fix output when print EC info with flags `json` & `proto` Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/object/range.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/frostfs-cli/modules/object/range.go b/cmd/frostfs-cli/modules/object/range.go index 9ba752237..ad4bc3d59 100644 --- a/cmd/frostfs-cli/modules/object/range.go +++ b/cmd/frostfs-cli/modules/object/range.go @@ -152,7 +152,11 @@ func printECInfoErr(cmd *cobra.Command, err error) bool { ok := errors.As(err, &errECInfo) if ok { - cmd.PrintErrln("Object is erasure-encoded, ec information received.") + toJSON, _ := cmd.Flags().GetBool(commonflags.JSON) + toProto, _ := cmd.Flags().GetBool("proto") + if !(toJSON || toProto) { + cmd.PrintErrln("Object is erasure-encoded, ec information received.") + } printECInfo(cmd, errECInfo.ECInfo()) } -- 2.45.2 From f40e27ef04cf590dfab45cddb09e5d0ec0fc61c8 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 27 May 2024 22:07:43 +0300 Subject: [PATCH 07/10] [#1147] node: Implement `Lock\Delete` requests for EC object Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/object/util.go | 28 +++++---- internal/logs/logs.go | 2 + pkg/local_object_storage/engine/delete.go | 38 +++++++++++- .../engine/engine_test.go | 3 +- pkg/local_object_storage/engine/exists.go | 11 +++- pkg/local_object_storage/engine/inhume.go | 30 +++++++++- pkg/local_object_storage/engine/lock.go | 21 ++++++- pkg/local_object_storage/engine/put.go | 20 ++++++- pkg/local_object_storage/metabase/delete.go | 44 +++++++++++++- pkg/local_object_storage/metabase/exists.go | 38 ++++++++---- pkg/local_object_storage/metabase/inhume.go | 43 ++++++++++++++ pkg/local_object_storage/metabase/lock.go | 58 +++++++++++++++++++ pkg/local_object_storage/metabase/put.go | 2 +- pkg/local_object_storage/metabase/select.go | 2 +- pkg/local_object_storage/shard/exists.go | 18 +++++- pkg/local_object_storage/shard/lock.go | 17 ++++++ pkg/services/object/delete/exec.go | 30 +++++++++- pkg/services/object/delete/local.go | 14 +---- pkg/services/object/delete/service.go | 2 +- pkg/services/object/delete/util.go | 16 +---- 20 files changed, 371 insertions(+), 66 deletions(-) diff --git a/cmd/frostfs-cli/modules/object/util.go b/cmd/frostfs-cli/modules/object/util.go index c8625eb94..ff6b3219d 100644 --- a/cmd/frostfs-cli/modules/object/util.go +++ b/cmd/frostfs-cli/modules/object/util.go @@ -357,6 +357,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, _, err := internal.HeadObject(cmd.Context(), prmHead) var errSplit *objectSDK.SplitInfoError + var errEC *objectSDK.ECInfoError switch { default: @@ -366,19 +367,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, return nil case errors.As(err, &errSplit): common.PrintVerbose(cmd, "Split information received - object is virtual.") + splitInfo := errSplit.SplitInfo() + + if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok { + return members + } + + if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok { + return members + } + + return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj) + case errors.As(err, &errEC): + common.PrintVerbose(cmd, "Object is erasure-coded.") + return nil } - - splitInfo := errSplit.SplitInfo() - - if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok { - return members - } - - if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok { - return members - } - - return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj) + return nil } func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 43fd77624..7dc63341d 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -95,6 +95,7 @@ const ( DeleteFormingSplitInfo = "forming split info..." DeleteSplitInfoSuccessfullyFormedCollectingMembers = "split info successfully formed, collecting members..." DeleteMembersSuccessfullyCollected = "members successfully collected" + DeleteECObjectReceived = "erasure-coded object received, form tombstone" GetRemoteCallFailed = "remote call failed" GetCanNotAssembleTheObject = "can not assemble the object" GetTryingToAssembleTheObject = "trying to assemble the object..." @@ -213,6 +214,7 @@ const ( EngineFinishedRemovalOfLocallyredundantCopies = "finished removal of locally-redundant copies" EngineRemovingAnObjectWithoutFullLockingCheck = "removing an object without full locking check" EngineInterruptProcessingTheExpiredLocks = "interrupt processing the expired locks" + EngineInterruptGettingLockers = "can't get object's lockers" EngineInterruptProcessingTheDeletedLocks = "interrupt processing the deleted locks" EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly = "failed to move shard in degraded-read-only mode, moving to read-only" EngineFailedToMoveShardInReadonlyMode = "failed to move shard in read-only mode" diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 68a7325c6..44a612213 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -76,6 +76,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e is bool } var splitInfo *objectSDK.SplitInfo + var ecInfo *objectSDK.ECInfo // Removal of a big object is done in multiple stages: // 1. Remove the parent object. If it is locked or already removed, return immediately. @@ -91,13 +92,18 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e } var splitErr *objectSDK.SplitInfoError - if !errors.As(err, &splitErr) { + var ecErr *objectSDK.ECInfoError + if errors.As(err, &splitErr) { + splitInfo = splitErr.SplitInfo() + } else if errors.As(err, &ecErr) { + e.deleteChunks(ctx, sh, ecInfo, prm) + return false + } else { if !client.IsErrObjectNotFound(err) { e.reportShardError(sh, "could not check object existence", err) } return false } - splitInfo = splitErr.SplitInfo() } else if !resExists.Exists() { return false } @@ -171,3 +177,31 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo return false }) } + +func (e *StorageEngine) deleteChunks( + ctx context.Context, sh hashedShard, ecInfo *objectSDK.ECInfo, prm DeletePrm, +) { + var inhumePrm shard.InhumePrm + if prm.forceRemoval { + inhumePrm.ForceRemoval() + } + for _, chunk := range ecInfo.Chunks { + var addr oid.Address + addr.SetContainer(prm.addr.Container()) + var objID oid.ID + err := objID.ReadFromV2(chunk.ID) + if err != nil { + e.reportShardError(sh, "could not delete EC chunk", err) + } + addr.SetObject(objID) + inhumePrm.MarkAsGarbage(addr) + _, err = sh.Inhume(ctx, inhumePrm) + if err != nil { + e.log.Debug(logs.EngineCouldNotInhumeObjectInShard, + zap.Stringer("addr", addr), + zap.String("err", err.Error()), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + continue + } + } +} diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index b82700ed8..70c54590f 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -17,6 +17,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "git.frostfs.info/TrueCloudLab/hrw" "github.com/panjf2000/ants/v2" @@ -62,7 +63,7 @@ func benchmarkExists(b *testing.B, shardNum int) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - ok, err := e.exists(context.Background(), addr) + ok, _, err := e.exists(context.Background(), addr, oid.Address{}) if err != nil || ok { b.Fatalf("%t %v", ok, err) } diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index ef6292768..ce669ec5e 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -11,11 +11,13 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) -func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, error) { +func (e *StorageEngine) exists(ctx context.Context, addr oid.Address, parentAddr oid.Address) (bool, bool, error) { var shPrm shard.ExistsPrm shPrm.SetAddress(addr) + shPrm.SetParentAddress(parentAddr) alreadyRemoved := false exists := false + locked := false e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Exists(ctx, shPrm) @@ -44,13 +46,16 @@ func (e *StorageEngine) exists(ctx context.Context, addr oid.Address) (bool, err if !exists { exists = res.Exists() } + if !locked { + locked = res.Locked() + } return false }) if alreadyRemoved { - return false, new(apistatus.ObjectAlreadyRemoved) + return false, false, new(apistatus.ObjectAlreadyRemoved) } - return exists, nil + return exists, locked, nil } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 80eda2204..62e7be933 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -152,7 +152,8 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh } var siErr *objectSDK.SplitInfoError - if !errors.As(err, &siErr) { + var ecErr *objectSDK.ECInfoError + if !(errors.As(err, &siErr) || errors.As(err, &ecErr)) { e.reportShardError(sh, "could not check for presents in shard", err) return } @@ -220,6 +221,33 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e return locked, outErr } +// GetLocked return lock id's if object is locked according to StorageEngine's state. +func (e *StorageEngine) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.GetLocked", + trace.WithAttributes( + attribute.String("address", addr.EncodeToString()), + )) + defer span.End() + + var locked []oid.ID + var outErr error + + e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { + ld, err := h.Shard.GetLocked(ctx, addr) + if err != nil { + e.reportShardError(h, logs.EngineInterruptGettingLockers, err, zap.Stringer("addr", addr), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + outErr = err + } + locked = append(locked, ld...) + return false + }) + if len(locked) > 0 { + return locked, nil + } + return locked, outErr +} + func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(ctx, addrs) diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 5ad603421..af56c3e1e 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -83,7 +83,26 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo exRes, err := sh.Exists(ctx, existsPrm) if err != nil { var siErr *objectSDK.SplitInfoError - if !errors.As(err, &siErr) { + var eiErr *objectSDK.ECInfoError + if errors.As(err, &eiErr) { + eclocked := []oid.ID{locked} + for _, chunk := range eiErr.ECInfo().Chunks { + var objID oid.ID + err = objID.ReadFromV2(chunk.ID) + if err != nil { + e.reportShardError(sh, "could not lock object in shard", err) + return false + } + eclocked = append(eclocked, objID) + } + err = sh.Lock(ctx, idCnr, locker, eclocked) + if err != nil { + e.reportShardError(sh, "could not lock object in shard", err) + return false + } + root = true + return false + } else if !errors.As(err, &siErr) { if shard.IsErrObjectExpired(err) { // object is already expired => // do not lock it diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 777f728b7..aa94ef22e 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -80,11 +80,29 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { // In #1146 this check was parallelized, however, it became // much slower on fast machines for 4 shards. - _, err := e.exists(ctx, addr) + var parent oid.Address + if prm.obj.ECHeader() != nil { + parent.SetObject(prm.obj.ECHeader().Parent()) + parent.SetContainer(addr.Container()) + } + existed, locked, err := e.exists(ctx, addr, parent) if err != nil { return err } + if !existed && locked { + lockers, err := e.GetLocked(ctx, parent) + if err != nil { + return err + } + for _, locker := range lockers { + err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()}) + if err != nil { + return err + } + } + } + var shRes putToShardRes e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { e.mtx.RLock() diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index e591b8661..4addeae77 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -267,8 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter return deleteSingleResult{}, nil } var siErr *objectSDK.SplitInfoError - if errors.As(err, &siErr) { + var ecErr *objectSDK.ECInfoError + if errors.As(err, &siErr) || errors.As(err, &ecErr) { // if object is virtual (parent) then do nothing, it will be deleted with last child + // if object is erasure-coded it will be deleted with the last chunk presented on the shard return deleteSingleResult{}, nil } @@ -471,6 +473,46 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error name: rootBucketName(cnr, bucketName), key: objKey, }) + if obj.ECHeader() != nil { + err := delECInfo(tx, cnr, objKey, obj.ECHeader()) + if err != nil { + return err + } + } return nil } + +func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error { + parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize)) + bucketName := make([]byte, bucketKeySize) + + val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID) + if len(val) > 0 { + if bytes.Equal(val, objKey) { + delUniqueIndexItem(tx, namedBucketItem{ + name: ecInfoBucketName(cnr, bucketName), + key: parentID, + }) + } else { + val = bytes.Clone(val) + offset := 0 + for offset < len(val) { + if bytes.Equal(objKey, val[offset:offset+objectKeySize]) { + val = append(val[:offset], val[offset+objectKeySize:]...) + break + } + offset += objectKeySize + } + err := putUniqueIndexItem(tx, namedBucketItem{ + name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)), + key: parentID, + val: val, + }) + if err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index bf6766c05..153d92110 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -20,12 +20,14 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address + addr oid.Address + paddr oid.Address } // ExistsRes groups the resulting values of Exists operation. type ExistsRes struct { exists bool + locked bool } var ErrLackSplitInfo = logicerr.New("no split info on parent object") @@ -35,11 +37,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) { p.addr = addr } +// SetParent is an Exists option to set objects parent. +func (p *ExistsPrm) SetParent(addr oid.Address) { + p.paddr = addr +} + // Exists returns the fact that the object is in the metabase. func (p ExistsRes) Exists() bool { return p.exists } +// Locked returns the fact that the object is locked. +func (p ExistsRes) Locked() bool { + return p.locked +} + // Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it // returns true if addr is in primary index or false if it is not. // @@ -70,7 +82,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err currEpoch := db.epochState.CurrentEpoch() err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.exists, err = db.exists(tx, prm.addr, currEpoch) + res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch) return err }) @@ -78,15 +90,19 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err return res, metaerr.Wrap(err) } -func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) { +func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) { + var locked bool + if !parent.Equals(oid.Address{}) { + locked = objectLocked(tx, parent.Container(), parent.Object()) + } // check graveyard and object expiration first switch objectStatus(tx, addr, currEpoch) { case 1: - return false, logicerr.Wrap(new(apistatus.ObjectNotFound)) + return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound)) case 2: - return false, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) + return false, locked, logicerr.Wrap(new(apistatus.ObjectAlreadyRemoved)) case 3: - return false, ErrObjectIsExpired + return false, locked, ErrObjectIsExpired } objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) @@ -96,25 +112,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b // if graveyard is empty, then check if object exists in primary bucket if inBucket(tx, primaryBucketName(cnr, key), objKey) { - return true, nil + return true, locked, nil } // if primary bucket is empty, then check if object exists in parent bucket if inBucket(tx, parentBucketName(cnr, key), objKey) { splitInfo, err := getSplitInfo(tx, cnr, objKey) if err != nil { - return false, err + return false, locked, err } - return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) + return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } // if parent bucket is empty, then check if object exists in ec bucket if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 { - return false, getECInfoError(tx, cnr, data) + return false, locked, getECInfoError(tx, cnr, data) } // if parent bucket is empty, then check if object exists in typed buckets - return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil + return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil } // objectStatus returns: diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 250504120..c265fb217 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -229,11 +229,17 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes obj, err := db.get(tx, prm.target[i], buf, false, true, epoch) targetKey := addressKey(prm.target[i], buf) + var ecErr *objectSDK.ECInfoError if err == nil { err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) if err != nil { return err } + } else if errors.As(err, &ecErr) { + err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey) + if err != nil { + return err + } } if prm.tomb != nil { @@ -272,6 +278,43 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes return db.applyInhumeResToCounters(tx, res) } +func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes, + garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket, + ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte, +) error { + for _, chunk := range ecInfo.Chunks { + chunkBuf := make([]byte, addressKeySize) + var chunkAddr oid.Address + chunkAddr.SetContainer(cnr) + var chunkID oid.ID + err := chunkID.ReadFromV2(chunk.ID) + if err != nil { + return err + } + chunkAddr.SetObject(chunkID) + chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch) + if err != nil { + return err + } + err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res) + if err != nil { + return err + } + chunkKey := addressKey(chunkAddr, chunkBuf) + if tomb != nil { + _, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey) + if err != nil { + return err + } + } + err = targetBucket.Put(chunkKey, value) + if err != nil { + return err + } + } + return nil +} + func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil { return err diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 0bc2b06f0..732ba426d 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -175,6 +175,31 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool { return false } +// return `LOCK` id's if specified object is locked in the specified container. +func getLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) { + var lockers []oid.ID + bucketLocked := tx.Bucket(bucketNameLocked) + if bucketLocked != nil { + key := make([]byte, cidSize) + idCnr.Encode(key) + bucketLockedContainer := bucketLocked.Bucket(key) + if bucketLockedContainer != nil { + binObjIDs, err := decodeList(bucketLockedContainer.Get(objectKey(idObj, key))) + if err != nil { + return nil, fmt.Errorf("decode list of object lockers: %w", err) + } + for _, binObjID := range binObjIDs { + var id oid.ID + if err = id.Decode(binObjID); err != nil { + return nil, err + } + lockers = append(lockers, id) + } + } + } + return lockers, nil +} + // releases all records about the objects locked by the locker. // Returns slice of unlocked object ID's or an error. // @@ -325,3 +350,36 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e success = err == nil return res, err } + +// GetLocked return `LOCK` id's if provided object is locked by any `LOCK`. Not found +// object is considered as non-locked. +// +// Returns only non-logical errors related to underlying database. +func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("GetLocked", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.GetLocked", + trace.WithAttributes( + attribute.String("address", addr.EncodeToString()), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return res, ErrDegradedMode + } + err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + res, err = getLocked(tx, addr.Container(), addr.Object()) + return nil + })) + success = err == nil + return res, err +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 8b7b83016..94e68d85a 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -115,7 +115,7 @@ func (db *DB) put(tx *bbolt.Tx, isParent := si != nil - exists, err := db.exists(tx, objectCore.AddressOf(obj), currEpoch) + exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch) var splitInfoError *objectSDK.SplitInfoError if errors.As(err, &splitInfoError) { diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index e5321c794..65d0111c4 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -428,7 +428,7 @@ func (db *DB) selectObjectID( addr.SetObject(id) var splitInfoError *objectSDK.SplitInfoError - ok, err := db.exists(tx, addr, currEpoch) + ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch) if (err == nil && ok) || errors.As(err, &splitInfoError) { raw := make([]byte, objectKeySize) id.Encode(raw) diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 2cdb8dfa8..12c339c05 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -13,12 +13,14 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address + addr oid.Address + paddr oid.Address } // ExistsRes groups the resulting values of Exists operation. type ExistsRes struct { ex bool + lc bool } // SetAddress is an Exists option to set object checked for existence. @@ -26,11 +28,21 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) { p.addr = addr } +// SetParentAddress is an Exists option to set parent object checked for existence. +func (p *ExistsPrm) SetParentAddress(addr oid.Address) { + p.paddr = addr +} + // Exists returns the fact that the object is in the shard. func (p ExistsRes) Exists() bool { return p.ex } +// Locked returns the fact that the object is locked. +func (p ExistsRes) Locked() bool { + return p.lc +} + // Exists checks if object is presented in shard. // // Returns any error encountered that does not allow to @@ -48,6 +60,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { defer span.End() var exists bool + var locked bool var err error s.m.RLock() @@ -65,13 +78,16 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { } else { var existsPrm meta.ExistsPrm existsPrm.SetAddress(prm.addr) + existsPrm.SetParent(prm.paddr) var res meta.ExistsRes res, err = s.metaBase.Exists(ctx, existsPrm) exists = res.Exists() + locked = res.Locked() } return ExistsRes{ ex: exists, + lc: locked, }, err } diff --git a/pkg/local_object_storage/shard/lock.go b/pkg/local_object_storage/shard/lock.go index 52186cbfd..4a8d89d63 100644 --- a/pkg/local_object_storage/shard/lock.go +++ b/pkg/local_object_storage/shard/lock.go @@ -71,3 +71,20 @@ func (s *Shard) IsLocked(ctx context.Context, addr oid.Address) (bool, error) { return res.Locked(), nil } + +// GetLocked return lock id's of the provided object. Not found object is +// considered as not locked. Requires healthy metabase, returns ErrDegradedMode otherwise. +func (s *Shard) GetLocked(ctx context.Context, addr oid.Address) ([]oid.ID, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetLocked", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("address", addr.EncodeToString()), + )) + defer span.End() + + m := s.GetMode() + if m.NoMetabase() { + return nil, ErrDegradedMode + } + return s.metaBase.GetLocked(ctx, addr) +} diff --git a/pkg/services/object/delete/exec.go b/pkg/services/object/delete/exec.go index d119328a6..45bd89d36 100644 --- a/pkg/services/object/delete/exec.go +++ b/pkg/services/object/delete/exec.go @@ -2,6 +2,7 @@ package deletesvc import ( "context" + "errors" "fmt" "strconv" @@ -64,9 +65,32 @@ func (exec *execCtx) newAddress(id oid.ID) oid.Address { return a } -func (exec *execCtx) formSplitInfo(ctx context.Context) error { - var err error - exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec) +func (exec *execCtx) formExtendedInfo(ctx context.Context) error { + _, err := exec.svc.header.head(ctx, exec) + + var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError + + switch { + case err == nil: + return nil + case errors.As(err, &errSplitInfo): + exec.splitInfo = errSplitInfo.SplitInfo() + exec.tombstone.SetSplitID(exec.splitInfo.SplitID()) + + exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers) + + if err := exec.collectMembers(ctx); err != nil { + return err + } + + exec.log.Debug(logs.DeleteMembersSuccessfullyCollected) + return nil + case errors.As(err, &errECInfo): + exec.log.Debug(logs.DeleteECObjectReceived) + return nil + } + if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) { // IsErrObjectAlreadyRemoved check is required because splitInfo // implicitly performs Head request that may return ObjectAlreadyRemoved diff --git a/pkg/services/object/delete/local.go b/pkg/services/object/delete/local.go index 55ce4408d..2c3c47f49 100644 --- a/pkg/services/object/delete/local.go +++ b/pkg/services/object/delete/local.go @@ -35,19 +35,9 @@ func (exec *execCtx) formTombstone(ctx context.Context) error { exec.log.Debug(logs.DeleteFormingSplitInfo) - if err := exec.formSplitInfo(ctx); err != nil { - return fmt.Errorf("form split info: %w", err) + if err := exec.formExtendedInfo(ctx); err != nil { + return fmt.Errorf("form extended info: %w", err) } - exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers) - - exec.tombstone.SetSplitID(exec.splitInfo.SplitID()) - - if err := exec.collectMembers(ctx); err != nil { - return err - } - - exec.log.Debug(logs.DeleteMembersSuccessfullyCollected) - return exec.initTombstoneObject() } diff --git a/pkg/services/object/delete/service.go b/pkg/services/object/delete/service.go index 0ba4da437..0ba21eee3 100644 --- a/pkg/services/object/delete/service.go +++ b/pkg/services/object/delete/service.go @@ -41,7 +41,7 @@ type cfg struct { header interface { // must return (nil, nil) for PHY objects - splitInfo(context.Context, *execCtx) (*objectSDK.SplitInfo, error) + head(context.Context, *execCtx) (*objectSDK.Object, error) children(context.Context, *execCtx) ([]oid.ID, error) diff --git a/pkg/services/object/delete/util.go b/pkg/services/object/delete/util.go index 439abca2b..bb2b5f00b 100644 --- a/pkg/services/object/delete/util.go +++ b/pkg/services/object/delete/util.go @@ -2,7 +2,6 @@ package deletesvc import ( "context" - "errors" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" @@ -44,19 +43,8 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi return wr.Object(), nil } -func (w *headSvcWrapper) splitInfo(ctx context.Context, exec *execCtx) (*objectSDK.SplitInfo, error) { - _, err := w.headAddress(ctx, exec, exec.address()) - - var errSplitInfo *objectSDK.SplitInfoError - - switch { - case err == nil: - return nil, nil - case errors.As(err, &errSplitInfo): - return errSplitInfo.SplitInfo(), nil - default: - return nil, err - } +func (w *headSvcWrapper) head(ctx context.Context, exec *execCtx) (*objectSDK.Object, error) { + return w.headAddress(ctx, exec, exec.address()) } func (w *headSvcWrapper) children(ctx context.Context, exec *execCtx) ([]oid.ID, error) { -- 2.45.2 From eaaf16bf237eea162e9e4e7b9588ac2ab3b7f513 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 28 May 2024 12:14:13 +0300 Subject: [PATCH 08/10] [#1147] Makefile: Fix `gopls-run` target Signed-off-by: Anton Nikiforov --- Makefile | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 710c12703..c93d06aa8 100755 --- a/Makefile +++ b/Makefile @@ -44,9 +44,12 @@ PROTOGEN_FROSTFS_DIR ?= $(PROTOBUF_DIR)/protogen-$(PROTOGEN_FROSTFS_VERSION) STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION) +SOURCES = $(shell find . -type f -name "*.go" -print) + GOPLS_VERSION ?= v0.15.1 GOPLS_DIR ?= $(abspath $(BIN))/gopls GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION) +GOPLS_TEMP_FILE := $(shell mktemp) FROSTFS_CONTRACTS_PATH=$(abspath ./../frostfs-contract) LOCODE_DB_PATH=$(abspath ./.cache/locode_db) @@ -220,9 +223,12 @@ gopls-run: @if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \ make gopls-install; \ fi - @if [[ $$(find . -type f -name "*.go" -print | xargs $(GOPLS_VERSION_DIR)/gopls check | tee /dev/tty | wc -l) -ne 0 ]]; then \ + $(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE) + @if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \ + cat $(GOPLS_TEMP_FILE); \ exit 1; \ fi + rm $(GOPLS_TEMP_FILE) # Run linters in Docker docker/lint: -- 2.45.2 From 2a4f637861b8c3beebb14ca69b7543fec5fa7a3f Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 28 May 2024 12:21:08 +0300 Subject: [PATCH 09/10] [#1147] node: Fix issue from `gopls` Signed-off-by: Anton Nikiforov --- pkg/services/object/delete/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/object/delete/exec.go b/pkg/services/object/delete/exec.go index 45bd89d36..22928dcd5 100644 --- a/pkg/services/object/delete/exec.go +++ b/pkg/services/object/delete/exec.go @@ -91,7 +91,7 @@ func (exec *execCtx) formExtendedInfo(ctx context.Context) error { return nil } - if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) { + if !apiclient.IsErrObjectAlreadyRemoved(err) { // IsErrObjectAlreadyRemoved check is required because splitInfo // implicitly performs Head request that may return ObjectAlreadyRemoved // status that is not specified for Delete. -- 2.45.2 From f1f267a9a5f51783fd8cdd081a7aa0a342b4ab48 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Thu, 30 May 2024 09:26:06 +0300 Subject: [PATCH 10/10] [#1147] node: Use public fields for `shard.ExistsPrm` Signed-off-by: Anton Nikiforov --- pkg/local_object_storage/engine/delete.go | 2 +- .../engine/engine_test.go | 5 +++- pkg/local_object_storage/engine/exists.go | 10 ++++---- pkg/local_object_storage/engine/inhume.go | 2 +- pkg/local_object_storage/engine/lock.go | 2 +- pkg/local_object_storage/engine/put.go | 7 ++++-- .../engine/remove_copies.go | 2 +- pkg/local_object_storage/shard/exists.go | 24 +++++++------------ pkg/local_object_storage/shard/reload_test.go | 2 +- 9 files changed, 26 insertions(+), 30 deletions(-) diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 44a612213..096528967 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -83,7 +83,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e // 2. Otherwise, search for all objects with a particular SplitID and delete them too. e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { var existsPrm shard.ExistsPrm - existsPrm.SetAddress(prm.addr) + existsPrm.Address = prm.addr resExists, err := sh.Exists(ctx, existsPrm) if err != nil { diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 70c54590f..49976abbb 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -63,7 +63,10 @@ func benchmarkExists(b *testing.B, shardNum int) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - ok, _, err := e.exists(context.Background(), addr, oid.Address{}) + var shPrm shard.ExistsPrm + shPrm.Address = addr + shPrm.ParentAddress = oid.Address{} + ok, _, err := e.exists(context.Background(), shPrm) if err != nil || ok { b.Fatalf("%t %v", ok, err) } diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index ce669ec5e..c57f79691 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -8,18 +8,16 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) -func (e *StorageEngine) exists(ctx context.Context, addr oid.Address, parentAddr oid.Address) (bool, bool, error) { - var shPrm shard.ExistsPrm - shPrm.SetAddress(addr) - shPrm.SetParentAddress(parentAddr) +// exists return in the first value true if object exists. +// Second return value marks is parent object locked. +func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool, bool, error) { alreadyRemoved := false exists := false locked := false - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Exists(ctx, shPrm) if err != nil { if client.IsErrObjectAlreadyRemoved(err) { diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 62e7be933..991305af0 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -142,7 +142,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh }() if checkExists { - existPrm.SetAddress(addr) + existPrm.Address = addr exRes, err := sh.Exists(ctx, existPrm) if err != nil { if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) { diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index af56c3e1e..5354c205f 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -78,7 +78,7 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo if checkExists { var existsPrm shard.ExistsPrm - existsPrm.SetAddress(addrLocked) + existsPrm.Address = addrLocked exRes, err := sh.Exists(ctx, existsPrm) if err != nil { diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index aa94ef22e..2a78febed 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -85,7 +85,10 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { parent.SetObject(prm.obj.ECHeader().Parent()) parent.SetContainer(addr.Container()) } - existed, locked, err := e.exists(ctx, addr, parent) + var shPrm shard.ExistsPrm + shPrm.Address = addr + shPrm.ParentAddress = parent + existed, locked, err := e.exists(ctx, shPrm) if err != nil { return err } @@ -138,7 +141,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti defer close(exitCh) var existPrm shard.ExistsPrm - existPrm.SetAddress(addr) + existPrm.Address = addr exists, err := sh.Exists(ctx, existPrm) if err != nil { diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go index 00562e4cb..b99cf4f44 100644 --- a/pkg/local_object_storage/engine/remove_copies.go +++ b/pkg/local_object_storage/engine/remove_copies.go @@ -115,7 +115,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address found := false for i := range shards { var existsPrm shard.ExistsPrm - existsPrm.SetAddress(addr) + existsPrm.Address = addr res, err := shards[i].Exists(ctx, existsPrm) if err != nil { diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 12c339c05..b5a9604b4 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -13,8 +13,10 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address - paddr oid.Address + // Exists option to set object checked for existence. + Address oid.Address + // Exists option to set parent object checked for existence. + ParentAddress oid.Address } // ExistsRes groups the resulting values of Exists operation. @@ -23,16 +25,6 @@ type ExistsRes struct { lc bool } -// SetAddress is an Exists option to set object checked for existence. -func (p *ExistsPrm) SetAddress(addr oid.Address) { - p.addr = addr -} - -// SetParentAddress is an Exists option to set parent object checked for existence. -func (p *ExistsPrm) SetParentAddress(addr oid.Address) { - p.paddr = addr -} - // Exists returns the fact that the object is in the shard. func (p ExistsRes) Exists() bool { return p.ex @@ -55,7 +47,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), - attribute.String("address", prm.addr.EncodeToString()), + attribute.String("address", prm.Address.EncodeToString()), )) defer span.End() @@ -70,15 +62,15 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { return ExistsRes{}, ErrShardDisabled } else if s.info.Mode.NoMetabase() { var p common.ExistsPrm - p.Address = prm.addr + p.Address = prm.Address var res common.ExistsRes res, err = s.blobStor.Exists(ctx, p) exists = res.Exists } else { var existsPrm meta.ExistsPrm - existsPrm.SetAddress(prm.addr) - existsPrm.SetParent(prm.paddr) + existsPrm.SetAddress(prm.Address) + existsPrm.SetParent(prm.ParentAddress) var res meta.ExistsRes res, err = s.metaBase.Exists(ctx, existsPrm) diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index b5ea2fec7..7dacbfa6c 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -72,7 +72,7 @@ func TestShardReload(t *testing.T) { checkHasObjects := func(t *testing.T, exists bool) { for i := range objects { var prm ExistsPrm - prm.SetAddress(objects[i].addr) + prm.Address = objects[i].addr res, err := sh.Exists(context.Background(), prm) require.NoError(t, err) -- 2.45.2