From 9a48a50220eae676b61a72c5cc4199100d96f901 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 28 Jan 2025 11:23:07 +0300 Subject: [PATCH] [#1] mclock: Refactor `scheduleRequest` Split to `scheduleRequest` and `scheduleRequestUnsafe`. Signed-off-by: Dmitrii Stepanov --- scheduling/mclock.go | 18 ++++++++++-------- scheduling/mclock_test.go | 10 +++++----- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/scheduling/mclock.go b/scheduling/mclock.go index d50e60e..a38f6f3 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -236,7 +236,7 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { } heap.Push(q.sharesQueue, &sharesMQueueItem{r: r}) heap.Push(q.limitQueue, &limitMQueueItem{r: r}) - q.scheduleRequest(true) + q.scheduleRequestUnsafe() return r, q.requestCompleted, nil } @@ -261,12 +261,14 @@ func (q *MClock) adjustTags(now float64, idleTag string) { } } -func (q *MClock) scheduleRequest(lockTaken bool) { - if !lockTaken { - q.mtx.Lock() - defer q.mtx.Unlock() - } +func (q *MClock) scheduleRequest() { + q.mtx.Lock() + defer q.mtx.Unlock() + q.scheduleRequestUnsafe() +} + +func (q *MClock) scheduleRequestUnsafe() { if q.inProgress >= q.runLimit { return } @@ -297,7 +299,7 @@ func (q *MClock) setNextScheduleTimer(now float64) { if q.timeBasedScheduleTs > nextTs { q.clock.runAt(nextTs, func() { - q.scheduleRequest(false) + q.scheduleRequest() }) q.timeBasedScheduleTs = nextTs } @@ -392,7 +394,7 @@ func (q *MClock) requestCompleted() { panic("invalid requests count") } q.inProgress-- - q.scheduleRequest(true) + q.scheduleRequestUnsafe() } func assertIndexInvalid(r *request) { diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index 9cebe05..3aa261f 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -149,7 +149,7 @@ func TestMClockLimitScheduling(t *testing.T) { releases = append(releases, release) } - q.scheduleRequest(false) + q.scheduleRequest() for _, req := range requests { select { @@ -172,7 +172,7 @@ func TestMClockLimitScheduling(t *testing.T) { releases[i]() }() } - q.scheduleRequest(false) + q.scheduleRequest() wg.Wait() // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., @@ -235,7 +235,7 @@ func TestMClockReservationScheduling(t *testing.T) { releases = append(releases, release) } - q.scheduleRequest(false) + q.scheduleRequest() for _, req := range requests { select { @@ -246,7 +246,7 @@ func TestMClockReservationScheduling(t *testing.T) { } cl.v = 1.00001 // 1s elapsed - q.scheduleRequest(false) + q.scheduleRequest() var result []string for i, req := range requests { @@ -264,7 +264,7 @@ func TestMClockReservationScheduling(t *testing.T) { } cl.v = math.MaxFloat64 - q.scheduleRequest(false) + q.scheduleRequest() require.Equal(t, 0, q.readyQueue.Len()) require.Equal(t, 0, q.sharesQueue.Len())