From 0dccab22c257a056a9618532ad676ba2c6b6efab Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 20 Jan 2025 17:10:24 +0300 Subject: [PATCH] [#1] mclock: Initial implementation Signed-off-by: Dmitrii Stepanov --- go.mod | 11 + go.sum | 12 + scheduling/mclock.go | 581 ++++++++++++++++++++++++++++++++ scheduling/mclock_bench.result | 172 ++++++++++ scheduling/mclock_bench_test.go | 87 +++++ scheduling/mclock_test.go | 459 +++++++++++++++++++++++++ 6 files changed, 1322 insertions(+) create mode 100644 go.sum create mode 100644 scheduling/mclock.go create mode 100644 scheduling/mclock_bench.result create mode 100644 scheduling/mclock_bench_test.go create mode 100644 scheduling/mclock_test.go diff --git a/go.mod b/go.mod index 00b2e04..f3e6160 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,14 @@ module git.frostfs.info/TrueCloudLab/frostfs-qos go 1.22 + +require ( + github.com/stretchr/testify v1.9.0 + golang.org/x/sync v0.10.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8c93871 --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scheduling/mclock.go b/scheduling/mclock.go new file mode 100644 index 0000000..7c06bbc --- /dev/null +++ b/scheduling/mclock.go @@ -0,0 +1,581 @@ +package scheduling + +import ( + "container/heap" + "context" + "errors" + "math" + "sync" + "time" +) + +const ( + invalidIndex = -1 + undefinedReservation float64 = -1.0 +) + +var ( + ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed") + ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded") + ErrMClockSchedulerUnknownTag = errors.New("unknown tag") + ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero") + ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero") +) + +type queueItem interface { + ts() float64 + setIndex(idx int) +} + +type queue struct { + items []queueItem +} + +type request struct { + tag string + ts float64 + + reservation float64 + limit float64 + shares float64 + + reservationIdx int + limitIdx int + sharesIdx int + readyIdx int + + scheduled chan struct{} + canceled chan struct{} +} + +type clock interface { + now() float64 + runAt(ts float64, f func()) + close() +} + +// ReleaseFunc is the type of function that should be called after the request is 
completed. +type ReleaseFunc func() + +// TagInfo contains reserved IOPS, IOPS limit and share values for a tag. +type TagInfo struct { + ReservedIOPS *float64 + LimitIOPS *float64 + Share float64 +} + +// MClock is an implementation of the mClock scheduling algorithm. +// +// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details. +type MClock struct { + runLimit uint64 + waitLimit int + clock clock + idleTimeout float64 + tagInfo map[string]TagInfo + + mtx sync.Mutex + previous map[string]*request + inProgress uint64 + lastSchedule float64 + reservationQueue *queue + limitQueue *queue + sharesQueue *queue + readyQueue *queue + closed bool +} + +// NewMClock creates a new MClock scheduler instance with +// runLimit maximum allowed count of running requests and +// waitLimit maximum allowed count of waiting requests +// for tags specified by tagInfo. The value of idleTimeout defines +// the difference, in seconds, between the current time and the time of +// the previous request after which the tag is considered idle. +// If idleTimeout is negative, tags are never considered idle. +// If waitLimit equals zero, the number of waiting requests is not limited. +func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout float64) (*MClock, error) { + if err := validateParams(runLimit, tagInfo); err != nil { + return nil, err + } + result := &MClock{ + runLimit: runLimit, + waitLimit: int(waitLimit), + clock: newSystemClock(), + idleTimeout: idleTimeout, + tagInfo: tagInfo, + + reservationQueue: &queue{}, + limitQueue: &queue{}, + sharesQueue: &queue{}, + readyQueue: &queue{}, + } + + previous := make(map[string]*request) + for tag := range tagInfo { + previous[tag] = &request{ + tag: tag, + reservationIdx: invalidIndex, + limitIdx: invalidIndex, + sharesIdx: invalidIndex, + } + } + result.previous = previous + + return result, nil +} + +// RequestArrival schedules a new request with the given tag. +// The call blocks until one of the following events occurs: +// the request is scheduled for execution, +// the context ctx is canceled, or the scheduler is closed. +// If the call returns a non-nil ReleaseFunc, +// it must be called after the request is completed. +func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) { + req, release, err := q.pushRequest(tag) + if err != nil { + return nil, err + } + select { + case <-ctx.Done(): + q.dropRequest(req) + return nil, ctx.Err() + case <-req.scheduled: + return release, nil + case <-req.canceled: + return nil, ErrMClockSchedulerClosed + } +} + +// Close closes the MClock scheduler. +// No new requests will be accepted after closing.
+func (q *MClock) Close() { + q.mtx.Lock() + defer q.mtx.Unlock() + + q.closed = true + q.clock.close() + for q.limitQueue.Len() > 0 { + item := heap.Pop(q.limitQueue).(*limitMQueueItem) + close(item.r.canceled) + q.removeFromQueues(item.r) + } +} + +func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error { + if runLimit == 0 { + return ErrInvalidRunLimit + } + for _, v := range tagInfo { + if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) { + return ErrInvalidTagInfo + } + if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) { + return ErrInvalidTagInfo + } + if math.IsNaN(v.Share) || v.Share <= float64(0) { + return ErrInvalidTagInfo + } + } + return nil +} + +func (q *MClock) dropRequest(req *request) { + q.mtx.Lock() + defer q.mtx.Unlock() + + select { + case <-req.scheduled: + if q.inProgress == 0 { + panic("invalid requests count") + } + q.inProgress-- + default: + } + + q.removeFromQueues(req) +} + +func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { + q.mtx.Lock() + defer q.mtx.Unlock() + + if q.closed { + return nil, nil, ErrMClockSchedulerClosed + } + if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit { + return nil, nil, ErrMClockSchedulerRequestLimitExceeded + } + + now := q.clock.now() + tagInfo, ok := q.tagInfo[tag] + if !ok { + return nil, nil, ErrMClockSchedulerUnknownTag + } + prev, ok := q.previous[tag] + if !ok { + panic("undefined previous: " + tag) + } + + if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout + q.adjustTags(now, tag) + } + + r := &request{ + tag: tag, + ts: now, + shares: max(prev.shares+1.0/tagInfo.Share, now), + reservationIdx: invalidIndex, + limitIdx: invalidIndex, + sharesIdx: invalidIndex, + readyIdx: invalidIndex, + scheduled: make(chan struct{}), + canceled: make(chan struct{}), + } + if tagInfo.ReservedIOPS != nil { + r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now) + } else { + r.reservation = undefinedReservation + } + + if tagInfo.LimitIOPS != nil { + r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now) + } else { + r.limit = max(prev.limit, now) + } + + q.previous[tag] = r + if tagInfo.ReservedIOPS != nil { + heap.Push(q.reservationQueue, &reservationMQueueItem{r: r}) + } + heap.Push(q.sharesQueue, &sharesMQueueItem{r: r}) + heap.Push(q.limitQueue, &limitMQueueItem{r: r}) + q.scheduleRequest(true) + + return r, q.requestCompleted, nil +} + +func (q *MClock) adjustTags(now float64, idleTag string) { + if q.sharesQueue.Len() == 0 { + return + } + minShare := q.sharesQueue.items[0].ts() + for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed + limitItem := item.(*limitMQueueItem) + if limitItem.r.tag == idleTag { + continue + } + limitItem.r.shares -= (minShare - now) + if limitItem.r.sharesIdx != invalidIndex { + heap.Fix(q.sharesQueue, limitItem.r.sharesIdx) + } + if limitItem.r.readyIdx != invalidIndex { + heap.Fix(q.readyQueue, limitItem.r.readyIdx) + } + } +} + +func (q *MClock) scheduleRequest(lockTaken bool) { + if !lockTaken { + q.mtx.Lock() + defer q.mtx.Unlock() + } + + if q.inProgress >= q.runLimit { + return + } + now := q.clock.now() + q.scheduleByReservation(now) + if q.inProgress >= q.runLimit { + return + } + q.scheduleByLimitAndWeight(now) + if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) { + return + } + q.setNextScheduleTimer(now) +} + +func (q *MClock) 
setNextScheduleTimer(now float64) { + nextTs := math.MaxFloat64 + if q.reservationQueue.Len() > 0 { + nextTs = q.reservationQueue.items[0].ts() + } + if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs { + nextTs = q.limitQueue.items[0].ts() + } + + if q.lastSchedule < now && q.lastSchedule > nextTs { + q.clock.runAt(nextTs, func() { + q.scheduleRequest(false) + }) + q.lastSchedule = nextTs + } +} + +func (q *MClock) scheduleByLimitAndWeight(now float64) { + for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now { + ready := heap.Pop(q.limitQueue).(*limitMQueueItem) + heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r}) + } + + for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 { + next := heap.Pop(q.readyQueue).(*readyMQueueItem) + hadReservation := false + if next.r.reservationIdx != invalidIndex { + hadReservation = true + heap.Remove(q.reservationQueue, next.r.reservationIdx) + } + q.removeFromQueues(next.r) + + tagInfo, ok := q.tagInfo[next.r.tag] + if !ok { + panic("unknown tag: " + next.r.tag) // must be checked on top level + } + if tagInfo.ReservedIOPS != nil && hadReservation { + var updated bool + for _, i := range q.reservationQueue.items { + ri := i.(*reservationMQueueItem) + if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation { + ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS + updated = true + } + } + if updated { + heap.Init(q.reservationQueue) + } + } + + select { + case <-next.r.canceled: + continue + default: + } + + assertIndexInvalid(next.r) + q.inProgress++ + close(next.r.scheduled) + } +} + +func (q *MClock) scheduleByReservation(now float64) { + for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now { + next := heap.Pop(q.reservationQueue).(*reservationMQueueItem) + q.removeFromQueues(next.r) + + select { + case <-next.r.canceled: + continue + default: + } + + assertIndexInvalid(next.r) + q.inProgress++ + close(next.r.scheduled) + } +} + +func (q *MClock) removeFromQueues(r *request) { + if r.limitIdx != invalidIndex { + heap.Remove(q.limitQueue, r.limitIdx) + } + if r.sharesIdx != invalidIndex { + heap.Remove(q.sharesQueue, r.sharesIdx) + } + if r.readyIdx != invalidIndex { + heap.Remove(q.readyQueue, r.readyIdx) + } + if r.reservationIdx != invalidIndex { + heap.Remove(q.reservationQueue, r.reservationIdx) + } +} + +func (q *MClock) requestCompleted() { + q.mtx.Lock() + defer q.mtx.Unlock() + + if q.closed { + return + } + + if q.inProgress == 0 { + panic("invalid requests count") + } + q.inProgress-- + q.scheduleRequest(true) +} + +func assertIndexInvalid(r *request) { + if r.limitIdx != invalidIndex { + panic("limitIdx is not -1") + } + if r.sharesIdx != invalidIndex { + panic("sharesIdx is not -1") + } + if r.reservationIdx != invalidIndex { + panic("reservationIdx is not -1") + } + if r.readyIdx != invalidIndex { + panic("readyIdx is not -1") + } +} + +// Len implements heap.Interface. +func (q *queue) Len() int { + return len(q.items) +} + +// Less implements heap.Interface. +func (q *queue) Less(i int, j int) bool { + return q.items[i].ts() < q.items[j].ts() +} + +// Pop implements heap.Interface. +func (q *queue) Pop() any { + n := len(q.items) + item := q.items[n-1] + q.items[n-1] = nil + q.items = q.items[0 : n-1] + item.setIndex(invalidIndex) + return item +} + +// Push implements heap.Interface. +func (q *queue) Push(x any) { + it := x.(queueItem) + it.setIndex(q.Len()) + q.items = append(q.items, it) +} + +// Swap implements heap.Interface. 
+func (q *queue) Swap(i int, j int) { + q.items[i], q.items[j] = q.items[j], q.items[i] + q.items[i].setIndex(i) + q.items[j].setIndex(j) +} + +var _ queueItem = &reservationMQueueItem{} + +type reservationMQueueItem struct { + r *request +} + +func (i *reservationMQueueItem) ts() float64 { + return i.r.reservation +} + +func (i *reservationMQueueItem) setIndex(idx int) { + i.r.reservationIdx = idx +} + +var _ queueItem = &limitMQueueItem{} + +type limitMQueueItem struct { + r *request +} + +func (i *limitMQueueItem) ts() float64 { + return i.r.limit +} + +func (i *limitMQueueItem) setIndex(idx int) { + i.r.limitIdx = idx +} + +var _ queueItem = &sharesMQueueItem{} + +type sharesMQueueItem struct { + r *request +} + +func (i *sharesMQueueItem) ts() float64 { + return i.r.shares +} + +func (i *sharesMQueueItem) setIndex(idx int) { + i.r.sharesIdx = idx +} + +var _ queueItem = &readyMQueueItem{} + +type readyMQueueItem struct { + r *request +} + +func (i *readyMQueueItem) ts() float64 { + return i.r.shares +} + +func (i *readyMQueueItem) setIndex(idx int) { + i.r.readyIdx = idx +} + +type scheduleInfo struct { + ts float64 + f func() +} + +type systemClock struct { + since time.Time + schedule chan scheduleInfo + wg sync.WaitGroup +} + +func newSystemClock() *systemClock { + c := &systemClock{ + since: time.Now(), + schedule: make(chan scheduleInfo), + } + c.start() + return c +} + +func (c *systemClock) now() float64 { + return time.Since(c.since).Seconds() +} + +func (c *systemClock) runAt(ts float64, f func()) { + c.schedule <- scheduleInfo{ts: ts, f: f} +} + +func (c *systemClock) close() { + close(c.schedule) + c.wg.Wait() +} + +func (c *systemClock) start() { + c.wg.Add(1) + go func() { + defer c.wg.Done() + t := time.NewTimer(time.Hour) + var f func() + for { + select { + case <-t.C: + if f != nil { + f() + f = nil + } + t.Reset(time.Hour) + case s, ok := <-c.schedule: + if !ok { + return + } + now := c.now() + if now >= s.ts { + s.f() + f = nil + continue + } + if !t.Stop() { + select { + case <-t.C: + default: + } + } + t.Reset(time.Duration((s.ts - now) * 1e9)) + f = s.f + } + } + }() +} diff --git a/scheduling/mclock_bench.result b/scheduling/mclock_bench.result new file mode 100644 index 0000000..fa43dc4 --- /dev/null +++ b/scheduling/mclock_bench.result @@ -0,0 +1,172 @@ +Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -tags integration -bench ^BenchmarkMClock$ git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling -count=1 + +goos: linux +goarch: amd64 +pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling +cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz +BenchmarkMClock/noop,_1_parallelism-8 8660 140071 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_1_tags-8 8433 144946 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_2_tags-8 8529 144497 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_4_tags-8 7638 144267 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_8_tags-8 8392 144710 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_16_tags-8 8474 143977 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_1_tags-8 8374 143286 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_2_tags-8 7911 144500 ns/op 372 B/op 9 allocs/op 
+BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_4_tags-8 7332 144296 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_8_tags-8 7830 144359 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_16_tags-8 7839 145112 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_1_tags-8 7750 143561 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_2_tags-8 7840 143975 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_4_tags-8 7886 143822 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_8_tags-8 8251 144555 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_16_tags-8 7665 144781 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_1_tags-8 7881 145169 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_2_tags-8 8386 143578 ns/op 369 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_4_tags-8 8274 143942 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_8_tags-8 7830 143690 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_16_tags-8 7718 142707 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_1_tags-8 6446 180746 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_2_tags-8 6412 165058 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_4_tags-8 7323 156572 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_8_tags-8 8360 151004 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_16_tags-8 7712 147576 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_1_tags-8 6020 178971 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_2_tags-8 6448 165123 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_4_tags-8 6806 164651 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_8_tags-8 7284 152613 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_16_tags-8 7825 147727 ns/op 374 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_1_tags-8 5780 188006 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_2_tags-8 6483 172047 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_4_tags-8 7290 158680 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_8_tags-8 6862 148069 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_16_tags-8 7749 147112 ns/op 374 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_1_tags-8 5766 175459 ns/op 368 B/op 8 allocs/op 
+BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_2_tags-8 7200 161870 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_4_tags-8 7300 152912 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_8_tags-8 7208 148916 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_16_tags-8 7648 146154 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/noop,_8_parallelism-8 8521 140329 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_1_tags-8 7728 143902 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_2_tags-8 8414 144178 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_4_tags-8 8403 145010 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_8_tags-8 8346 143279 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_16_tags-8 7945 141189 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_1_tags-8 7820 141144 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_2_tags-8 8460 143132 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_4_tags-8 8343 144865 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_8_tags-8 8383 143854 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_16_tags-8 8379 144622 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_1_tags-8 7818 144074 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_2_tags-8 8511 145416 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_4_tags-8 8350 144417 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_8_tags-8 8364 144918 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_16_tags-8 7776 143588 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_1_tags-8 8356 144611 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_2_tags-8 7828 142666 ns/op 370 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_4_tags-8 7870 142888 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_8_tags-8 8528 140395 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_16_tags-8 8342 142833 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_1_tags-8 5640 187720 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_2_tags-8 6830 177689 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_4_tags-8 7209 156308 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_8_tags-8 7832 152150 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_16_tags-8 7520 145315 ns/op 366 B/op 8 allocs/op 
+BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_1_tags-8 5520 198036 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_2_tags-8 6430 171407 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_4_tags-8 7269 159044 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_8_tags-8 7760 147757 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_16_tags-8 7794 145792 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_1_tags-8 5510 199098 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_2_tags-8 7602 177956 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_4_tags-8 6955 160300 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_8_tags-8 7950 146992 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_16_tags-8 7870 145343 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_1_tags-8 6033 189148 ns/op 370 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_2_tags-8 6764 172016 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_4_tags-8 7255 156037 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_8_tags-8 7879 150515 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_16_tags-8 7802 147904 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/noop,_32_parallelism-8 7870 139959 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_1_tags-8 8146 141951 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_2_tags-8 7737 143994 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_4_tags-8 8444 143977 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_8_tags-8 8367 142965 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_16_tags-8 7807 142984 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_1_tags-8 8326 142276 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_2_tags-8 8164 142354 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_4_tags-8 8367 143149 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_8_tags-8 7864 143681 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_16_tags-8 7666 143557 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_1_tags-8 8354 142904 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_2_tags-8 8210 143932 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_4_tags-8 8328 143229 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_8_tags-8 8224 142964 ns/op 372 B/op 9 allocs/op 
+BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_16_tags-8 8398 142558 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_1_tags-8 7723 142118 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_2_tags-8 8337 143492 ns/op 369 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_4_tags-8 7651 144301 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_8_tags-8 8320 143327 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_16_tags-8 8214 143211 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_1_tags-8 6573 172171 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_2_tags-8 7261 158054 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_4_tags-8 7264 151381 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_8_tags-8 7887 149740 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_16_tags-8 7783 145891 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_1_tags-8 6448 184402 ns/op 374 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_2_tags-8 7142 170111 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_4_tags-8 6856 157931 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_8_tags-8 7332 147039 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_16_tags-8 8328 145941 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_1_tags-8 6741 170048 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_2_tags-8 6813 162057 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_4_tags-8 7334 152023 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_8_tags-8 7390 151674 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_16_tags-8 7299 144482 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_1_tags-8 6078 180087 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_2_tags-8 6906 171037 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_4_tags-8 7348 161815 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_8_tags-8 7352 150144 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_16_tags-8 8432 148060 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/noop,_64_parallelism-8 7905 139440 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_1_tags-8 7698 143624 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_2_tags-8 7994 142888 ns/op 364 B/op 8 allocs/op 
+BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_4_tags-8 8451 142612 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_8_tags-8 8332 141805 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_16_tags-8 7700 144190 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_1_tags-8 8425 143468 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_2_tags-8 8294 143356 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_4_tags-8 7993 143701 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_8_tags-8 8104 142619 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_16_tags-8 7333 143398 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_1_tags-8 8396 143165 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_2_tags-8 8485 143813 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_4_tags-8 8193 144148 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_8_tags-8 7712 143123 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_16_tags-8 7663 144352 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_1_tags-8 7795 143937 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_2_tags-8 8484 144034 ns/op 369 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_4_tags-8 7846 142858 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_8_tags-8 8320 143052 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_16_tags-8 8484 142492 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_1_tags-8 5718 178028 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_2_tags-8 6993 160263 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_4_tags-8 6818 152746 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_8_tags-8 7684 149975 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_16_tags-8 7791 145647 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_1_tags-8 6402 190525 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_2_tags-8 6108 175412 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_4_tags-8 7340 159547 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_8_tags-8 7800 153072 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_16_tags-8 7863 146726 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_1_tags-8 5761 175532 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_2_tags-8 6433 
165923 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_4_tags-8 7309 153218 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_8_tags-8 7173 148557 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_16_tags-8 8353 146923 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_1_tags-8 6364 172028 ns/op 369 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_2_tags-8 7300 161579 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_4_tags-8 6910 153875 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_8_tags-8 7945 147313 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_16_tags-8 7848 146027 ns/op 366 B/op 8 allocs/op +PASS +ok git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling 192.364s diff --git a/scheduling/mclock_bench_test.go b/scheduling/mclock_bench_test.go new file mode 100644 index 0000000..9888a40 --- /dev/null +++ b/scheduling/mclock_bench_test.go @@ -0,0 +1,87 @@ +package scheduling + +import ( + "context" + "fmt" + "math" + "math/rand/v2" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type noopMClockScheduler struct{} + +var ( + releaseStub ReleaseFunc = func() {} + defaultLimit float64 = 100_000 + shortReservation float64 = 1 + medReservation float64 = 100 + largeReservation float64 = 10_000 +) + +func (s *noopMClockScheduler) RequestArrival(context.Context, string) ReleaseFunc { + return releaseStub +} + +func BenchmarkMClock(b *testing.B) { + tagsCount := []int{1, 2, 4, 8, 16} + ioDuration := time.Millisecond + parallelismValues := []int{1, 8, 32, 64} + limits := []*float64{nil, &defaultLimit} + reservations := []*float64{nil, &shortReservation, &medReservation, &largeReservation} + for _, parallelism := range parallelismValues { + b.SetParallelism(parallelism) + + noopMClock := &noopMClockScheduler{} + b.Run(fmt.Sprintf("noop, %d parallelism", parallelism), func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + release := noopMClock.RequestArrival(context.Background(), "tag") + time.Sleep(ioDuration) + release() + } + }) + }) + + for _, limit := range limits { + for _, reservation := range reservations { + for _, tags := range tagsCount { + tagInfos := make(map[string]TagInfo) + for tag := 0; tag < tags; tag++ { + tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{Share: 50, LimitIOPS: limit, ReservedIOPS: reservation} + } + + mClockQ, _ := NewMClock(math.MaxUint64, math.MaxUint64, tagInfos, math.MaxFloat64) + + resStr := "no" + if reservation != nil { + resStr = strconv.FormatFloat(*reservation, 'f', 1, 64) + } + limitStr := "no" + if limit != nil { + limitStr = strconv.FormatFloat(*limit, 'f', 1, 64) + } + b.Run(fmt.Sprintf("mclock, %s limit, %s reservation, %d parallelism, %d tags", limitStr, resStr, parallelism, tags), func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + tag := rand.Int64N(int64(tags)) + release, err := mClockQ.RequestArrival(context.Background(), "tag"+strconv.FormatInt(int64(tag), 10)) + require.NoError(b, err) + time.Sleep(ioDuration) + release() + } + }) + }) + } + } + } + + } 
+} diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go new file mode 100644 index 0000000..7f0edc8 --- /dev/null +++ b/scheduling/mclock_test.go @@ -0,0 +1,459 @@ +package scheduling + +import ( + "context" + "math" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestMClockSharesScheduling(t *testing.T) { + t.Parallel() + reqCount := 1000 + reqCount = (reqCount / 2) * 2 + q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 2}, + "class2": {Share: 1}, + }, 100) + require.NoError(t, err) + q.clock = &noopClock{} + + var releases []ReleaseFunc + var requests []*request + tag := "class1" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + releases = append(releases, release) + } + tag = "class2" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + releases = append(releases, release) + } + + var result []string + var wg sync.WaitGroup + for i := 0; i < reqCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-requests[i].scheduled + result = append(result, requests[i].tag) + releases[i]() + }() + } + wg.Wait() + + // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., + // because the ratio is 2 to 1. + // However, there may be deviations due to rounding and sorting. + result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail + var class1Count int + var class2Count int + var class2MaxSeq int + for _, res := range result { + switch res { + case "class1": + class1Count++ + class2MaxSeq = 0 + case "class2": + class2Count++ + class2MaxSeq++ + require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row + default: + require.Fail(t, "unknown tag") + } + } + + require.True(t, (class1Count*100)/(class1Count+class2Count) == 66) +} + +var _ clock = &noopClock{} + +type noopClock struct { + v float64 +} + +func (n *noopClock) now() float64 { + return n.v +} + +func (n *noopClock) runAt(ts float64, f func()) {} + +func (n *noopClock) close() {} + +func TestMClockRequestCancel(t *testing.T) { + t.Parallel() + q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 2}, + "class2": {Share: 1}, + }, 100) + require.NoError(t, err) + q.clock = &noopClock{} + + release1, err := q.RequestArrival(context.Background(), "class1") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + release2, err := q.RequestArrival(ctx, "class1") + require.Nil(t, release2) + require.ErrorIs(t, err, context.DeadlineExceeded) + + require.Equal(t, 0, q.readyQueue.Len()) + require.Equal(t, 0, q.sharesQueue.Len()) + require.Equal(t, 0, q.limitQueue.Len()) + require.Equal(t, 0, q.reservationQueue.Len()) + + release1() +} + +func TestMClockLimitScheduling(t *testing.T) { + t.Parallel() + reqCount := 100 + reqCount = (reqCount / 2) * 2 + limit := 1.0 + cl := &noopClock{} + q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 2, LimitIOPS: &limit}, + "class2": {Share: 1, LimitIOPS: &limit}, + }, 100) + require.NoError(t, err) + q.clock = cl + + var releases []ReleaseFunc + var requests []*request + tag := "class1" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = 
append(requests, req) + releases = append(releases, release) + } + tag = "class2" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + releases = append(releases, release) + } + + q.scheduleRequest(false) + + for _, req := range requests { + select { + case <-req.scheduled: + require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0") + default: + } + } + + cl.v = math.MaxFloat64 + + var result []string + var wg sync.WaitGroup + for i := 0; i < reqCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-requests[i].scheduled + result = append(result, requests[i].tag) + releases[i]() + }() + } + q.scheduleRequest(false) + wg.Wait() + + // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., + // because the ratio is 2 to 1. + // However, there may be deviations due to rounding and sorting. + result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail + var class1Count int + var class2Count int + var class2MaxSeq int + for _, res := range result { + switch res { + case "class1": + class1Count++ + class2MaxSeq = 0 + case "class2": + class2Count++ + class2MaxSeq++ + require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row + default: + require.Fail(t, "unknown tag") + } + } + + require.True(t, (class1Count*100)/(class1Count+class2Count) == 66) + + require.Equal(t, 0, q.readyQueue.Len()) + require.Equal(t, 0, q.sharesQueue.Len()) + require.Equal(t, 0, q.limitQueue.Len()) + require.Equal(t, 0, q.reservationQueue.Len()) +} + +func TestMClockReservationScheduling(t *testing.T) { + t.Parallel() + reqCount := 1000 + reqCount = (reqCount / 2) * 2 + limit := 0.01 // 1 request in 100 seconds + resevation := 100.0 // 100 RPS + cl := &noopClock{} + q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 2, LimitIOPS: &limit}, + "class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation}, + }, 100) + require.NoError(t, err) + q.clock = cl + + var releases []ReleaseFunc + var requests []*request + tag := "class1" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + releases = append(releases, release) + } + tag = "class2" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + releases = append(releases, release) + } + + q.scheduleRequest(false) + + for _, req := range requests { + select { + case <-req.scheduled: + require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0") + default: + } + } + + cl.v = 1.00001 // 1s elapsed + q.scheduleRequest(false) + + var result []string + for i, req := range requests { + select { + case <-req.scheduled: + result = append(result, requests[i].tag) + releases[i]() + default: + } + } + + require.Equal(t, 100, len(result)) + for _, res := range result { + require.Equal(t, "class2", res) + } + + cl.v = math.MaxFloat64 + q.scheduleRequest(false) + + require.Equal(t, 0, q.readyQueue.Len()) + require.Equal(t, 0, q.sharesQueue.Len()) + require.Equal(t, 0, q.limitQueue.Len()) + require.Equal(t, 0, q.reservationQueue.Len()) +} + +func TestMClockIdleTag(t *testing.T) { + t.Parallel() + reqCount := 100 + idleTimeout := 2.0 + cl := &noopClock{} + q, err := NewMClock(1, math.MaxUint64, 
map[string]TagInfo{ + "class1": {Share: 1}, + "class2": {Share: 1}, + }, idleTimeout) + require.NoError(t, err) + q.clock = cl + + var requests []*request + tag := "class1" + for i := 0; i < reqCount/2; i++ { + cl.v += idleTimeout / 2 + req, _, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + } + + // class1 requests have shares [1.0; 2.0; 3.0; ... ] + + cl.v += 2 * idleTimeout + + tag = "class2" + req, _, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + + // class2 must be defined as idle, so all shares tags must be adjusted. + + for _, req := range requests { + select { + case <-req.scheduled: + default: + require.True(t, req.shares >= cl.v) + } + } +} + +func TestMClockClose(t *testing.T) { + t.Parallel() + q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.NoError(t, err) + q.clock = &noopClock{} + + requestRunning := make(chan struct{}) + checkDone := make(chan struct{}) + eg, ctx := errgroup.WithContext(context.Background()) + tag := "class1" + eg.Go(func() error { + release, err := q.RequestArrival(ctx, tag) + if err != nil { + return err + } + defer release() + close(requestRunning) + <-checkDone + return nil + }) + <-requestRunning + + eg.Go(func() error { + release, err := q.RequestArrival(ctx, tag) + require.Nil(t, release) + require.ErrorIs(t, err, ErrMClockSchedulerClosed) + return nil + }) + + // wait until second request will be blocked on wait + for q.waitingCount() == 0 { + time.Sleep(1 * time.Second) + } + + q.Close() + + release, err := q.RequestArrival(context.Background(), tag) + require.Nil(t, release) + require.ErrorIs(t, err, ErrMClockSchedulerClosed) + + close(checkDone) + + require.NoError(t, eg.Wait()) +} + +func TestMClockWaitLimit(t *testing.T) { + t.Parallel() + q, err := NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.NoError(t, err) + q.clock = &noopClock{} + defer q.Close() + + requestRunning := make(chan struct{}) + checkDone := make(chan struct{}) + eg, ctx := errgroup.WithContext(context.Background()) + tag := "class1" + // running request + eg.Go(func() error { + release, err := q.RequestArrival(ctx, tag) + if err != nil { + return err + } + defer release() + close(requestRunning) + <-checkDone + return nil + }) + + // waiting request + eg.Go(func() error { + <-requestRunning + release, err := q.RequestArrival(ctx, tag) + require.NotNil(t, release) + require.NoError(t, err) + defer release() + <-checkDone + return nil + }) + + // wait until second request will be waiting + for q.waitingCount() == 0 { + time.Sleep(1 * time.Second) + } + + release, err := q.RequestArrival(ctx, tag) + require.Nil(t, release) + require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded) + + close(checkDone) + require.NoError(t, eg.Wait()) +} + +func TestMClockParameterValidation(t *testing.T) { + _, err := NewMClock(0, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidRunLimit) + _, err = NewMClock(1, 0, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.NoError(t, err) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, -1.0) + require.NoError(t, err) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, float64(0)) + require.NoError(t, err) + negativeValue := -1.0 + zeroValue := float64(0) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: negativeValue}, + }, 1000) + require.ErrorIs(t, 
err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: zeroValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, ReservedIOPS: &zeroValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, ReservedIOPS: &negativeValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, LimitIOPS: &zeroValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, LimitIOPS: &negativeValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) +} + +func (q *MClock) waitingCount() int { + q.mtx.Lock() + defer q.mtx.Unlock() + + return q.sharesQueue.Len() +}
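
A note on how the new scheduler behaves, summarizing the patch: on arrival each request gets three tags, each advanced by the inverse of the configured rate and clamped to the current time (reservation = max(prev + 1/ReservedIOPS, now), limit = max(prev + 1/LimitIOPS, now), shares = max(prev + 1/Share, now)). Requests whose reservation tag is due are dispatched first; the rest become eligible once their limit tag passes and are then ordered by their shares tag. Below is a minimal caller-side sketch of the API introduced here; the tag names, IOPS values, and run/wait limits are illustrative assumptions rather than part of the change.

package main

import (
	"context"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
)

func main() {
	reserved := 50.0 // guarantee roughly 50 IOPS to "client" requests
	limit := 200.0   // cap "background" requests at 200 IOPS
	tags := map[string]scheduling.TagInfo{
		"client":     {Share: 3, ReservedIOPS: &reserved},
		"background": {Share: 1, LimitIOPS: &limit},
	}

	// Up to 32 requests run concurrently, up to 1000 may wait,
	// and a tag is considered idle after 60 seconds without requests.
	q, err := scheduling.NewMClock(32, 1000, tags, 60)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	release, err := q.RequestArrival(context.Background(), "client")
	if err != nil {
		panic(err)
	}
	time.Sleep(10 * time.Millisecond) // perform the I/O the request represents
	release()                         // must be called once the request is completed
	fmt.Println("done")
}

Since Close cancels requests that are still waiting, callers should treat ErrMClockSchedulerClosed returned from RequestArrival as a normal shutdown signal rather than a failure.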