engine, policer: Properly process error already removed #813

Merged
fyrchik merged 2 commits from acid-ant/frostfs-node:bugfix/799-policer-skip-already-removed into master 2024-02-01 17:49:25 +00:00
3 changed files with 72 additions and 48 deletions
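Summary of the change: putToShard now returns a putToShardRes (a putToShardStatus plus an optional error) instead of a pair of booleans; put propagates a shard's "already removed" error to its caller instead of collapsing every failure into errPutShard; evacuation counts both "exists" and "removed" as skipped; and the policer handles the successful header fetch first before classifying errors. Below is a minimal, runnable sketch of the status-to-error mapping with simplified, hypothetical names (not the engine's actual types):

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the PR's putToShardStatus / putToShardRes types.
type putStatus byte

const (
	statusUnknown putStatus = iota // no shard has handled the object yet
	statusSuccess                  // stored on this shard
	statusExists                   // already present on this shard
	statusRemoved                  // tombstoned; the error must reach the caller
)

type putRes struct {
	status putStatus
	err    error
}

var (
	errNoShard        = errors.New("could not put object to any shard")
	errAlreadyRemoved = errors.New("object already removed") // stand-in for the SDK status error
)

// finish mirrors the switch at the end of put: only statusRemoved carries
// its own error upward; statusUnknown means every shard was tried and failed.
func finish(res putRes) error {
	switch res.status {
	case statusSuccess, statusExists:
		return nil
	case statusRemoved:
		return res.err
	default:
		return errNoShard
	}
}

func main() {
	fmt.Println(finish(putRes{status: statusSuccess}))                         // <nil>
	fmt.Println(finish(putRes{status: statusRemoved, err: errAlreadyRemoved})) // object already removed
	fmt.Println(finish(putRes{status: statusUnknown}))                         // could not put object to any shard
}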

View file

@@ -406,21 +406,21 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
         if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
             continue
         }
 
-        putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
-        if putDone || exists {
-            if putDone {
-                res.evacuated.Add(1)
-                e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
-                    zap.Stringer("from", sh.ID()),
-                    zap.Stringer("to", shards[j].ID()),
-                    zap.Stringer("addr", addr),
-                    evacuationOperationLogField,
-                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-            }
-            if exists {
-                res.skipped.Add(1)
-            }
-            return true, nil
-        }
+        switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
+        case putToShardSuccess:
+            res.evacuated.Add(1)
+            e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
+                zap.Stringer("from", sh.ID()),
+                zap.Stringer("to", shards[j].ID()),
+                zap.Stringer("addr", addr),
+                evacuationOperationLogField,
+                zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+            return true, nil
+        case putToShardExists, putToShardRemoved:
+            res.skipped.Add(1)
+            return true, nil
+        default:
+            continue
+        }
     }

View file

@@ -12,6 +12,7 @@ import (
     tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "go.opentelemetry.io/otel/attribute"
@@ -26,6 +27,20 @@ type PutPrm struct {
 var errPutShard = errors.New("could not put object to any shard")
 
+type putToShardStatus byte
+
+const (
+    putToShardUnknown putToShardStatus = iota
+    putToShardSuccess
+    putToShardExists
+    putToShardRemoved
+)
+
+type putToShardRes struct {
+    status putToShardStatus
+    err    error
+}
+
 // WithObject is a Put option to set object to save.
 //
 // Option is required.
@@ -68,8 +83,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
         return err
     }
 
-    finished := false
+    var shRes putToShardRes
     e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
         e.mtx.RLock()
         pool, ok := e.shardPools[sh.ID().String()]
@@ -78,25 +92,26 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
             // Shard was concurrently removed, skip.
             return false
         }
 
-        putDone, exists := e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
-        finished = putDone || exists
-        return finished
+        shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
+        return shRes.status != putToShardUnknown
     })
 
-    if !finished {
-        err = errPutShard
-    }
-
-    return err
+    switch shRes.status {
+    case putToShardUnknown:
+        return errPutShard
+    case putToShardRemoved:
+        return shRes.err
+    case putToShardExists, putToShardSuccess:
+        return nil
+    default:
+        return errPutShard
+    }
 }
 
 // putToShard puts object to sh.
-// First return value is true iff put has been successfully done.
-// Second return value is true iff object already exists.
-func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
-    var putSuccess, alreadyExists bool
+// Return putToShardStatus and error if it is necessary to propagate an error upper.
+func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
+    addr oid.Address, obj *objectSDK.Object,
+) (res putToShardRes) {
     exitCh := make(chan struct{})
 
     if err := pool.Submit(func() {
@@ -110,14 +125,13 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
             if shard.IsErrObjectExpired(err) {
                 // object is already found but
                 // expired => do nothing with it
-                alreadyExists = true
+                res.status = putToShardExists
             }
 
             return // this is not ErrAlreadyRemoved error so we can go to the next shard
         }
 
-        alreadyExists = exists.Exists()
-        if alreadyExists {
+        if exists.Exists() {
             if ind != 0 {
                 var toMoveItPrm shard.ToMoveItPrm
                 toMoveItPrm.SetAddress(addr)
@@ -132,6 +146,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
                 }
             }
 
+            res.status = putToShardExists
             return
         }
@@ -148,12 +163,21 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
                     zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
                 return
             }
+            if client.IsErrObjectAlreadyRemoved(err) {
+                e.log.Warn(logs.EngineCouldNotPutObjectToShard,
+                    zap.Stringer("shard_id", sh.ID()),
+                    zap.String("error", err.Error()),
+                    zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+                res.status = putToShardRemoved
+                res.err = err
+                return
+            }
 
             e.reportShardError(sh, "could not put object to shard", err)
             return
         }
 
-        putSuccess = true
+        res.status = putToShardSuccess
     }); err != nil {
         e.log.Warn(logs.EngineCouldNotPutObjectToShard, zap.Error(err))
         close(exitCh)
@@ -161,7 +185,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
     <-exitCh
 
-    return putSuccess, alreadyExists
+    return
 }
 
 // Put writes provided object to local storage.
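For context on the hunks above: putToShard keeps its existing execution shape. The work runs in a closure submitted to the shard's worker pool, the named return value res is filled inside that closure, and exitCh lets the caller block until it finishes (or until submission fails). A condensed, self-contained sketch of that pattern with a generic stand-in pool, not the engine's util.WorkerPool:

package main

import "fmt"

// result mirrors the shape of putToShardRes: a status plus an optional error.
type result struct {
	status string
	err    error
}

// submit stands in for a worker pool's Submit: it runs the task and reports
// whether it was accepted (a real pool may reject when saturated).
func submit(task func()) error {
	task()
	return nil
}

// doInPool shows the named-return pattern: the closure fills res, exitCh
// signals completion, and the caller blocks until it is closed.
func doInPool() (res result) {
	exitCh := make(chan struct{})

	if err := submit(func() {
		defer close(exitCh)
		res.status = "success"
	}); err != nil {
		// Submission failed: the closure never ran, so close the channel here.
		close(exitCh)
	}

	<-exitCh

	return
}

func main() {
	fmt.Println(doInPool().status)
}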

View file

@@ -132,21 +132,21 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
         cancel()
 
-        if client.IsErrObjectNotFound(err) {
-            checkedNodes.submitReplicaCandidate(nodes[i])
-            continue
-        }
-
-        if isClientErrMaintenance(err) {
-            shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
-        } else if err != nil {
-            p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
-                zap.Stringer("object", addr),
-                zap.String("error", err.Error()),
-            )
-        } else {
+        if err == nil {
             shortage--
             checkedNodes.submitReplicaHolder(nodes[i])
+        } else {
+            if client.IsErrObjectNotFound(err) {
+                checkedNodes.submitReplicaCandidate(nodes[i])
+                continue
+            } else if isClientErrMaintenance(err) {
+                shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies)
+            } else {
+                p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
+                    zap.Stringer("object", addr),
+                    zap.String("error", err.Error()),
+                )
+            }
         }
     }
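The policer hunk reorders the checks: the successful header fetch is handled first, and the error kinds (not found, maintenance, anything else) are classified in the else branch. A small runnable sketch of that ordering with stand-in errors (the real code uses client.IsErrObjectNotFound and isClientErrMaintenance):

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the error kinds the policer distinguishes.
var (
	errObjectNotFound = errors.New("object not found")
	errMaintenance    = errors.New("node in maintenance")
)

// checkNode mirrors the reordered logic above: handle the successful header
// fetch first, then classify the error.
func checkNode(err error) string {
	if err == nil {
		return "replica holder (shortage--)"
	}
	switch {
	case errors.Is(err, errObjectNotFound):
		return "replica candidate (continue)"
	case errors.Is(err, errMaintenance):
		return "handle maintenance"
	default:
		return "log the error"
	}
}

func main() {
	for _, err := range []error{nil, errObjectNotFound, errMaintenance, errors.New("timeout")} {
		fmt.Println(checkNode(err))
	}
}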