mclock: Fix timer-based scheduling #12
4 changed files with 69 additions and 34 deletions
@@ -1,6 +1,7 @@
 package scheduling

 import (
+    "math"
     "sync"
     "time"
 )
@@ -36,10 +37,7 @@ func (c *systemClock) now() float64 {
 }

 func (c *systemClock) runAt(ts float64, f func()) {
-    select {
-    case c.schedule <- scheduleInfo{ts: ts, f: f}:
-    default: // timer fired, scheduleRequest will call runAt again
-    }
+    c.schedule <- scheduleInfo{ts: ts, f: f}
 }

 func (c *systemClock) close() {
@@ -53,18 +51,30 @@ func (c *systemClock) start() {
         defer c.wg.Done()
         t := time.NewTimer(0)
         <-t.C
-        var f func()
+        currentTs := math.MaxFloat64
+        var currentTask func()
         for {
             select {
             case <-t.C:
-                if f != nil {
-                    f()
-                    f = nil
+                if currentTask != nil {
+                    c.wg.Add(1)
+                    f := currentTask
+                    go func() {
+                        defer c.wg.Done()
+                        f()
+                    }()
+                    currentTask = nil
                 }
+                currentTs = math.MaxFloat64
             case s, ok := <-c.schedule:
                 if !ok {
                     return
                 }
+                if s.ts >= currentTs {
+                    // current timer will fire earlier
+                    // so next scheduleRequest will push new schedule event
+                    continue
aarifullin marked this conversation as resolved
aarifullin commented:
But what happens to the scheduled task? Is it gone? Will it be rescheduled?
dstepanov-yadro commented:
The current timer will fire and call `scheduleRequest`, and `scheduleRequest` will set the timer again on `s.ts`.
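To make that contract concrete, here is a minimal, self-contained sketch (hypothetical `miniClock`, `deadline`, and `loop` names — not this PR's code): a deadline that arrives while an earlier timer is armed is simply dropped, because the earlier timer's callback is expected to re-submit whatever is still due.

```go
package main

import (
	"fmt"
	"time"
)

// miniClock keeps at most one armed timer: the earliest deadline wins.
type miniClock struct {
	schedule chan deadline
}

type deadline struct {
	at time.Time
	f  func()
}

func (c *miniClock) runAt(at time.Time, f func()) { c.schedule <- deadline{at: at, f: f} }

func (c *miniClock) loop() {
	t := time.NewTimer(time.Hour)
	armed := time.Time{} // zero value means "no deadline armed"
	var task func()
	for {
		select {
		case <-t.C:
			if task != nil {
				go task() // own goroutine, so the callback may call runAt again
				task = nil
			}
			armed = time.Time{}
		case d := <-c.schedule:
			if !armed.IsZero() && !d.at.Before(armed) {
				continue // armed timer fires earlier; its callback reschedules d.at
			}
			t.Reset(time.Until(d.at))
			task, armed = d.f, d.at
		}
	}
}

func main() {
	c := &miniClock{schedule: make(chan deadline)}
	go c.loop()

	done := make(chan struct{})
	later := time.Now().Add(300 * time.Millisecond)

	// The scheduleRequest-like callback: when the earlier timer fires it
	// re-submits the later deadline, so the dropped task is not lost.
	c.runAt(time.Now().Add(100*time.Millisecond), func() {
		fmt.Println("early timer fired, rescheduling the later deadline")
		c.runAt(later, func() { close(done) })
	})
	c.runAt(later, func() { close(done) }) // dropped: an earlier timer is already armed

	<-done
	fmt.Println("later deadline eventually ran")
}
```

In the real code the rescheduling is not done by the callback body itself: `scheduleRequest` re-evaluates the queues and calls `runAt` again with the still-pending timestamp.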
+                }
                 var d time.Duration
                 now := c.now()
                 if now < s.ts {
@@ -77,7 +87,8 @@ func (c *systemClock) start() {
                     }
                 }
                 t.Reset(d)
-                f = s.f
+                currentTask = s.f
+                currentTs = s.ts
             }
         }
     }()
@@ -64,7 +64,6 @@ type MClock struct {
     mtx                 sync.Mutex
     previous            map[string]*request
     inProgress          uint64
-    timeBasedScheduleTs float64
     reservationQueue    *queue
     limitQueue          *queue
     sharesQueue         *queue
@@ -96,7 +95,6 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
         limitQueue:  &queue{},
         sharesQueue: &queue{},
         readyQueue:  &queue{},
-        timeBasedScheduleTs: math.MaxFloat64,
     }

     previous := make(map[string]*request)
@@ -139,15 +137,15 @@ func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, e
 // No new requests for scheduling will be accepted after the closing.
 func (q *MClock) Close() {
     q.mtx.Lock()
-    defer q.mtx.Unlock()
-
     q.closed = true
-    q.clock.close()
     for q.limitQueue.Len() > 0 {
         item := heap.Pop(q.limitQueue).(*limitMQueueItem)
         close(item.r.canceled)
         q.removeFromQueues(item.r)
     }
+    q.mtx.Unlock()
+
+    q.clock.close()
 }

 func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
@@ -263,6 +261,10 @@ func (q *MClock) scheduleRequest() {
     q.mtx.Lock()
     defer q.mtx.Unlock()

+    if q.closed {
+        return
+    }
+
     q.scheduleRequestUnsafe()
 }

@@ -284,23 +286,23 @@ func (q *MClock) scheduleRequestUnsafe() {

 func (q *MClock) setNextScheduleTimer(now float64) {
     nextTs := math.MaxFloat64
+    var hasNext bool
     if q.reservationQueue.Len() > 0 {
         nextTs = q.reservationQueue.items[0].ts()
+        hasNext = true
     }
     if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
         nextTs = q.limitQueue.items[0].ts()
+        hasNext = true
     }
     if nextTs <= now {
         // should not happen as we always compare .ts() <= now
         return
     }
-    if q.timeBasedScheduleTs > nextTs {
-        q.clock.runAt(nextTs, func() {
-            q.scheduleRequest()
-        })
-        q.timeBasedScheduleTs = nextTs
+    if !hasNext {
+        return
     }
+    q.clock.runAt(nextTs, q.scheduleRequest)
 }

 func (q *MClock) scheduleByLimitAndWeight(now float64) {
@@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
     close(checked)
     require.NoError(t, eg.Wait())
 }
+
+func TestMClockLowLimit(t *testing.T) {
+    t.Parallel()
+    limit := 2.0
+    q, err := NewMClock(100, 100, map[string]TagInfo{
+        "class1": {Share: 50, LimitIOPS: &limit},
+    }, 5*time.Second)
+    require.NoError(t, err)
+    defer q.Close()
+
+    eg, ctx := errgroup.WithContext(context.Background())
+    eg.SetLimit(5)
+    eg.Go(func() error {
+        for range 3 {
+            release, err := q.RequestArrival(ctx, "class1")
+            require.NoError(t, err)
+            release()
+        }
+        return nil
+    })
+    require.NoError(t, eg.Wait())
+}
@@ -11,8 +11,8 @@ const (
     ioTagHeader = "x-frostfs-io-tag"
 )

-// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
-func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
+// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
+func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
     return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
         return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
     }
Run f() on a separate goroutine to avoid getting stuck on the channel push.
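For reference, a minimal sketch of the pattern this change introduces (simplified, hypothetical types — not the actual `systemClock`): `runAt` becomes a plain blocking send, which is only safe because the timer loop never executes the callback inline; the callback itself may call `runAt` again.

```go
package main

import (
	"fmt"
	"sync"
)

// clock is a stripped-down stand-in for the timer goroutine: it only
// demonstrates the dispatch side, not the timer arithmetic.
type clock struct {
	schedule chan func()
	wg       sync.WaitGroup
}

// runAt is a blocking send; safe only because loop never runs f inline.
func (c *clock) runAt(f func()) { c.schedule <- f }

func (c *clock) loop() {
	for f := range c.schedule {
		c.wg.Add(1)
		go func(f func()) { // separate goroutine, so f may call runAt again
			defer c.wg.Done()
			f()
		}(f)
	}
}

func main() {
	c := &clock{schedule: make(chan func())}
	go c.loop()

	done := make(chan struct{})
	c.runAt(func() {
		// Re-entrant scheduling: if the loop executed this callback inline,
		// the inner send would block forever, because the loop would not be
		// reading from c.schedule while still running the callback.
		c.runAt(func() { close(done) })
	})
	<-done
	fmt.Println("re-entrant runAt completed without deadlocking the loop")
	close(c.schedule)
	c.wg.Wait()
}
```

That inline-execution deadlock is what the old `select`/`default` in `runAt` worked around; with the callback offloaded to its own goroutine, the non-blocking send is no longer needed.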