engine, policer: Properly process error already removed #813

Merged
fyrchik merged 2 commits from acid-ant/frostfs-node:bugfix/799-policer-skip-already-removed into master 2024-02-01 17:49:25 +00:00
3 changed files with 72 additions and 48 deletions

View file

@@ -406,9 +406,8 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
case putToShardSuccess:
res.evacuated.Add(1)
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
@@ -416,11 +415,12 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
zap.Stringer("addr", addr),
evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
if exists {
res.skipped.Add(1)
}
return true, nil
case putToShardExists, putToShardRemoved:
res.skipped.Add(1)
return true, nil
default:
continue
}
}
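
The hunk above replaces the old `(putDone, exists)` boolean pair returned by `putToShard` with a single status value that the evacuation loop switches over: success counts as evacuated, exists/removed is skipped, and anything else moves on to the next shard. As a rough illustration of that pattern, here is a minimal, self-contained Go sketch; `putStatus` and `tryPut` are hypothetical stand-ins, not the actual engine types or call.

package main

import "fmt"

// Hypothetical status type mirroring putToShardStatus from the diff above.
type putStatus byte

const (
	putUnknown putStatus = iota
	putSuccess
	putExists
	putRemoved
)

// tryPut stands in for putToShard: it reports a single status value instead of
// two booleans that are easy to misorder at the call site.
func tryPut(alreadyStored bool) putStatus {
	if alreadyStored {
		return putExists
	}
	return putSuccess
}

func main() {
	for _, stored := range []bool{false, true} {
		// Mirrors the evacuation loop: success counts as evacuated,
		// exists/removed is skipped, anything else tries the next shard.
		switch tryPut(stored) {
		case putSuccess:
			fmt.Println("evacuated")
		case putExists, putRemoved:
			fmt.Println("skipped")
		default:
			fmt.Println("try next shard")
		}
	}
}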

View file

@@ -12,6 +12,7 @@ import (
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@@ -26,6 +27,20 @@ type PutPrm struct {
var errPutShard = errors.New("could not put object to any shard")
type putToShardStatus byte
const (
putToShardUnknown putToShardStatus = iota
putToShardSuccess
putToShardExists
putToShardRemoved
)
type putToShardRes struct {
status putToShardStatus
err error
}
// WithObject is a Put option to set object to save.
//
// Option is required.
@@ -68,8 +83,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
return err
}
finished := false
var shRes putToShardRes
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
@@ -78,25 +92,26 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// Shard was concurrently removed, skip.
return false
}
putDone, exists := e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
finished = putDone || exists
return finished
shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
return shRes.status != putToShardUnknown
})
if !finished {
err = errPutShard
switch shRes.status {
case putToShardUnknown:
return errPutShard

Can we introduce `putToShardRes` now? Return values are named here, but it is easy to misorder them at the call site.

Yeah, thought about this too. If others do not mind, I'll add it. @TrueCloudLab/storage-core-developers @TrueCloudLab/storage-core-committers
case putToShardRemoved:
return shRes.err
case putToShardExists, putToShardSuccess:
return nil
default:

Having `return nil` in the default branch is dangerous IMO, because if we add an error and forget to add a case here, this is a DL (data loss) scenario: we return OK to the client but do not actually save the object.

Agree, I've updated this switch.
return errPutShard

How about returning nil only for an explicit enumeration of statuses? It is easy to miss a new one here and return OK without the object being put.

old comment, didn't see
}
return err
}
// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
var putSuccess, alreadyExists bool
// Return putToShardStatus and error if it is necessary to propagate an error upper.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
addr oid.Address, obj *objectSDK.Object,
) (res putToShardRes) {
exitCh := make(chan struct{})

The comment is outdated.

Updated, thanks.
if err := pool.Submit(func() {
@@ -110,14 +125,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
if shard.IsErrObjectExpired(err) {
// object is already found but
// expired => do nothing with it
alreadyExists = true
res.status = putToShardExists
}
return // this is not ErrAlreadyRemoved error so we can go to the next shard
}
alreadyExists = exists.Exists()
if alreadyExists {
if exists.Exists() {
if ind != 0 {
var toMoveItPrm shard.ToMoveItPrm
toMoveItPrm.SetAddress(addr)
@@ -132,6 +146,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
}
}
res.status = putToShardExists
return
}
@@ -148,12 +163,21 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
if client.IsErrObjectAlreadyRemoved(err) {
e.log.Warn(logs.EngineCouldNotPutObjectToShard,
zap.Stringer("shard_id", sh.ID()),

If we remove an object with `frostfs-cli control drop-objects`, what kind of error will we receive?

`frostfs-cli control drop-objects` calls `shard.Inhume()`, so we will get `AlreadyRemoved` here.
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
res.status = putToShardRemoved
res.err = err
return
}
e.reportShardError(sh, "could not put object to shard", err)
return
}
putSuccess = true
res.status = putToShardSuccess
}); err != nil {
e.log.Warn(logs.EngineCouldNotPutObjectToShard, zap.Error(err))
close(exitCh)
@@ -161,7 +185,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
<-exitCh
return putSuccess, alreadyExists
return
}
// Put writes provided object to local storage.
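
Taken together, the hunks above change `putToShard` to return a `putToShardRes` (a status plus an optional error) instead of two booleans, and `put()` now dispatches on that status with a switch that lists every known status explicitly and fails in the `default` branch rather than returning `nil`, addressing the data-loss concern from the review. Below is a condensed, self-contained sketch of that dispatch with the engine plumbing omitted; the names mirror the diff, but `dispatch` and `main` are hypothetical, not the real engine code.

package main

import (
	"errors"
	"fmt"
)

// Types and statuses mirroring the ones added in the diff above (illustration only).
type putToShardStatus byte

const (
	putToShardUnknown putToShardStatus = iota
	putToShardSuccess
	putToShardExists
	putToShardRemoved
)

type putToShardRes struct {
	status putToShardStatus
	err    error
}

var errPutShard = errors.New("could not put object to any shard")

// dispatch mirrors the switch added to put(): every known status is listed
// explicitly, and anything unexpected fails instead of silently returning nil.
func dispatch(res putToShardRes) error {
	switch res.status {
	case putToShardUnknown:
		return errPutShard
	case putToShardRemoved:
		return res.err // propagate the "already removed" error to the caller
	case putToShardExists, putToShardSuccess:
		return nil
	default:
		return errPutShard
	}
}

func main() {
	fmt.Println(dispatch(putToShardRes{status: putToShardSuccess}))
	fmt.Println(dispatch(putToShardRes{status: putToShardRemoved, err: errors.New("object already removed")}))
}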

View file

@@ -132,21 +132,21 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
cancel()
if err == nil {

If the object is already removed on another node, we can just skip it on the current node, because it will be skipped on the next policer run.

Updated a bit: the error is now propagated to the upper layer.
shortage--
checkedNodes.submitReplicaHolder(nodes[i])
} else {
if client.IsErrObjectNotFound(err) {

What do we achieve with this refactoring?

Decided to keep this refactoring because `err` will be `nil` more often, so there is no need to check the error type first. In the normal situation, the node holds replicas.
checkedNodes.submitReplicaCandidate(nodes[i])
continue

Do we need the changes in the policer even after we have changed the logic in `shard.put`?

I think yes. The idea here is to fail faster and not process the other nodes in the queue.
}
if isClientErrMaintenance(err) {
} else if isClientErrMaintenance(err) {
shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
} else if err != nil {
} else {
p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
zap.Stringer("object", addr),
zap.String("error", err.Error()),
)
} else {
shortage--
checkedNodes.submitReplicaHolder(nodes[i])
}
}
}
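
The hunk above restructures the branch in `processNodes` so that the common `err == nil` case is handled first and the error-type checks run only when the header request failed. Here is a small, self-contained sketch of that control-flow shape; `checkNode` and the sentinel errors are stand-ins for the real `client.IsErrObjectNotFound` and `isClientErrMaintenance` checks.

package main

import (
	"errors"
	"fmt"
)

// Stand-in sentinel errors; the real code classifies errors with
// client.IsErrObjectNotFound and isClientErrMaintenance.
var (
	errNotFound    = errors.New("object not found")
	errMaintenance = errors.New("node under maintenance")
)

// checkNode mirrors the restructured branch in processNodes: the common
// success path comes first, and error classification happens only on failure.
func checkNode(err error) string {
	if err == nil {
		return "replica holder, decrease shortage"
	}
	switch {
	case errors.Is(err, errNotFound):
		return "replica candidate, continue"
	case errors.Is(err, errMaintenance):
		return "handle maintenance"
	default:
		return "log header error"
	}
}

func main() {
	for _, err := range []error{nil, errNotFound, errMaintenance, errors.New("timeout")} {
		fmt.Println(checkNode(err))
	}
}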