[#1705] engine: Use condition var for evacuation unit tests
All checks were successful
Vulncheck / Vulncheck (push) Successful in 1m10s
Pre-commit hooks / Pre-commit (push) Successful in 1m34s
Build / Build Components (push) Successful in 1m52s
Tests and linters / gopls check (push) Successful in 3m51s
Tests and linters / Run gofumpt (push) Successful in 3m55s
Tests and linters / Staticcheck (push) Successful in 4m35s
Tests and linters / Lint (push) Successful in 4m45s
Tests and linters / Tests (push) Successful in 4m46s
OCI image / Build container images (push) Successful in 5m0s
Tests and linters / Tests with -race (push) Successful in 6m7s

To know exactly when the evacuation was completed,
a conditional variable was added.

Closes #1705

Change-Id: I86f6d7d2ad2b9759905b6b5e9341008cb74f5dfd
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-04-10 16:04:04 +03:00
parent 64c1392513
commit e06ecacf57
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 29 additions and 11 deletions

View file

@ -212,12 +212,18 @@ func New(opts ...Option) *StorageEngine {
opts[i](c)
}
evLimMtx := &sync.RWMutex{}
evLimCond := sync.NewCond(evLimMtx)
return &StorageEngine{
cfg: c,
shards: make(map[string]hashedShard),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{},
cfg: c,
shards: make(map[string]hashedShard),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{
guard: evLimMtx,
statusCond: evLimCond,
},
}
}

View file

@ -139,7 +139,8 @@ type evacuationLimiter struct {
eg *errgroup.Group
cancel context.CancelFunc
guard sync.RWMutex
guard *sync.RWMutex
statusCond *sync.Cond // used in unit tests
}
func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
@ -165,6 +166,7 @@ func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, res
startedAt: time.Now().UTC(),
result: result,
}
l.statusCond.Broadcast()
return l.eg, egCtx, nil
}
@ -180,6 +182,7 @@ func (l *evacuationLimiter) Complete(err error) {
l.state.processState = EvacuateProcessStateCompleted
l.state.errMessage = errMsq
l.state.finishedAt = time.Now().UTC()
l.statusCond.Broadcast()
l.eg = nil
}
@ -214,6 +217,7 @@ func (l *evacuationLimiter) ResetEvacuationStatus() error {
l.state = EvacuationState{}
l.eg = nil
l.cancel = nil
l.statusCond.Broadcast()
return nil
}

View file

@ -204,11 +204,10 @@ func TestEvacuateShardObjects(t *testing.T) {
func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState {
var st *EvacuationState
var err error
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 6*time.Second, 10*time.Millisecond)
e.evacuateLimiter.waitForCompleted()
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
return st
}
@ -817,3 +816,12 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err)
}
func (l *evacuationLimiter) waitForCompleted() {
l.guard.Lock()
defer l.guard.Unlock()
for l.state.processState != EvacuateProcessStateCompleted {
l.statusCond.Wait()
}
}