diff --git a/scheduling/mclock.go b/scheduling/mclock.go index 82037d6..213b3cf 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -14,6 +14,7 @@ import ( const ( invalidIndex = -1 undefinedReservation float64 = -1.0 + minusOne = ^uint64(0) ) var ( @@ -60,6 +61,8 @@ type MClock struct { clock clock idleTimeout float64 tagInfo map[string]TagInfo + tagStat map[string]*Stat + stats []*Stat mtx sync.Mutex previous map[string]*request @@ -110,6 +113,16 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo } result.previous = previous + result.tagStat = make(map[string]*Stat, len(tagInfo)) + result.stats = make([]*Stat, 0, len(tagInfo)) + for tag := range tagInfo { + s := &Stat{ + tag: tag, + } + result.tagStat[tag] = s + result.stats = append(result.stats, s) + } + return result, nil } @@ -150,6 +163,12 @@ func (q *MClock) Close() { } } +// Stats returns per tag stat. +// Returned slice should not be modified. +func (q *MClock) Stats() []*Stat { + return q.stats +} + func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error { if runLimit == 0 { return ErrInvalidRunLimit @@ -172,11 +191,16 @@ func (q *MClock) dropRequest(req *request) { q.mtx.Lock() defer q.mtx.Unlock() + s, ok := q.tagStat[req.tag] + assert.Cond(ok, "undefined stat tag:", req.tag) + select { case <-req.scheduled: assert.Cond(q.inProgress > 0, "invalid requests count") q.inProgress-- + s.inProgress.Add(minusOne) default: + s.pending.Add(minusOne) } q.removeFromQueues(req) @@ -234,9 +258,14 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { } heap.Push(q.sharesQueue, &sharesMQueueItem{r: r}) heap.Push(q.limitQueue, &limitMQueueItem{r: r}) + + s, ok := q.tagStat[tag] + assert.Cond(ok, "undefined stat tag:", tag) + s.pending.Add(1) + q.scheduleRequestUnsafe() - return r, q.requestCompleted, nil + return r, func() { q.requestCompleted(tag) }, nil } func (q *MClock) adjustTags(now float64, idleTag string) { @@ -318,6 +347,10 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) { } q.removeFromQueues(next.r) + s, ok := q.tagStat[next.r.tag] + assert.Cond(ok, "undefined stat tag:", next.r.tag) + s.pending.Add(minusOne) + tagInfo, ok := q.tagInfo[next.r.tag] assert.Cond(ok, "unknown tag:", next.r.tag) if tagInfo.ReservedIOPS != nil && hadReservation { @@ -342,6 +375,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) { assertIndexInvalid(next.r) q.inProgress++ + s.inProgress.Add(1) close(next.r.scheduled) } } @@ -351,6 +385,10 @@ func (q *MClock) scheduleByReservation(now float64) { next := heap.Pop(q.reservationQueue).(*reservationMQueueItem) q.removeFromQueues(next.r) + s, ok := q.tagStat[next.r.tag] + assert.Cond(ok, "undefined stat tag:", next.r.tag) + s.pending.Add(minusOne) + select { case <-next.r.canceled: continue @@ -359,6 +397,7 @@ func (q *MClock) scheduleByReservation(now float64) { assertIndexInvalid(next.r) q.inProgress++ + s.inProgress.Add(1) close(next.r.scheduled) } } @@ -378,7 +417,7 @@ func (q *MClock) removeFromQueues(r *request) { } } -func (q *MClock) requestCompleted() { +func (q *MClock) requestCompleted(tag string) { q.mtx.Lock() defer q.mtx.Unlock() @@ -388,6 +427,9 @@ func (q *MClock) requestCompleted() { assert.Cond(q.inProgress > 0, "invalid requests count") q.inProgress-- + s, ok := q.tagStat[tag] + assert.Cond(ok, "undefined stat tag:", tag) + s.inProgress.Add(minusOne) q.scheduleRequestUnsafe() } diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index 3aa261f..90b3b5a 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -39,6 +39,21 @@ func TestMClockSharesScheduling(t *testing.T) { releases = append(releases, release) } + stats := q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(1), s.InProgress()) + require.Equal(t, uint64(reqCount/2-1), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } + var result []string var wg sync.WaitGroup for i := 0; i < reqCount; i++ { @@ -52,6 +67,21 @@ func TestMClockSharesScheduling(t *testing.T) { } wg.Wait() + stats = q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } + // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., // because the ratio is 2 to 1. // However, there may be deviations due to rounding and sorting. @@ -116,7 +146,37 @@ func TestMClockRequestCancel(t *testing.T) { require.Equal(t, 0, q.limitQueue.Len()) require.Equal(t, 0, q.reservationQueue.Len()) + stats := q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(1), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } + release1() + + stats = q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } } func TestMClockLimitScheduling(t *testing.T) { @@ -159,6 +219,21 @@ func TestMClockLimitScheduling(t *testing.T) { } } + stats := q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } + cl.v = math.MaxFloat64 var result []string @@ -202,6 +277,21 @@ func TestMClockLimitScheduling(t *testing.T) { require.Equal(t, 0, q.sharesQueue.Len()) require.Equal(t, 0, q.limitQueue.Len()) require.Equal(t, 0, q.reservationQueue.Len()) + + stats = q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } } func TestMClockReservationScheduling(t *testing.T) { @@ -245,9 +335,39 @@ func TestMClockReservationScheduling(t *testing.T) { } } + stats := q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } + cl.v = 1.00001 // 1s elapsed q.scheduleRequest() + stats = q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2), s.Pending()) + case "class2": + require.Equal(t, uint64(100), s.InProgress()) + require.Equal(t, uint64(reqCount/2-100), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } + var result []string for i, req := range requests { select { @@ -263,6 +383,21 @@ func TestMClockReservationScheduling(t *testing.T) { require.Equal(t, "class2", res) } + stats = q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2), s.Pending()) + case "class2": + require.Equal(t, uint64(0), s.InProgress()) + require.Equal(t, uint64(reqCount/2-100), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } + cl.v = math.MaxFloat64 q.scheduleRequest() @@ -270,6 +405,21 @@ func TestMClockReservationScheduling(t *testing.T) { require.Equal(t, 0, q.sharesQueue.Len()) require.Equal(t, 0, q.limitQueue.Len()) require.Equal(t, 0, q.reservationQueue.Len()) + + stats = q.Stats() + require.Equal(t, 2, len(stats)) + for _, s := range stats { + switch s.Tag() { + case "class1": + require.Equal(t, uint64(reqCount/2), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + case "class2": + require.Equal(t, uint64(reqCount/2-100), s.InProgress()) + require.Equal(t, uint64(0), s.Pending()) + default: + require.Fail(t, "unknown tag:"+s.Tag()) + } + } } func TestMClockIdleTag(t *testing.T) { diff --git a/scheduling/stat.go b/scheduling/stat.go new file mode 100644 index 0000000..1775027 --- /dev/null +++ b/scheduling/stat.go @@ -0,0 +1,20 @@ +package scheduling + +import "sync/atomic" + +type Stat struct { + tag string + inProgress, pending atomic.Uint64 +} + +func (s *Stat) Tag() string { + return s.tag +} + +func (s *Stat) InProgress() uint64 { + return s.inProgress.Load() +} + +func (s *Stat) Pending() uint64 { + return s.pending.Load() +}