[#9999] evacuate: Add read timeout
Some checks failed
Vulncheck / Vulncheck (pull_request) Failing after 22s
Tests and linters / Lint (pull_request) Failing after 45s
DCO action / DCO (pull_request) Successful in 55s
Build / Build Components (pull_request) Successful in 1m32s
Tests and linters / Staticcheck (pull_request) Successful in 1m58s
Tests and linters / gopls check (pull_request) Successful in 3m17s
Tests and linters / Tests with -race (pull_request) Successful in 3m21s
Tests and linters / Tests (pull_request) Failing after 46s
Some checks failed
Vulncheck / Vulncheck (pull_request) Failing after 22s
Tests and linters / Lint (pull_request) Failing after 45s
DCO action / DCO (pull_request) Successful in 55s
Build / Build Components (pull_request) Successful in 1m32s
Tests and linters / Staticcheck (pull_request) Successful in 1m58s
Tests and linters / gopls check (pull_request) Successful in 3m17s
Tests and linters / Tests with -race (pull_request) Successful in 3m21s
Tests and linters / Tests (pull_request) Failing after 46s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
9d1c915c42
commit
a8f11b2595
1 changed files with 30 additions and 1 deletions
|
@ -7,6 +7,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
|
@ -622,6 +623,8 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
|||
return shards, nil
|
||||
}
|
||||
|
||||
var incompletedEvacuationReads atomic.Int32
|
||||
|
||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
|
@ -642,7 +645,33 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
|||
var getPrm shard.GetPrm
|
||||
getPrm.SetAddress(addr)
|
||||
|
||||
getRes, err := sh.Get(ctx, getPrm)
|
||||
if incompletedEvacuationReads.Load() > 10_000 {
|
||||
return errors.New("too many objects failed to read: restart service required")
|
||||
}
|
||||
|
||||
var getRes shard.GetRes
|
||||
var err error
|
||||
done := make(chan interface{})
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
close(done)
|
||||
incompletedEvacuationReads.Add(-1)
|
||||
}()
|
||||
|
||||
incompletedEvacuationReads.Add(1)
|
||||
getRes, err = sh.Get(ctx, getPrm)
|
||||
}()
|
||||
|
||||
timer := time.NewTimer(2 * time.Minute)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-timer.C:
|
||||
err = errors.New("failed to read object by timeout")
|
||||
case <-done:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
res.objFailed.Add(1)
|
||||
|
|
Loading…
Reference in a new issue