forked from TrueCloudLab/distribution
Simplify proxy scheduler
The proxy scheduler implemented its own timer state machine. It's simpler and more efficient to leverage the Go runtime's timer heap by using time.AfterFunc. This commit adds a time.Timer to each scheduler entry, and starts and stops those timers as necessary. Then the mainloop goroutine and its associated logic are not needed. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
parent
fa3c275d63
commit
c56e8c2533
2 changed files with 84 additions and 115 deletions
|
@ -3,13 +3,14 @@ package scheduler
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
)
|
||||
|
||||
// onTTLExpiryFunc is called when a repositories' TTL expires
|
||||
// onTTLExpiryFunc is called when a repository's TTL expires
|
||||
type expiryFunc func(string) error
|
||||
|
||||
const (
|
||||
|
@ -23,14 +24,14 @@ type schedulerEntry struct {
|
|||
Key string `json:"Key"`
|
||||
Expiry time.Time `json:"ExpiryData"`
|
||||
EntryType int `json:"EntryType"`
|
||||
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
// New returns a new instance of the scheduler
|
||||
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
|
||||
return &TTLExpirationScheduler{
|
||||
entries: make(map[string]schedulerEntry),
|
||||
addChan: make(chan schedulerEntry),
|
||||
stopChan: make(chan bool),
|
||||
entries: make(map[string]*schedulerEntry),
|
||||
driver: driver,
|
||||
pathToStateFile: path,
|
||||
ctx: ctx,
|
||||
|
@ -41,9 +42,9 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi
|
|||
// TTLExpirationScheduler is a scheduler used to perform actions
|
||||
// when TTLs expire
|
||||
type TTLExpirationScheduler struct {
|
||||
entries map[string]schedulerEntry
|
||||
addChan chan schedulerEntry
|
||||
stopChan chan bool
|
||||
sync.Mutex
|
||||
|
||||
entries map[string]*schedulerEntry
|
||||
|
||||
driver driver.StorageDriver
|
||||
ctx context.Context
|
||||
|
@ -55,24 +56,27 @@ type TTLExpirationScheduler struct {
|
|||
onManifestExpire expiryFunc
|
||||
}
|
||||
|
||||
// addChan allows more TTLs to be pushed to the scheduler
|
||||
type addChan chan schedulerEntry
|
||||
|
||||
// stopChan allows the scheduler to be stopped - used for testing.
|
||||
type stopChan chan bool
|
||||
|
||||
// OnBlobExpire is called when a scheduled blob's TTL expires
|
||||
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
ttles.onBlobExpire = f
|
||||
}
|
||||
|
||||
// OnManifestExpire is called when a scheduled manifest's TTL expires
|
||||
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
ttles.onManifestExpire = f
|
||||
}
|
||||
|
||||
// AddBlob schedules a blob cleanup after ttl expires
|
||||
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
if ttles.stopped {
|
||||
return fmt.Errorf("scheduler not started")
|
||||
}
|
||||
|
@ -82,6 +86,9 @@ func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) err
|
|||
|
||||
// AddManifest schedules a manifest cleanup after ttl expires
|
||||
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
if ttles.stopped {
|
||||
return fmt.Errorf("scheduler not started")
|
||||
}
|
||||
|
@ -92,23 +99,9 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Durat
|
|||
|
||||
// Start starts the scheduler
|
||||
func (ttles *TTLExpirationScheduler) Start() error {
|
||||
return ttles.start()
|
||||
}
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
|
||||
entry := schedulerEntry{
|
||||
Key: key,
|
||||
Expiry: time.Now().Add(ttl),
|
||||
EntryType: eType,
|
||||
}
|
||||
ttles.addChan <- entry
|
||||
}
|
||||
|
||||
func (ttles *TTLExpirationScheduler) stop() {
|
||||
ttles.stopChan <- true
|
||||
}
|
||||
|
||||
func (ttles *TTLExpirationScheduler) start() error {
|
||||
err := ttles.readState()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -120,97 +113,75 @@ func (ttles *TTLExpirationScheduler) start() error {
|
|||
|
||||
context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
|
||||
ttles.stopped = false
|
||||
go ttles.mainloop()
|
||||
|
||||
// Start timer for each deserialized entry
|
||||
for _, entry := range ttles.entries {
|
||||
entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now()))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mainloop uses a select statement to listen for events. Most of its time
|
||||
// is spent in waiting on a TTL to expire but can be interrupted when TTLs
|
||||
// are added.
|
||||
func (ttles *TTLExpirationScheduler) mainloop() {
|
||||
for {
|
||||
if ttles.stopped {
|
||||
return
|
||||
}
|
||||
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
|
||||
entry := &schedulerEntry{
|
||||
Key: key,
|
||||
Expiry: time.Now().Add(ttl),
|
||||
EntryType: eType,
|
||||
}
|
||||
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
|
||||
if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil {
|
||||
oldEntry.timer.Stop()
|
||||
}
|
||||
ttles.entries[key] = entry
|
||||
entry.timer = ttles.startTimer(entry, ttl)
|
||||
|
||||
nextEntry, ttl := nextExpiringEntry(ttles.entries)
|
||||
if len(ttles.entries) == 0 {
|
||||
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...")
|
||||
} else {
|
||||
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(ttl):
|
||||
var f expiryFunc
|
||||
|
||||
switch nextEntry.EntryType {
|
||||
case entryTypeBlob:
|
||||
f = ttles.onBlobExpire
|
||||
case entryTypeManifest:
|
||||
f = ttles.onManifestExpire
|
||||
default:
|
||||
f = func(repoName string) error {
|
||||
return fmt.Errorf("Unexpected scheduler entry type")
|
||||
}
|
||||
}
|
||||
|
||||
if err := f(nextEntry.Key); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err)
|
||||
}
|
||||
|
||||
delete(ttles.entries, nextEntry.Key)
|
||||
if err := ttles.writeState(); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
}
|
||||
case entry := <-ttles.addChan:
|
||||
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
|
||||
ttles.entries[entry.Key] = entry
|
||||
if err := ttles.writeState(); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
}
|
||||
break
|
||||
|
||||
case <-ttles.stopChan:
|
||||
if err := ttles.writeState(); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
}
|
||||
ttles.stopped = true
|
||||
}
|
||||
if err := ttles.writeState(); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) {
|
||||
if len(entries) == 0 {
|
||||
return nil, 24 * time.Hour
|
||||
}
|
||||
func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer {
|
||||
return time.AfterFunc(ttl, func() {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
// todo:(richardscothern) this is a primitive o(n) algorithm
|
||||
// but n will never be *that* big and it's all in memory. Investigate
|
||||
// time.AfterFunc for heap based expiries
|
||||
var f expiryFunc
|
||||
|
||||
first := true
|
||||
var nextEntry schedulerEntry
|
||||
for _, entry := range entries {
|
||||
if first {
|
||||
nextEntry = entry
|
||||
first = false
|
||||
continue
|
||||
switch entry.EntryType {
|
||||
case entryTypeBlob:
|
||||
f = ttles.onBlobExpire
|
||||
case entryTypeManifest:
|
||||
f = ttles.onManifestExpire
|
||||
default:
|
||||
f = func(repoName string) error {
|
||||
return fmt.Errorf("Unexpected scheduler entry type")
|
||||
}
|
||||
}
|
||||
if entry.Expiry.Before(nextEntry.Expiry) {
|
||||
nextEntry = entry
|
||||
|
||||
if err := f(entry.Key); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
|
||||
}
|
||||
|
||||
delete(ttles.entries, entry.Key)
|
||||
if err := ttles.writeState(); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Stop stops the scheduler.
|
||||
func (ttles *TTLExpirationScheduler) Stop() {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
if err := ttles.writeState(); err != nil {
|
||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
}
|
||||
|
||||
// Dates may be from the past if the scheduler has
|
||||
// been restarted, set their ttl to 0
|
||||
if nextEntry.Expiry.Before(time.Now()) {
|
||||
nextEntry.Expiry = time.Now()
|
||||
return &nextEntry, 0
|
||||
for _, entry := range ttles.entries {
|
||||
entry.timer.Stop()
|
||||
}
|
||||
|
||||
return &nextEntry, nextEntry.Expiry.Sub(time.Now())
|
||||
ttles.stopped = true
|
||||
}
|
||||
|
||||
func (ttles *TTLExpirationScheduler) writeState() error {
|
||||
|
|
|
@ -2,7 +2,6 @@ package scheduler
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -27,13 +26,13 @@ func TestSchedule(t *testing.T) {
|
|||
if !ok {
|
||||
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
|
||||
}
|
||||
fmt.Println("removing", repoName)
|
||||
t.Log("removing", repoName)
|
||||
delete(remainingRepos, repoName)
|
||||
|
||||
return nil
|
||||
}
|
||||
s.onBlobExpire = deleteFunc
|
||||
err := s.start()
|
||||
err := s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||
}
|
||||
|
@ -97,7 +96,7 @@ func TestRestoreOld(t *testing.T) {
|
|||
}
|
||||
s := New(context.Background(), fs, "/ttl")
|
||||
s.onBlobExpire = deleteFunc
|
||||
err = s.start()
|
||||
err = s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||
}
|
||||
|
@ -124,7 +123,7 @@ func TestStopRestore(t *testing.T) {
|
|||
s := New(context.Background(), fs, pathToStateFile)
|
||||
s.onBlobExpire = deleteFunc
|
||||
|
||||
err := s.start()
|
||||
err := s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
@ -133,13 +132,13 @@ func TestStopRestore(t *testing.T) {
|
|||
|
||||
// Start and stop before all operations complete
|
||||
// state will be written to fs
|
||||
s.stop()
|
||||
s.Stop()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// v2 will restore state from fs
|
||||
s2 := New(context.Background(), fs, pathToStateFile)
|
||||
s2.onBlobExpire = deleteFunc
|
||||
err = s2.start()
|
||||
err = s2.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Error starting v2: %s", err.Error())
|
||||
}
|
||||
|
@ -153,12 +152,11 @@ func TestStopRestore(t *testing.T) {
|
|||
|
||||
func TestDoubleStart(t *testing.T) {
|
||||
s := New(context.Background(), inmemory.New(), "/ttl")
|
||||
err := s.start()
|
||||
err := s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to start scheduler")
|
||||
}
|
||||
fmt.Printf("%#v", s)
|
||||
err = s.start()
|
||||
err = s.Start()
|
||||
if err == nil {
|
||||
t.Fatalf("Scheduler started twice without error")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue