engine, policer: Properly process error already removed #813

Merged
fyrchik merged 2 commits from acid-ant/frostfs-node:bugfix/799-policer-skip-already-removed into master 2024-02-01 17:49:25 +00:00
3 changed files with 72 additions and 48 deletions


@@ -406,21 +406,21 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
 		if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
 			continue
 		}
-		putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
-		if putDone || exists {
-			if putDone {
-				res.evacuated.Add(1)
-				e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
-					zap.Stringer("from", sh.ID()),
-					zap.Stringer("to", shards[j].ID()),
-					zap.Stringer("addr", addr),
-					evacuationOperationLogField,
-					zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-			}
-			if exists {
-				res.skipped.Add(1)
-			}
+		switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
+		case putToShardSuccess:
+			res.evacuated.Add(1)
+			e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
+				zap.Stringer("from", sh.ID()),
+				zap.Stringer("to", shards[j].ID()),
+				zap.Stringer("addr", addr),
+				evacuationOperationLogField,
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			return true, nil
+		case putToShardExists, putToShardRemoved:
+			res.skipped.Add(1)
+			return true, nil
+		default:
+			continue
 		}
 	}


@ -12,6 +12,7 @@ import (
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@@ -26,6 +27,20 @@ type PutPrm struct {
 var errPutShard = errors.New("could not put object to any shard")
 
+type putToShardStatus byte
+
+const (
+	putToShardUnknown putToShardStatus = iota
+	putToShardSuccess
+	putToShardExists
+	putToShardRemoved
+)
+
+type putToShardRes struct {
+	status putToShardStatus
+	err    error
+}
+
 // WithObject is a Put option to set object to save.
 //
 // Option is required.
@@ -68,8 +83,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
 		return err
 	}
 
-	finished := false
-
+	var shRes putToShardRes
 	e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
 		e.mtx.RLock()
 		pool, ok := e.shardPools[sh.ID().String()]
@@ -78,25 +92,26 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
 			// Shard was concurrently removed, skip.
 			return false
 		}
-
-		putDone, exists := e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
-		finished = putDone || exists
-		return finished
+		shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
+		return shRes.status != putToShardUnknown
 	})
-
-	if !finished {
-		err = errPutShard
-	}
+	switch shRes.status {
+	case putToShardUnknown:
+		return errPutShard

Can we introduce `putToShardRes` now? Return values are named here, but it is easy to misorder them at the call site.

Yeah, thought about this too. If others do not mind, I'll add it. @TrueCloudLab/storage-core-developers @TrueCloudLab/storage-core-committers

+	case putToShardRemoved:
+		return shRes.err
+	case putToShardExists, putToShardSuccess:
+		return nil
+	default:

Having `return nil` in the default branch is dangerous IMO, because if we add an error status and forget to add a case here, this is a data-loss scenario (OK to the client, but nothing is saved in reality).

Agree, I've updated this switch.

+		return errPutShard
fyrchik marked this conversation as resolved

How about returning `nil` only for explicitly enumerated statuses? It is easy to miss a new status here and return OK without the object being put.

Old comment, didn't see the update.

 	}
-
-	return err
 }
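
Aside for readers of this thread: the two points above, a typed result instead of a pair of booleans and a `default` branch that fails closed, combine into one pattern. Below is a minimal self-contained sketch; `opStatus`, `opRes`, and `handle` are illustrative names, not identifiers from this PR:

```go
package main

import (
	"errors"
	"fmt"
)

// A typed status replaces the (putDone, exists) boolean pair: two values of
// the same type are easy to misorder, a single named field is not.
type opStatus byte

const (
	opUnknown opStatus = iota // zero value: no shard accepted the object yet
	opSuccess
	opExists
	opRemoved
)

type opRes struct {
	status opStatus
	err    error // filled only when the status carries an error to propagate
}

var errNotStored = errors.New("object was not stored")

// handle consumes the typed result. The default branch returns an error, so a
// status added later but forgotten here cannot silently report success.
func handle(r opRes) error {
	switch r.status {
	case opUnknown:
		return errNotStored
	case opRemoved:
		return r.err // propagate the "already removed" error to the caller
	case opExists, opSuccess:
		return nil
	default:
		return errNotStored // fail closed instead of returning nil
	}
}

func main() {
	fmt.Println(handle(opRes{status: opSuccess}))                          // <nil>
	fmt.Println(handle(opRes{status: opRemoved, err: errors.New("gone")})) // gone
}
```

The zero value (`opUnknown`) doubles as "no shard accepted the object", which is why the iteration callback in the diff can stop on any status other than `putToShardUnknown`.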
 // putToShard puts object to sh.
-// First return value is true iff put has been successfully done.
-// Second return value is true iff object already exists.
-func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
-	var putSuccess, alreadyExists bool
-
+// Return putToShardStatus and error if it is necessary to propagate an error upper.
+func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
+	addr oid.Address, obj *objectSDK.Object,
+) (res putToShardRes) {
 	exitCh := make(chan struct{})

The comment is out of date.

Updated, thanks.

 	if err := pool.Submit(func() {
@@ -110,14 +125,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
 			if shard.IsErrObjectExpired(err) {
 				// object is already found but
 				// expired => do nothing with it
-				alreadyExists = true
+				res.status = putToShardExists
 			}
 			return // this is not ErrAlreadyRemoved error so we can go to the next shard
 		}
-		alreadyExists = exists.Exists()
-		if alreadyExists {
+		if exists.Exists() {
 			if ind != 0 {
 				var toMoveItPrm shard.ToMoveItPrm
 				toMoveItPrm.SetAddress(addr)
@@ -132,6 +146,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
 				}
 			}
 
+			res.status = putToShardExists
 			return
 		}
@@ -148,12 +163,21 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
 					zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 				return
 			}
+			if client.IsErrObjectAlreadyRemoved(err) {
+				e.log.Warn(logs.EngineCouldNotPutObjectToShard,
+					zap.Stringer("shard_id", sh.ID()),

If we remove an object with `frostfs-cli control drop-objects`, what kind of error will we receive?

`frostfs-cli control drop-objects` calls `shard.Inhume()`, so we will get `AlreadyRemoved` here.

+					zap.String("error", err.Error()),
+					zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+				res.status = putToShardRemoved
+				res.err = err
+				return
+			}
 			e.reportShardError(sh, "could not put object to shard", err)
 			return
 		}
-		putSuccess = true
+		res.status = putToShardSuccess
 	}); err != nil {
 		e.log.Warn(logs.EngineCouldNotPutObjectToShard, zap.Error(err))
 		close(exitCh)
@@ -161,7 +185,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
 	<-exitCh
 
-	return putSuccess, alreadyExists
+	return
 }
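
To make the thread above concrete, here is a hedged sketch of where `client.IsErrObjectAlreadyRemoved` (the real helper from `frostfs-sdk-go/client`, imported by this diff) sits in an error-classification chain; the function name and return strings are stand-ins, not frostfs-node code:

```go
package enginesketch

import "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"

// classifyPutError is illustrative only; it mirrors the order of checks in the
// diff: success first, then the already-removed status, then everything else.
func classifyPutError(err error) string {
	switch {
	case err == nil:
		return "stored"
	case client.IsErrObjectAlreadyRemoved(err):
		// Per the thread above, this is also what a shard reports after
		// `frostfs-cli control drop-objects`, since it goes through shard.Inhume().
		return "already removed: propagate the error, do not try other shards"
	default:
		return "other error: report it and try the next shard"
	}
}
```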
// Put writes provided object to local storage. // Put writes provided object to local storage.


@@ -132,21 +132,21 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
 			cancel()
 
-			if client.IsErrObjectNotFound(err) {
+			if err == nil {

What do we achieve with this refactoring?

We decided to keep this refactoring because `err` will be `nil` more often, so it is not necessary to check the error type first. In the normal situation, a node holds the replicas.

dstepanov-yadro marked this conversation as resolved

If the object is already removed on another node, we can just skip it on the current node, because it will be skipped on the next policer run.

Updated a bit: the error is now propagated to the upper layer.

-				checkedNodes.submitReplicaCandidate(nodes[i])
-				continue
-			}
-
-			if isClientErrMaintenance(err) {
-				shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
-			} else if err != nil {
-				p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
-					zap.Stringer("object", addr),
-					zap.String("error", err.Error()),
-				)
-			} else {
 				shortage--
 				checkedNodes.submitReplicaHolder(nodes[i])
+			} else {
+				if client.IsErrObjectNotFound(err) {
+					checkedNodes.submitReplicaCandidate(nodes[i])
+					continue

Do we need the changes in the policer even after we have changed the logic in shard.put?

I think yes. The idea here is to fail faster and not process the other nodes in the queue.

+				} else if isClientErrMaintenance(err) {
+					shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
+				} else {
+					p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
+						zap.Stringer("object", addr),
+						zap.String("error", err.Error()),
+					)
+				}
 			}
 		}
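
A condensed, self-contained sketch of the reordering settled on above: the `err == nil` branch comes first because it is the common case, so the error-type checks run only on the rare failure paths. The sentinel errors and bookkeeping below are stand-ins; the real code uses `client.IsErrObjectNotFound` and `isClientErrMaintenance`:

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative sentinels standing in for the frostfs client error checks.
var (
	errNotFound    = errors.New("object not found")
	errMaintenance = errors.New("node under maintenance")
)

func main() {
	// One simulated header-check result per node; nil means the node holds a replica.
	results := []error{nil, errNotFound, nil, errMaintenance}
	shortage := 3

	for i, err := range results {
		if err == nil { // common case first: most nodes do hold the replica
			shortage--
			continue
		}
		switch {
		case errors.Is(err, errNotFound):
			fmt.Printf("node %d: replica candidate\n", i)
		case errors.Is(err, errMaintenance):
			fmt.Printf("node %d: under maintenance, counted separately\n", i)
		default:
			fmt.Printf("node %d: header check failed: %v\n", i, err)
		}
	}
	fmt.Println("remaining shortage:", shortage)
}
```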