mclock: Initial implementation #1
Reference: TrueCloudLab/frostfs-qos#1
No description provided.
Initial mClock scheduler implementation.
c65068d4e2 to 95ccf46c64
dstepanov-yadro referenced this pull request 2025-01-21 09:26:02 +00:00
@@ -0,0 +147,4 @@
}
}
// Close clocses MClock scheduler.
clocses -> closes
done
c6789eea71 to 2ef8e5508f
@@ -0,0 +62,4 @@
Shares float64
}
// MClock is mClock scheduling algorithm realization.
realization -> implementation?
Realize is rather about plans in general :)

fixed
2ef8e5508f to 170388db8d
@@ -0,0 +215,4 @@
canceled: make(chan struct{}),
}
if tagInfo.Reservation != nil {
r.reservation = max(prev.reservation + 1.0 / *tagInfo.Reservation, now)
I suppose we should add some value validation for tags, as we expect that *tagInfo.Reservation != 0.

Added validation and unit tests.
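A minimal sketch of what such validation could look like (validateTag and the error message are assumptions, not the PR's actual code; assumes imports of errors and math): a nil Reservation stays allowed, but a supplied value must be positive and finite, because the algorithm divides by it.

var ErrInvalidTagInfo = errors.New("invalid tag info: reservation must be positive")

// validateTag rejects zero, negative and NaN reservations;
// a nil pointer means "no reservation" and is allowed.
func validateTag(v TagInfo) error {
	if v.Reservation != nil && (math.IsNaN(*v.Reservation) || *v.Reservation <= 0) {
		return ErrInvalidTagInfo
	}
	return nil
}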
@@ -0,0 +514,4 @@
}
func newSystemClock() *systemClock {
c := &systemClock{
schedule is never initialized - a way to panic?

fixed
170388db8d to 0edfa036a5
0edfa036a5 to 3d0219d012
@@ -0,0 +585,4 @@
f = nil
continue
}
t.Reset(time.Duration((s.ts - now) * 1e9))
https://pkg.go.dev/time#Timer.Reset
We use go 1.22 here. So, it seems we need to drain the timer before reset; otherwise Reset may not reset it correctly.

Thx, fixed!
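For reference, a sketch of the pre-Go-1.23 drain idiom the linked docs describe, reusing the names t, s and now from the snippet above (not necessarily the PR's final code):

if !t.Stop() {
	select {
	case <-t.C: // drop the tick that already fired
	default: // the tick was already consumed by the receiving loop
	}
}
t.Reset(time.Duration((s.ts - now) * 1e9))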
3d0219d012 to 180e62c296
@@ -0,0 +415,4 @@
// waitingCount is for using in tests only.
//
// nolint: unused
This comment is outdated - you already use this function in the tests. Please, remove it.

golangci-lint doesn't include tests in the check, so this comment is required.
@@ -0,0 +536,4 @@
type systemClock struct {
since time.Time
schedule chan scheduleInfo
wg sync.WaitGroup
What do you think about using a channel instead of a WaitGroup? I think it'll be enough for your current purposes.

I prefer wait group :)
@@ -0,0 +19,4 @@
defaultLimit float64 = 100_000
shortReservation float64 = 1
medReservation float64 = 100
largeReservation float64 = 100_00
I think 10_000 would look better.

Fixed
@@ -0,0 +66,4 @@
if limit != nil {
limitStr = strconv.FormatFloat(*limit, 'f', 1, 64)
}
b.Run(fmt.Sprintf("mclock, %s limit, %s reservation, %d parallelism, %d tags", limitStr, resStr, parallelism, tags), func(b *testing.B) {
Could we use benchstat syntax for logging benchmark parameters?
TrueCloudLab/frostfs-node#1472
https://pkg.go.dev/golang.org/x/perf/benchproc/syntax
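A hedged sketch of what that could look like (not the PR's final code): benchproc expects sub-benchmark names built from /key=value pairs, which benchstat can then project and group by.

b.Run(fmt.Sprintf("mclock/limit=%s/reservation=%s/parallelism=%d/tags=%d",
	limitStr, resStr, parallelism, tags), func(b *testing.B) {
	// ... benchmark body ...
})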
@@ -0,0 +289,4 @@
defer q.mtx.Unlock()
}
if q.inProgress >= q.runLimit {
When can inProgress be greater than runLimit?

These values can be equal. The greater-than check is just foolproofing.
@@ -0,0 +394,4 @@
heap.Remove(q.readyQueue, r.readyIdx)
}
if r.reservationIdx != invalidIndex {
heap.Remove(q.reservationQueue, r.readyIdx)
You used readyIdx instead of reservationIdx. Typo?

Thx, fixed!
180e62c296 to 62d5c81455
Excellent idea 👍
@@ -0,0 +313,4 @@
nextTs = q.limitQueue.items[0].ts()
}
if q.lastSchedule < now && q.lastSchedule > nextTs {
What does this condition mean?

Good question! It was an error. It should be:
if q.lastSchedule > nextTs
This means that nextTs is closer than the already scheduled time, so we must overwrite it.
020829ced1 to 90052efd34
90052efd34 to 095b24f865
@@ -0,0 +55,4 @@
}
// Release is the type of function that should be called after the request is completed.
type Release func()
ReleaseFunc?

Done
@@ -0,0 +106,4 @@
tagInfo: tagInfo,
reservationQueue: &queue{
items: make([]queueItem, 0),
make([]queueItem, 0) looks like an anti-pattern to me, why don't we use nil and omit this line?

Done
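A tiny illustration of why the nil form is equivalent here (someItem is illustrative): append treats a nil slice like an empty one and allocates on first use.

var items []queueItem           // nil; len(items) == 0
items = append(items, someItem) // first append allocates the backing array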
@@ -0,0 +176,4 @@
return ErrInvalidRunLimit
}
for _, v := range tagInfo {
if v.Limit != nil && *v.Limit <= float64(0) {
Hm, is float64 obligatory here? Also, is there any reason we use floats instead of ints?

The algorithm uses division, and the weights can be arbitrary. That's why I chose float64.

I thought about it too.
Your implementation uses seconds (float64 in Go) as a time unit. I think nanoseconds (int64 in Go) could be used too with some changes in the algorithm.
For example, take the reservation tag formula from above:
R_i = \max(R_{i-1} + 1/r,\ t_{now})
Let's add units for each value. As I understand, 1\ \mathrm{IOPS} = 1\ \mathrm{s}^{-1}, so 1/r is measured in seconds. Then rewrite all formulas with nanoseconds, e.g. R_i = \max(R_{i-1} + 10^9/r,\ t_{now}), with R and t_{now} measured in integer nanoseconds (int64).
What do you think?

As for me, using float64 shouldn't lead to any problems, and I personally find it more comfortable working with seconds than with nanoseconds.
But also I changed units for idleTimeout and names of TagInfo fields.

@@ -0,0 +182,4 @@
if v.Reservation != nil && *v.Reservation <= float64(0) {
return ErrInvalidTagInfo
}
if v.Shares <= float64(0) {
NaN will not trigger this condition, though it seems like it should.

Fixed
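A small standalone illustration of the pitfall (not from the PR): every ordered comparison with NaN is false, so v.Shares <= 0 silently accepts NaN, while the inverted form or an explicit math.IsNaN check rejects it.

package main

import (
	"fmt"
	"math"
)

func main() {
	shares := math.NaN()
	fmt.Println(shares <= 0)   // false: NaN slips through the old check
	fmt.Println(!(shares > 0)) // true: the inverted condition rejects NaN
}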
@@ -0,0 +428,4 @@
return q.sharesQueue.Len()
}
func assertIndexInvalid(r *request) {
We assert that the index is valid, no?

We assert that all the request's indexes are invalid.

What is the goal of such an operation?

Just to check that the request was removed from all queues.
@@ -0,0 +491,4 @@
i.r.reservationIdx = idx
}
var _ queueItem = &limitMQueueItem{}
Have you considered splitting code between multiple files? (queue-related, request-related, clock-related etc.)
Done
095b24f865 to cb2263035a
@@ -0,0 +396,4 @@
// waitingCount is for using in tests only.
//
// nolint: unused
Still need this?

Yes: waitingCount is used by tests only, but the linter excludes test files.

Can we implement it in a test file then?

Ok, done
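A sketch of that resolution (file name assumed): Go allows methods on a package's types to be declared in its _test.go files, so the helper disappears from production builds and the nolint directive becomes unnecessary.

// mclock_test.go (same package)
func (q *MClock) waitingCount() int {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	return q.sharesQueue.Len()
}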
cb2263035a to b9abb25e2d
time.Duration for idle timeout

7ca4c76093 to 20a882fc37
20a882fc37 to 2e388f14ca
@@ -0,0 +116,4 @@
// request with the tag is scheduled for execution,
// context ctx is canceled or the scheduler is closed.
// If the method call returned non-nil ReleaseFunc,
// then it should be called after the request is completed.
Probably s/should/MUST/?

Checking my understanding: if it is not called, the space is not released and the queue is eventually saturated. However, Close() will release all requests, without surprises. Right?

If it is not called, the count of in-progress requests will not be decreased and the queue is eventually saturated. Close() will release all requests, without surprises.
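A usage sketch of that contract (RequestArrival as the method name is an assumption here, and doRequest is a hypothetical handler): the returned ReleaseFunc must be called exactly once after the request completes, or the in-progress count never decreases.

release, err := q.RequestArrival(ctx, "tag")
if err != nil {
	return err // queue full or scheduler closed
}
defer release() // MUST run after the request is completed
doRequest()     // hypothetical request handler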
@@ -0,0 +247,4 @@
}
minShare := q.sharesQueue.items[0].ts()
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
limitItem := (item).(*limitMQueueItem)
Why do you have brackets around item?

Fixed
@@ -0,0 +262,4 @@
}
func (q *MClock) scheduleRequest(lockTaken bool) {
if !lockTaken {
IMO making 2 separate functions would be more readable (i.e. schedule() { lock(); defer unlock(); scheduleUnsafe() } and scheduleUnsafe()), because it is not obvious what this bool value means without looking at the function definition.

Done
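A sketch of the suggested split (method names follow the comment; the real scheduling body is elided):

func (q *MClock) scheduleRequest() {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.scheduleRequestUnsafe()
}

// scheduleRequestUnsafe must only be called with q.mtx held.
func (q *MClock) scheduleRequestUnsafe() {
	// ... actual scheduling logic ...
}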
@@ -0,0 +297,4 @@
if q.timeBasedScheduleTs > nextTs {
q.clock.runAt(nextTs, func() {
q.scheduleRequest(false)
There is a possible call-chain runAt() -> scheduleRequest() -> setNextScheduleTimer() -> runAt(), where the last runAt call blocks on a non-buffered schedule channel. Why does this not happen? Can we make it more obvious?

Thank you very much! This is really a bug!
Fixed:
- systemClock's timer is created stopped
- scheduleRequest is called only when the timer is fired inside systemClock
- runAt has a default branch, so if the timer fired and another scheduleRequest is called, we just skip the push to the schedule channel, because another runAt call will be performed by scheduleRequest
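A sketch of the non-blocking send described in the last point (the ts and f fields of scheduleInfo are assumed from the snippets above):

func (c *systemClock) runAt(ts float64, f func()) {
	select {
	case c.schedule <- scheduleInfo{ts: ts, f: f}:
	default:
		// The timer has fired and scheduleRequest is already running;
		// it will re-arm the timer with another runAt call, so the
		// push can be skipped without losing the wakeup.
	}
}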
@@ -0,0 +309,4 @@
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
}
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
Just a comment, not expecting any changes in this PR:
This loop is run under the q.mtx lock, so arriving requests are blocked by this processing routine. Not saying that this is a problem, but it could become one in theory (latency spikes or slow degradation).

We have a limited queue size, though. And new requests can be dropped, so the damage is restricted.
@@ -0,0 +328,4 @@
ri := i.(*reservationMQueueItem)
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
updated = true
Have you considered using heap.Fix(q.reservationQueue, i) here instead of heap.Init() after the loop?

I've been thinking about it. But iterating through the elements and changing the queue at the same time can be difficult to understand.

Oh, yes, missed that it can be done multiple times.
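For context, a sketch of the trade-off (names from the snippet above; the mutation body is elided): several items may change in one pass, so a single heap.Init afterwards restores the invariant, whereas heap.Fix would have to run per mutated index while iteration is still in progress.

updated := false
for _, item := range q.reservationQueue.items {
	// ... possibly decrease item's reservation and set updated = true ...
}
if updated {
	// One O(n) rebuild instead of repeated heap.Fix calls during iteration.
	heap.Init(q.reservationQueue)
}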
@@ -0,0 +397,4 @@
func assertIndexInvalid(r *request) {
if r.limitIdx != invalidIndex {
panic("limitIdx is not -1")
I like the practice of writing panic. You have if cond { panic(str) }, how about introducing assert(cond, str, ...)? The ... is for avoiding concatenating strings unless the assertion fails.

Done
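A sketch of such a helper (assumes a strings import; not necessarily the PR's final version): the variadic parts are joined only on failure, so the happy path does no string work.

func assert(cond bool, details ...string) {
	if !cond {
		panic(strings.Join(details, " "))
	}
}

// Usage in place of the inline panic:
// assert(r.limitIdx == invalidIndex, "limitIdx is not -1")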
5a774e7b26 to f4d8ebf13d