[#799] engine: Skip put when object removed from shard

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2023-11-21 15:06:54 +03:00 committed by Evgenii Stratonikov
parent 6534252c22
commit d0eadf7ea2
2 changed files with 59 additions and 35 deletions

View file

@ -406,9 +406,8 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
case putToShardSuccess:
res.evacuated.Add(1)
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
@ -416,11 +415,12 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
zap.Stringer("addr", addr),
evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
if exists {
res.skipped.Add(1)
}
return true, nil
case putToShardExists, putToShardRemoved:
res.skipped.Add(1)
return true, nil
default:
continue
}
}

View file

@ -12,6 +12,7 @@ import (
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@ -26,6 +27,20 @@ type PutPrm struct {
var errPutShard = errors.New("could not put object to any shard")
type putToShardStatus byte
const (
putToShardUnknown putToShardStatus = iota
putToShardSuccess
putToShardExists
putToShardRemoved
)
type putToShardRes struct {
status putToShardStatus
err error
}
// WithObject is a Put option to set object to save.
//
// Option is required.
@ -68,8 +83,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
return err
}
finished := false
var shRes putToShardRes
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
@ -78,25 +92,26 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// Shard was concurrently removed, skip.
return false
}
putDone, exists := e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
finished = putDone || exists
return finished
shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
return shRes.status != putToShardUnknown
})
if !finished {
err = errPutShard
switch shRes.status {
case putToShardUnknown:
return errPutShard
case putToShardRemoved:
return shRes.err
case putToShardExists, putToShardSuccess:
return nil
default:
return errPutShard
}
return err
}
// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
var putSuccess, alreadyExists bool
// Return putToShardStatus and error if it is necessary to propagate an error upper.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
addr oid.Address, obj *objectSDK.Object,
) (res putToShardRes) {
exitCh := make(chan struct{})
if err := pool.Submit(func() {
@ -110,14 +125,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
if shard.IsErrObjectExpired(err) {
// object is already found but
// expired => do nothing with it
alreadyExists = true
res.status = putToShardExists
}
return // this is not ErrAlreadyRemoved error so we can go to the next shard
}
alreadyExists = exists.Exists()
if alreadyExists {
if exists.Exists() {
if ind != 0 {
var toMoveItPrm shard.ToMoveItPrm
toMoveItPrm.SetAddress(addr)
@ -132,6 +146,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
}
}
res.status = putToShardExists
return
}
@ -148,12 +163,21 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
if client.IsErrObjectAlreadyRemoved(err) {
e.log.Warn(logs.EngineCouldNotPutObjectToShard,
zap.Stringer("shard_id", sh.ID()),
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
res.status = putToShardRemoved
res.err = err
return
}
e.reportShardError(sh, "could not put object to shard", err)
return
}
putSuccess = true
res.status = putToShardSuccess
}); err != nil {
e.log.Warn(logs.EngineCouldNotPutObjectToShard, zap.Error(err))
close(exitCh)
@ -161,7 +185,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
<-exitCh
return putSuccess, alreadyExists
return
}
// Put writes provided object to local storage.