engine, policer: Properly process error already removed
#813
@@ -406,21 +406,21 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
    if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
        continue
    }

    putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
    if putDone || exists {
        if putDone {
            res.evacuated.Add(1)
            e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
                zap.Stringer("from", sh.ID()),
                zap.Stringer("to", shards[j].ID()),
                zap.Stringer("addr", addr),
                evacuationOperationLogField,
                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
        }
        if exists {
            res.skipped.Add(1)
        }

    switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
    case putToShardSuccess:
        res.evacuated.Add(1)
        e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
            zap.Stringer("from", sh.ID()),
            zap.Stringer("to", shards[j].ID()),
            zap.Stringer("addr", addr),
            evacuationOperationLogField,
            zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
        return true, nil
    case putToShardExists, putToShardRemoved:
        res.skipped.Add(1)
        return true, nil
    default:
        continue
    }
    }
@@ -12,6 +12,7 @@ import (
    tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
    "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    "go.opentelemetry.io/otel/attribute"
@@ -26,6 +27,20 @@ type PutPrm struct {

var errPutShard = errors.New("could not put object to any shard")

type putToShardStatus byte

const (
    putToShardUnknown putToShardStatus = iota
    putToShardSuccess
    putToShardExists
    putToShardRemoved
)

type putToShardRes struct {
    status putToShardStatus
    err    error
}

// WithObject is a Put option to set object to save.
//
// Option is required.
@@ -68,8 +83,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
        return err
    }

    finished := false

    var shRes putToShardRes
    e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
        e.mtx.RLock()
        pool, ok := e.shardPools[sh.ID().String()]
@@ -78,25 +92,26 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
            // Shard was concurrently removed, skip.
            return false
        }

        putDone, exists := e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
        finished = putDone || exists
        return finished

        shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
        return shRes.status != putToShardUnknown
    })

    if !finished {
        err = errPutShard

    switch shRes.status {
    case putToShardUnknown:
        return errPutShard
    case putToShardRemoved:
        return shRes.err
    case putToShardExists, putToShardSuccess:
        return nil
    default:
fyrchik commented:
Having `return nil` in the default branch is dangerous IMO, because if we add an error and forget to add the case here, this is a DL scenario (OK to client, don't save in reality).

acid-ant commented:
Agree, I've updated this switch.
        return errPutShard

fyrchik commented:
How about returning nil for explicit statuses enumeration? It is easy to miss a new one here and return OK without the object being put.

fyrchik commented:
Old comment, didn't see it.
    }

    return err
}
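For illustration only, a minimal standalone Go sketch of the "fail closed" behaviour discussed above (the names are illustrative, not the engine code): only explicitly enumerated statuses return nil, so a status added later but forgotten in the switch surfaces as an error instead of an OK for an unsaved object.

package main

import (
    "errors"
    "fmt"
)

type putStatus byte

const (
    statusUnknown putStatus = iota
    statusSuccess
    statusExists
    statusRemoved
)

var errNoShard = errors.New("could not put object to any shard")

// finish maps a put status to the error returned to the client.
func finish(status putStatus, err error) error {
    switch status {
    case statusSuccess, statusExists:
        return nil // object is stored or already present
    case statusRemoved:
        return err // propagate "already removed" to the caller
    default:
        // statusUnknown and any status added later but not handled here
        // end up as a failure rather than a silent OK.
        return errNoShard
    }
}

func main() {
    fmt.Println(finish(statusSuccess, nil))
    fmt.Println(finish(statusRemoved, errors.New("object already removed")))
    fmt.Println(finish(putStatus(42), nil))
}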
// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
    var putSuccess, alreadyExists bool

// Return putToShardStatus and error if it is necessary to propagate an error upper.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
    addr oid.Address, obj *objectSDK.Object,
) (res putToShardRes) {
    exitCh := make(chan struct{})

dstepanov-yadro commented:
The comment is outdated.

acid-ant commented:
Updated, thanks.

    if err := pool.Submit(func() {
@@ -110,14 +125,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
            if shard.IsErrObjectExpired(err) {
                // object is already found but
                // expired => do nothing with it
                alreadyExists = true
                res.status = putToShardExists
            }

            return // this is not ErrAlreadyRemoved error so we can go to the next shard
        }

        alreadyExists = exists.Exists()
        if alreadyExists {
        if exists.Exists() {
            if ind != 0 {
                var toMoveItPrm shard.ToMoveItPrm
                toMoveItPrm.SetAddress(addr)

@@ -132,6 +146,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
            }
        }

        res.status = putToShardExists
        return
    }
@@ -148,12 +163,21 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
            return
        }
        if client.IsErrObjectAlreadyRemoved(err) {
            e.log.Warn(logs.EngineCouldNotPutObjectToShard,
                zap.Stringer("shard_id", sh.ID()),
fyrchik commented:
If we remove an object with `frostfs-cli control drop-objects`, what kind of error will we receive?

acid-ant commented:
`frostfs-cli control drop-objects` calls `shard.Inhume()`, so we will have `AlreadyRemoved` here.
                zap.String("error", err.Error()),
                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
            res.status = putToShardRemoved
            res.err = err
            return
        }
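A simplified sketch of the status-plus-error result used above, with hypothetical names and only the standard library: an object that was inhumed (for example via `frostfs-cli control drop-objects`) makes the attempt report "removed" and carry the error for the caller, while any other failure leaves the status unknown so the next shard is tried.

package main

import (
    "errors"
    "fmt"
)

var errAlreadyRemoved = errors.New("object already removed")

type putStatus byte

const (
    statusUnknown putStatus = iota
    statusSuccess
    statusRemoved
)

type putRes struct {
    status putStatus
    err    error
}

// tryShard mimics a single shard attempt, given the error returned by that shard.
func tryShard(shardErr error) putRes {
    if shardErr == nil {
        return putRes{status: statusSuccess}
    }
    if errors.Is(shardErr, errAlreadyRemoved) {
        // carry the error upwards: retrying on another shard makes no sense
        return putRes{status: statusRemoved, err: shardErr}
    }
    // anything else: leave the status unknown so the caller tries the next shard
    return putRes{status: statusUnknown}
}

func main() {
    fmt.Printf("%+v\n", tryShard(nil))
    fmt.Printf("%+v\n", tryShard(errAlreadyRemoved))
    fmt.Printf("%+v\n", tryShard(errors.New("i/o failure")))
}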
            e.reportShardError(sh, "could not put object to shard", err)
            return
        }

        putSuccess = true
        res.status = putToShardSuccess
    }); err != nil {
        e.log.Warn(logs.EngineCouldNotPutObjectToShard, zap.Error(err))
        close(exitCh)

@@ -161,7 +185,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
    <-exitCh

    return putSuccess, alreadyExists
    return
}

// Put writes provided object to local storage.
@@ -132,21 +132,21 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
        cancel()

        if client.IsErrObjectNotFound(err) {
fyrchik commented:
What do we achieve with this refactoring?

acid-ant commented:
We decided to keep this refactoring because `err` will be `nil` more often, so it is not necessary to check the type of the error first. In a normal situation, the node holds replicas.
            checkedNodes.submitReplicaCandidate(nodes[i])
            continue
        }

        if isClientErrMaintenance(err) {
            shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
        } else if err != nil {
            p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
                zap.Stringer("object", addr),
                zap.String("error", err.Error()),
            )
        } else {

        if err == nil {
dstepanov-yadro commented:
If the object is already removed on another node, we can just skip it on the current node, because it will be skipped on the next policer run.

acid-ant commented:
Updated a bit: the error is propagated to the upper layer.
            shortage--
            checkedNodes.submitReplicaHolder(nodes[i])
        } else {
            if client.IsErrObjectNotFound(err) {
                checkedNodes.submitReplicaCandidate(nodes[i])
                continue
fyrchik commented:
Do we need the changes in the policer even after we have changed the logic in shard.put?

acid-ant commented:
I think yes. The idea here is to fail faster and not process the other nodes in the queue.
            } else if isClientErrMaintenance(err) {
                shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
            } else {
                p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
                    zap.Stringer("object", addr),
                    zap.String("error", err.Error()),
                )
            }
        }
    }
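A rough standalone sketch, with hypothetical names rather than the policer code, of the two points made in this thread: the common case (`err == nil`, the node holds a replica) is handled before any error-type checks, and an "already removed" error returns immediately instead of querying the remaining nodes.

package main

import (
    "errors"
    "fmt"
)

var (
    errNotFound       = errors.New("object not found")
    errAlreadyRemoved = errors.New("object already removed")
)

// checkNodes consumes one pre-computed HEAD result per node and counts replica holders.
func checkNodes(results []error) (holders int, err error) {
    for _, res := range results {
        if res == nil { // most frequent outcome: the node holds a replica
            holders++
            continue
        }
        if errors.Is(res, errAlreadyRemoved) {
            // fail fast: no point in asking the remaining nodes
            return holders, res
        }
        if errors.Is(res, errNotFound) {
            continue // this node is a candidate for a new replica
        }
        // other errors would be logged and skipped in the real code
    }
    return holders, nil
}

func main() {
    holders, err := checkNodes([]error{nil, errNotFound, errAlreadyRemoved, nil})
    fmt.Println(holders, err)
}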
Can we introduce `putToShardRes` now? Return values are named here, but it is easy to misorder them at the call site.

Yeah, thought about this too. If others do not mind, I'll add it.
@TrueCloudLab/storage-core-developers @TrueCloudLab/storage-core-committers
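A small illustration, with generic names, of why a result struct was preferred over two booleans: with `(bool, bool)` a call site can silently swap the values, while struct fields are read by name and cannot be misordered.

package main

import "fmt"

// Two-boolean style: nothing stops a caller from binding the values in the wrong order.
func putOld() (putDone bool, exists bool) {
    return false, true
}

type putResult struct {
    putDone bool
    exists  bool
}

// Struct style: fields are accessed by name, so they cannot be misordered.
func putNew() putResult {
    return putResult{exists: true}
}

func main() {
    exists, putDone := putOld() // compiles fine, but the values are swapped
    fmt.Println(putDone, exists)

    res := putNew()
    fmt.Println(res.putDone, res.exists)
}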