From d0eadf7ea2638a89e754592322bd2f1012c4756d Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 21 Nov 2023 15:06:54 +0300 Subject: [PATCH] [#799] engine: Skip put when object removed from shard Signed-off-by: Anton Nikiforov --- pkg/local_object_storage/engine/evacuate.go | 28 ++++----- pkg/local_object_storage/engine/put.go | 66 ++++++++++++++------- 2 files changed, 59 insertions(+), 35 deletions(-) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 7bb37ef61..f522e6fbd 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -406,21 +406,21 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object) - if putDone || exists { - if putDone { - res.evacuated.Add(1) - e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, - zap.Stringer("from", sh.ID()), - zap.Stringer("to", shards[j].ID()), - zap.Stringer("addr", addr), - evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - } - if exists { - res.skipped.Add(1) - } + switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status { + case putToShardSuccess: + res.evacuated.Add(1) + e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, + zap.Stringer("from", sh.ID()), + zap.Stringer("to", shards[j].ID()), + zap.Stringer("addr", addr), + evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return true, nil + case putToShardExists, putToShardRemoved: + res.skipped.Add(1) + return true, nil + default: + continue } } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 7ce915ad8..2b1712f3f 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -12,6 +12,7 @@ import ( tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" @@ -26,6 +27,20 @@ type PutPrm struct { var errPutShard = errors.New("could not put object to any shard") +type putToShardStatus byte + +const ( + putToShardUnknown putToShardStatus = iota + putToShardSuccess + putToShardExists + putToShardRemoved +) + +type putToShardRes struct { + status putToShardStatus + err error +} + // WithObject is a Put option to set object to save. // // Option is required. @@ -68,8 +83,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { return err } - finished := false - + var shRes putToShardRes e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) { e.mtx.RLock() pool, ok := e.shardPools[sh.ID().String()] @@ -78,25 +92,26 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { // Shard was concurrently removed, skip. return false } - - putDone, exists := e.putToShard(ctx, sh, ind, pool, addr, prm.obj) - finished = putDone || exists - return finished + shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj) + return shRes.status != putToShardUnknown }) - - if !finished { - err = errPutShard + switch shRes.status { + case putToShardUnknown: + return errPutShard + case putToShardRemoved: + return shRes.err + case putToShardExists, putToShardSuccess: + return nil + default: + return errPutShard } - - return err } // putToShard puts object to sh. -// First return value is true iff put has been successfully done. -// Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) { - var putSuccess, alreadyExists bool - +// Return putToShardStatus and error if it is necessary to propagate an error upper. +func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, + addr oid.Address, obj *objectSDK.Object, +) (res putToShardRes) { exitCh := make(chan struct{}) if err := pool.Submit(func() { @@ -110,14 +125,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, if shard.IsErrObjectExpired(err) { // object is already found but // expired => do nothing with it - alreadyExists = true + res.status = putToShardExists } return // this is not ErrAlreadyRemoved error so we can go to the next shard } - alreadyExists = exists.Exists() - if alreadyExists { + if exists.Exists() { if ind != 0 { var toMoveItPrm shard.ToMoveItPrm toMoveItPrm.SetAddress(addr) @@ -132,6 +146,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, } } + res.status = putToShardExists return } @@ -148,12 +163,21 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return } + if client.IsErrObjectAlreadyRemoved(err) { + e.log.Warn(logs.EngineCouldNotPutObjectToShard, + zap.Stringer("shard_id", sh.ID()), + zap.String("error", err.Error()), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + res.status = putToShardRemoved + res.err = err + return + } e.reportShardError(sh, "could not put object to shard", err) return } - putSuccess = true + res.status = putToShardSuccess }); err != nil { e.log.Warn(logs.EngineCouldNotPutObjectToShard, zap.Error(err)) close(exitCh) @@ -161,7 +185,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, <-exitCh - return putSuccess, alreadyExists + return } // Put writes provided object to local storage.