mClock: Add request stats #7
3 changed files with 214 additions and 2 deletions
|
@ -14,6 +14,7 @@ import (
|
||||||
const (
|
const (
|
||||||
invalidIndex = -1
|
invalidIndex = -1
|
||||||
undefinedReservation float64 = -1.0
|
undefinedReservation float64 = -1.0
|
||||||
|
minusOne = ^uint64(0)
|
||||||
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -60,6 +61,8 @@ type MClock struct {
|
||||||
clock clock
|
clock clock
|
||||||
idleTimeout float64
|
idleTimeout float64
|
||||||
tagInfo map[string]TagInfo
|
tagInfo map[string]TagInfo
|
||||||
|
tagStat map[string]*Stat
|
||||||
|
stats []*Stat
|
||||||
|
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
previous map[string]*request
|
previous map[string]*request
|
||||||
|
@ -110,6 +113,16 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
|
||||||
}
|
}
|
||||||
result.previous = previous
|
result.previous = previous
|
||||||
|
|
||||||
|
result.tagStat = make(map[string]*Stat, len(tagInfo))
|
||||||
|
result.stats = make([]*Stat, 0, len(tagInfo))
|
||||||
|
for tag := range tagInfo {
|
||||||
|
s := &Stat{
|
||||||
|
tag: tag,
|
||||||
|
}
|
||||||
|
result.tagStat[tag] = s
|
||||||
|
result.stats = append(result.stats, s)
|
||||||
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +163,12 @@ func (q *MClock) Close() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stats returns per tag stat.
|
||||||
|
// Returned slice should not be modified.
|
||||||
|
func (q *MClock) Stats() []*Stat {
|
||||||
|
return q.stats
|
||||||
|
}
|
||||||
|
|
||||||
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
|
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
|
||||||
if runLimit == 0 {
|
if runLimit == 0 {
|
||||||
return ErrInvalidRunLimit
|
return ErrInvalidRunLimit
|
||||||
|
@ -172,11 +191,16 @@ func (q *MClock) dropRequest(req *request) {
|
||||||
q.mtx.Lock()
|
q.mtx.Lock()
|
||||||
defer q.mtx.Unlock()
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
s, ok := q.tagStat[req.tag]
|
||||||
|
assert.Cond(ok, "undefined stat tag:", req.tag)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-req.scheduled:
|
case <-req.scheduled:
|
||||||
assert.Cond(q.inProgress > 0, "invalid requests count")
|
assert.Cond(q.inProgress > 0, "invalid requests count")
|
||||||
q.inProgress--
|
q.inProgress--
|
||||||
|
s.inProgress.Add(minusOne)
|
||||||
default:
|
default:
|
||||||
|
s.pending.Add(minusOne)
|
||||||
}
|
}
|
||||||
|
|
||||||
q.removeFromQueues(req)
|
q.removeFromQueues(req)
|
||||||
|
@ -234,9 +258,14 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
|
||||||
}
|
}
|
||||||
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
|
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
|
||||||
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
|
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
|
||||||
|
|
||||||
|
s, ok := q.tagStat[tag]
|
||||||
|
assert.Cond(ok, "undefined stat tag:", tag)
|
||||||
|
s.pending.Add(1)
|
||||||
|
|
||||||
q.scheduleRequestUnsafe()
|
q.scheduleRequestUnsafe()
|
||||||
|
|
||||||
return r, q.requestCompleted, nil
|
return r, func() { q.requestCompleted(tag) }, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *MClock) adjustTags(now float64, idleTag string) {
|
func (q *MClock) adjustTags(now float64, idleTag string) {
|
||||||
|
@ -318,6 +347,10 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
||||||
}
|
}
|
||||||
q.removeFromQueues(next.r)
|
q.removeFromQueues(next.r)
|
||||||
|
|
||||||
|
s, ok := q.tagStat[next.r.tag]
|
||||||
|
assert.Cond(ok, "undefined stat tag:", next.r.tag)
|
||||||
|
s.pending.Add(minusOne)
|
||||||
|
|
||||||
tagInfo, ok := q.tagInfo[next.r.tag]
|
tagInfo, ok := q.tagInfo[next.r.tag]
|
||||||
assert.Cond(ok, "unknown tag:", next.r.tag)
|
assert.Cond(ok, "unknown tag:", next.r.tag)
|
||||||
if tagInfo.ReservedIOPS != nil && hadReservation {
|
if tagInfo.ReservedIOPS != nil && hadReservation {
|
||||||
|
@ -342,6 +375,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
||||||
|
|
||||||
assertIndexInvalid(next.r)
|
assertIndexInvalid(next.r)
|
||||||
q.inProgress++
|
q.inProgress++
|
||||||
|
s.inProgress.Add(1)
|
||||||
close(next.r.scheduled)
|
close(next.r.scheduled)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -351,6 +385,10 @@ func (q *MClock) scheduleByReservation(now float64) {
|
||||||
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
||||||
q.removeFromQueues(next.r)
|
q.removeFromQueues(next.r)
|
||||||
|
|
||||||
|
s, ok := q.tagStat[next.r.tag]
|
||||||
|
assert.Cond(ok, "undefined stat tag:", next.r.tag)
|
||||||
|
s.pending.Add(minusOne)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-next.r.canceled:
|
case <-next.r.canceled:
|
||||||
continue
|
continue
|
||||||
|
@ -359,6 +397,7 @@ func (q *MClock) scheduleByReservation(now float64) {
|
||||||
|
|
||||||
assertIndexInvalid(next.r)
|
assertIndexInvalid(next.r)
|
||||||
q.inProgress++
|
q.inProgress++
|
||||||
|
s.inProgress.Add(1)
|
||||||
close(next.r.scheduled)
|
close(next.r.scheduled)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -378,7 +417,7 @@ func (q *MClock) removeFromQueues(r *request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *MClock) requestCompleted() {
|
func (q *MClock) requestCompleted(tag string) {
|
||||||
q.mtx.Lock()
|
q.mtx.Lock()
|
||||||
defer q.mtx.Unlock()
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -388,6 +427,9 @@ func (q *MClock) requestCompleted() {
|
||||||
|
|
||||||
assert.Cond(q.inProgress > 0, "invalid requests count")
|
assert.Cond(q.inProgress > 0, "invalid requests count")
|
||||||
q.inProgress--
|
q.inProgress--
|
||||||
|
s, ok := q.tagStat[tag]
|
||||||
|
assert.Cond(ok, "undefined stat tag:", tag)
|
||||||
|
s.inProgress.Add(minusOne)
|
||||||
q.scheduleRequestUnsafe()
|
q.scheduleRequestUnsafe()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,21 @@ func TestMClockSharesScheduling(t *testing.T) {
|
||||||
releases = append(releases, release)
|
releases = append(releases, release)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats := q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(1), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2-1), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var result []string
|
var result []string
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < reqCount; i++ {
|
for i := 0; i < reqCount; i++ {
|
||||||
|
@ -52,6 +67,21 @@ func TestMClockSharesScheduling(t *testing.T) {
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
stats = q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
||||||
// because the ratio is 2 to 1.
|
// because the ratio is 2 to 1.
|
||||||
// However, there may be deviations due to rounding and sorting.
|
// However, there may be deviations due to rounding and sorting.
|
||||||
|
@ -116,7 +146,37 @@ func TestMClockRequestCancel(t *testing.T) {
|
||||||
require.Equal(t, 0, q.limitQueue.Len())
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
require.Equal(t, 0, q.reservationQueue.Len())
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
|
||||||
|
stats := q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(1), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
release1()
|
release1()
|
||||||
|
|
||||||
|
stats = q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMClockLimitScheduling(t *testing.T) {
|
func TestMClockLimitScheduling(t *testing.T) {
|
||||||
|
@ -159,6 +219,21 @@ func TestMClockLimitScheduling(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats := q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cl.v = math.MaxFloat64
|
cl.v = math.MaxFloat64
|
||||||
|
|
||||||
var result []string
|
var result []string
|
||||||
|
@ -202,6 +277,21 @@ func TestMClockLimitScheduling(t *testing.T) {
|
||||||
require.Equal(t, 0, q.sharesQueue.Len())
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
require.Equal(t, 0, q.limitQueue.Len())
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
require.Equal(t, 0, q.reservationQueue.Len())
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
|
||||||
|
stats = q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMClockReservationScheduling(t *testing.T) {
|
func TestMClockReservationScheduling(t *testing.T) {
|
||||||
|
@ -245,9 +335,39 @@ func TestMClockReservationScheduling(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats := q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cl.v = 1.00001 // 1s elapsed
|
cl.v = 1.00001 // 1s elapsed
|
||||||
q.scheduleRequest()
|
q.scheduleRequest()
|
||||||
|
|
||||||
|
stats = q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(100), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2-100), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var result []string
|
var result []string
|
||||||
for i, req := range requests {
|
for i, req := range requests {
|
||||||
select {
|
select {
|
||||||
|
@ -263,6 +383,21 @@ func TestMClockReservationScheduling(t *testing.T) {
|
||||||
require.Equal(t, "class2", res)
|
require.Equal(t, "class2", res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats = q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(0), s.InProgress())
|
||||||
|
require.Equal(t, uint64(reqCount/2-100), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cl.v = math.MaxFloat64
|
cl.v = math.MaxFloat64
|
||||||
q.scheduleRequest()
|
q.scheduleRequest()
|
||||||
|
|
||||||
|
@ -270,6 +405,21 @@ func TestMClockReservationScheduling(t *testing.T) {
|
||||||
require.Equal(t, 0, q.sharesQueue.Len())
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
require.Equal(t, 0, q.limitQueue.Len())
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
require.Equal(t, 0, q.reservationQueue.Len())
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
|
||||||
|
stats = q.Stats()
|
||||||
|
require.Equal(t, 2, len(stats))
|
||||||
|
for _, s := range stats {
|
||||||
|
switch s.Tag() {
|
||||||
|
case "class1":
|
||||||
|
require.Equal(t, uint64(reqCount/2), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
case "class2":
|
||||||
|
require.Equal(t, uint64(reqCount/2-100), s.InProgress())
|
||||||
|
require.Equal(t, uint64(0), s.Pending())
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag:"+s.Tag())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMClockIdleTag(t *testing.T) {
|
func TestMClockIdleTag(t *testing.T) {
|
||||||
|
|
20
scheduling/stat.go
Normal file
20
scheduling/stat.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
import "sync/atomic"
|
||||||
|
|
||||||
|
type Stat struct {
|
||||||
|
tag string
|
||||||
|
inProgress, pending atomic.Uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stat) Tag() string {
|
||||||
|
return s.tag
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stat) InProgress() uint64 {
|
||||||
|
return s.inProgress.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stat) Pending() uint64 {
|
||||||
|
return s.pending.Load()
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue
math.MaxUint64
?