[#1] mclock: Refactor scheduleRequest

Split to `scheduleRequest` and `scheduleRequestUnsafe`.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-01-28 11:23:07 +03:00
parent d8663f1a74
commit 9a48a50220
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
2 changed files with 15 additions and 13 deletions

View file

@ -236,7 +236,7 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
} }
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r}) heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
heap.Push(q.limitQueue, &limitMQueueItem{r: r}) heap.Push(q.limitQueue, &limitMQueueItem{r: r})
q.scheduleRequest(true) q.scheduleRequestUnsafe()
return r, q.requestCompleted, nil return r, q.requestCompleted, nil
} }
@ -261,12 +261,14 @@ func (q *MClock) adjustTags(now float64, idleTag string) {
} }
} }
func (q *MClock) scheduleRequest(lockTaken bool) { func (q *MClock) scheduleRequest() {
if !lockTaken {
q.mtx.Lock() q.mtx.Lock()
defer q.mtx.Unlock() defer q.mtx.Unlock()
}
q.scheduleRequestUnsafe()
}
func (q *MClock) scheduleRequestUnsafe() {
if q.inProgress >= q.runLimit { if q.inProgress >= q.runLimit {
return return
} }
@ -297,7 +299,7 @@ func (q *MClock) setNextScheduleTimer(now float64) {
if q.timeBasedScheduleTs > nextTs { if q.timeBasedScheduleTs > nextTs {
q.clock.runAt(nextTs, func() { q.clock.runAt(nextTs, func() {
q.scheduleRequest(false) q.scheduleRequest()
}) })
q.timeBasedScheduleTs = nextTs q.timeBasedScheduleTs = nextTs
} }
@ -392,7 +394,7 @@ func (q *MClock) requestCompleted() {
panic("invalid requests count") panic("invalid requests count")
} }
q.inProgress-- q.inProgress--
q.scheduleRequest(true) q.scheduleRequestUnsafe()
} }
func assertIndexInvalid(r *request) { func assertIndexInvalid(r *request) {

View file

@ -149,7 +149,7 @@ func TestMClockLimitScheduling(t *testing.T) {
releases = append(releases, release) releases = append(releases, release)
} }
q.scheduleRequest(false) q.scheduleRequest()
for _, req := range requests { for _, req := range requests {
select { select {
@ -172,7 +172,7 @@ func TestMClockLimitScheduling(t *testing.T) {
releases[i]() releases[i]()
}() }()
} }
q.scheduleRequest(false) q.scheduleRequest()
wg.Wait() wg.Wait()
// Requests must be scheduled as class1->class1->class2->class1->class1->class2..., // Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
@ -235,7 +235,7 @@ func TestMClockReservationScheduling(t *testing.T) {
releases = append(releases, release) releases = append(releases, release)
} }
q.scheduleRequest(false) q.scheduleRequest()
for _, req := range requests { for _, req := range requests {
select { select {
@ -246,7 +246,7 @@ func TestMClockReservationScheduling(t *testing.T) {
} }
cl.v = 1.00001 // 1s elapsed cl.v = 1.00001 // 1s elapsed
q.scheduleRequest(false) q.scheduleRequest()
var result []string var result []string
for i, req := range requests { for i, req := range requests {
@ -264,7 +264,7 @@ func TestMClockReservationScheduling(t *testing.T) {
} }
cl.v = math.MaxFloat64 cl.v = math.MaxFloat64
q.scheduleRequest(false) q.scheduleRequest()
require.Equal(t, 0, q.readyQueue.Len()) require.Equal(t, 0, q.readyQueue.Len())
require.Equal(t, 0, q.sharesQueue.Len()) require.Equal(t, 0, q.sharesQueue.Len())