[#799] engine: Skip put when object removed from shard
All checks were successful
DCO action / DCO (pull_request) Successful in 4m52s
Vulncheck / Vulncheck (pull_request) Successful in 5m44s
Build / Build Components (1.20) (pull_request) Successful in 7m8s
Build / Build Components (1.21) (pull_request) Successful in 7m9s
Tests and linters / Staticcheck (pull_request) Successful in 3m48s
Tests and linters / Tests (1.21) (pull_request) Successful in 4m14s
Tests and linters / Tests (1.20) (pull_request) Successful in 4m21s
Tests and linters / Lint (pull_request) Successful in 4m33s
Tests and linters / Tests with -race (pull_request) Successful in 6m0s
All checks were successful
DCO action / DCO (pull_request) Successful in 4m52s
Vulncheck / Vulncheck (pull_request) Successful in 5m44s
Build / Build Components (1.20) (pull_request) Successful in 7m8s
Build / Build Components (1.21) (pull_request) Successful in 7m9s
Tests and linters / Staticcheck (pull_request) Successful in 3m48s
Tests and linters / Tests (1.21) (pull_request) Successful in 4m14s
Tests and linters / Tests (1.20) (pull_request) Successful in 4m21s
Tests and linters / Lint (pull_request) Successful in 4m33s
Tests and linters / Tests with -race (pull_request) Successful in 6m0s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
18cfb41917
commit
db247442dd
2 changed files with 59 additions and 35 deletions
|
@ -406,21 +406,21 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
|||
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
|
||||
continue
|
||||
}
|
||||
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
|
||||
if putDone || exists {
|
||||
if putDone {
|
||||
res.evacuated.Add(1)
|
||||
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
||||
zap.Stringer("from", sh.ID()),
|
||||
zap.Stringer("to", shards[j].ID()),
|
||||
zap.Stringer("addr", addr),
|
||||
evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
if exists {
|
||||
res.skipped.Add(1)
|
||||
}
|
||||
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
|
||||
case putToShardSuccess:
|
||||
res.evacuated.Add(1)
|
||||
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
||||
zap.Stringer("from", sh.ID()),
|
||||
zap.Stringer("to", shards[j].ID()),
|
||||
zap.Stringer("addr", addr),
|
||||
evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return true, nil
|
||||
case putToShardExists, putToShardRemoved:
|
||||
res.skipped.Add(1)
|
||||
return true, nil
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -26,6 +27,20 @@ type PutPrm struct {
|
|||
|
||||
var errPutShard = errors.New("could not put object to any shard")
|
||||
|
||||
type putToShardStatus byte
|
||||
|
||||
const (
|
||||
putToShardUnknown putToShardStatus = iota
|
||||
putToShardSuccess
|
||||
putToShardExists
|
||||
putToShardRemoved
|
||||
)
|
||||
|
||||
type putToShardRes struct {
|
||||
status putToShardStatus
|
||||
err error
|
||||
}
|
||||
|
||||
// WithObject is a Put option to set object to save.
|
||||
//
|
||||
// Option is required.
|
||||
|
@ -68,8 +83,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
return err
|
||||
}
|
||||
|
||||
finished := false
|
||||
|
||||
var shRes putToShardRes
|
||||
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
|
||||
e.mtx.RLock()
|
||||
pool, ok := e.shardPools[sh.ID().String()]
|
||||
|
@ -78,25 +92,26 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
// Shard was concurrently removed, skip.
|
||||
return false
|
||||
}
|
||||
|
||||
putDone, exists := e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
|
||||
finished = putDone || exists
|
||||
return finished
|
||||
shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
|
||||
return shRes.status != putToShardUnknown
|
||||
})
|
||||
|
||||
if !finished {
|
||||
err = errPutShard
|
||||
switch shRes.status {
|
||||
case putToShardUnknown:
|
||||
return errPutShard
|
||||
case putToShardRemoved:
|
||||
return shRes.err
|
||||
case putToShardExists, putToShardSuccess:
|
||||
return nil
|
||||
default:
|
||||
return errPutShard
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// putToShard puts object to sh.
|
||||
// First return value is true iff put has been successfully done.
|
||||
// Second return value is true iff object already exists.
|
||||
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
|
||||
var putSuccess, alreadyExists bool
|
||||
|
||||
// Return putToShardStatus and error if it is necessary to propagate an error upper.
|
||||
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
|
||||
addr oid.Address, obj *objectSDK.Object,
|
||||
) (res putToShardRes) {
|
||||
exitCh := make(chan struct{})
|
||||
|
||||
if err := pool.Submit(func() {
|
||||
|
@ -110,14 +125,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
|
|||
if shard.IsErrObjectExpired(err) {
|
||||
// object is already found but
|
||||
// expired => do nothing with it
|
||||
alreadyExists = true
|
||||
res.status = putToShardExists
|
||||
}
|
||||
|
||||
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
||||
}
|
||||
|
||||
alreadyExists = exists.Exists()
|
||||
if alreadyExists {
|
||||
if exists.Exists() {
|
||||
if ind != 0 {
|
||||
var toMoveItPrm shard.ToMoveItPrm
|
||||
toMoveItPrm.SetAddress(addr)
|
||||
|
@ -132,6 +146,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
|
|||
}
|
||||
}
|
||||
|
||||
res.status = putToShardExists
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -148,12 +163,21 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
|
|||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return
|
||||
}
|
||||
if client.IsErrObjectAlreadyRemoved(err) {
|
||||
e.log.Warn(logs.EngineCouldNotPutObjectToShard,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
res.status = putToShardRemoved
|
||||
res.err = err
|
||||
return
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not put object to shard", err)
|
||||
return
|
||||
}
|
||||
|
||||
putSuccess = true
|
||||
res.status = putToShardSuccess
|
||||
}); err != nil {
|
||||
e.log.Warn(logs.EngineCouldNotPutObjectToShard, zap.Error(err))
|
||||
close(exitCh)
|
||||
|
@ -161,7 +185,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
|
|||
|
||||
<-exitCh
|
||||
|
||||
return putSuccess, alreadyExists
|
||||
return
|
||||
}
|
||||
|
||||
// Put writes provided object to local storage.
|
||||
|
|
Loading…
Reference in a new issue