diff --git a/docs/proxy/scheduler/scheduler.go b/docs/proxy/scheduler/scheduler.go index 056b148ad..6af777cc4 100644 --- a/docs/proxy/scheduler/scheduler.go +++ b/docs/proxy/scheduler/scheduler.go @@ -3,13 +3,14 @@ package scheduler import ( "encoding/json" "fmt" + "sync" "time" "github.com/docker/distribution/context" "github.com/docker/distribution/registry/storage/driver" ) -// onTTLExpiryFunc is called when a repositories' TTL expires +// onTTLExpiryFunc is called when a repository's TTL expires type expiryFunc func(string) error const ( @@ -23,14 +24,14 @@ type schedulerEntry struct { Key string `json:"Key"` Expiry time.Time `json:"ExpiryData"` EntryType int `json:"EntryType"` + + timer *time.Timer } // New returns a new instance of the scheduler func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler { return &TTLExpirationScheduler{ - entries: make(map[string]schedulerEntry), - addChan: make(chan schedulerEntry), - stopChan: make(chan bool), + entries: make(map[string]*schedulerEntry), driver: driver, pathToStateFile: path, ctx: ctx, @@ -41,9 +42,9 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi // TTLExpirationScheduler is a scheduler used to perform actions // when TTLs expire type TTLExpirationScheduler struct { - entries map[string]schedulerEntry - addChan chan schedulerEntry - stopChan chan bool + sync.Mutex + + entries map[string]*schedulerEntry driver driver.StorageDriver ctx context.Context @@ -55,24 +56,27 @@ type TTLExpirationScheduler struct { onManifestExpire expiryFunc } -// addChan allows more TTLs to be pushed to the scheduler -type addChan chan schedulerEntry - -// stopChan allows the scheduler to be stopped - used for testing. -type stopChan chan bool - // OnBlobExpire is called when a scheduled blob's TTL expires func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) { + ttles.Lock() + defer ttles.Unlock() + ttles.onBlobExpire = f } // OnManifestExpire is called when a scheduled manifest's TTL expires func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) { + ttles.Lock() + defer ttles.Unlock() + ttles.onManifestExpire = f } // AddBlob schedules a blob cleanup after ttl expires func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error { + ttles.Lock() + defer ttles.Unlock() + if ttles.stopped { return fmt.Errorf("scheduler not started") } @@ -82,6 +86,9 @@ func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) err // AddManifest schedules a manifest cleanup after ttl expires func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error { + ttles.Lock() + defer ttles.Unlock() + if ttles.stopped { return fmt.Errorf("scheduler not started") } @@ -92,23 +99,9 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Durat // Start starts the scheduler func (ttles *TTLExpirationScheduler) Start() error { - return ttles.start() -} + ttles.Lock() + defer ttles.Unlock() -func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) { - entry := schedulerEntry{ - Key: key, - Expiry: time.Now().Add(ttl), - EntryType: eType, - } - ttles.addChan <- entry -} - -func (ttles *TTLExpirationScheduler) stop() { - ttles.stopChan <- true -} - -func (ttles *TTLExpirationScheduler) start() error { err := ttles.readState() if err != nil { return err @@ -120,97 +113,75 @@ func (ttles *TTLExpirationScheduler) start() error { context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...") ttles.stopped = false - go ttles.mainloop() + + // Start timer for each deserialized entry + for _, entry := range ttles.entries { + entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now())) + } return nil } -// mainloop uses a select statement to listen for events. Most of its time -// is spent in waiting on a TTL to expire but can be interrupted when TTLs -// are added. -func (ttles *TTLExpirationScheduler) mainloop() { - for { - if ttles.stopped { - return - } +func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) { + entry := &schedulerEntry{ + Key: key, + Expiry: time.Now().Add(ttl), + EntryType: eType, + } + context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now())) + if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil { + oldEntry.timer.Stop() + } + ttles.entries[key] = entry + entry.timer = ttles.startTimer(entry, ttl) - nextEntry, ttl := nextExpiringEntry(ttles.entries) - if len(ttles.entries) == 0 { - context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...") - } else { - context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key) - } - - select { - case <-time.After(ttl): - var f expiryFunc - - switch nextEntry.EntryType { - case entryTypeBlob: - f = ttles.onBlobExpire - case entryTypeManifest: - f = ttles.onManifestExpire - default: - f = func(repoName string) error { - return fmt.Errorf("Unexpected scheduler entry type") - } - } - - if err := f(nextEntry.Key); err != nil { - context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err) - } - - delete(ttles.entries, nextEntry.Key) - if err := ttles.writeState(); err != nil { - context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) - } - case entry := <-ttles.addChan: - context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now())) - ttles.entries[entry.Key] = entry - if err := ttles.writeState(); err != nil { - context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) - } - break - - case <-ttles.stopChan: - if err := ttles.writeState(); err != nil { - context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) - } - ttles.stopped = true - } + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) } } -func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) { - if len(entries) == 0 { - return nil, 24 * time.Hour - } +func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer { + return time.AfterFunc(ttl, func() { + ttles.Lock() + defer ttles.Unlock() - // todo:(richardscothern) this is a primitive o(n) algorithm - // but n will never be *that* big and it's all in memory. Investigate - // time.AfterFunc for heap based expiries + var f expiryFunc - first := true - var nextEntry schedulerEntry - for _, entry := range entries { - if first { - nextEntry = entry - first = false - continue + switch entry.EntryType { + case entryTypeBlob: + f = ttles.onBlobExpire + case entryTypeManifest: + f = ttles.onManifestExpire + default: + f = func(repoName string) error { + return fmt.Errorf("Unexpected scheduler entry type") + } } - if entry.Expiry.Before(nextEntry.Expiry) { - nextEntry = entry + + if err := f(entry.Key); err != nil { + context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err) } + + delete(ttles.entries, entry.Key) + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } + }) +} + +// Stop stops the scheduler. +func (ttles *TTLExpirationScheduler) Stop() { + ttles.Lock() + defer ttles.Unlock() + + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) } - // Dates may be from the past if the scheduler has - // been restarted, set their ttl to 0 - if nextEntry.Expiry.Before(time.Now()) { - nextEntry.Expiry = time.Now() - return &nextEntry, 0 + for _, entry := range ttles.entries { + entry.timer.Stop() } - - return &nextEntry, nextEntry.Expiry.Sub(time.Now()) + ttles.stopped = true } func (ttles *TTLExpirationScheduler) writeState() error { diff --git a/docs/proxy/scheduler/scheduler_test.go b/docs/proxy/scheduler/scheduler_test.go index fb5479f01..00072ed2c 100644 --- a/docs/proxy/scheduler/scheduler_test.go +++ b/docs/proxy/scheduler/scheduler_test.go @@ -2,7 +2,6 @@ package scheduler import ( "encoding/json" - "fmt" "testing" "time" @@ -27,13 +26,13 @@ func TestSchedule(t *testing.T) { if !ok { t.Fatalf("Trying to remove nonexistant repo: %s", repoName) } - fmt.Println("removing", repoName) + t.Log("removing", repoName) delete(remainingRepos, repoName) return nil } s.onBlobExpire = deleteFunc - err := s.start() + err := s.Start() if err != nil { t.Fatalf("Error starting ttlExpirationScheduler: %s", err) } @@ -97,7 +96,7 @@ func TestRestoreOld(t *testing.T) { } s := New(context.Background(), fs, "/ttl") s.onBlobExpire = deleteFunc - err = s.start() + err = s.Start() if err != nil { t.Fatalf("Error starting ttlExpirationScheduler: %s", err) } @@ -124,7 +123,7 @@ func TestStopRestore(t *testing.T) { s := New(context.Background(), fs, pathToStateFile) s.onBlobExpire = deleteFunc - err := s.start() + err := s.Start() if err != nil { t.Fatalf(err.Error()) } @@ -133,13 +132,13 @@ func TestStopRestore(t *testing.T) { // Start and stop before all operations complete // state will be written to fs - s.stop() + s.Stop() time.Sleep(10 * time.Millisecond) // v2 will restore state from fs s2 := New(context.Background(), fs, pathToStateFile) s2.onBlobExpire = deleteFunc - err = s2.start() + err = s2.Start() if err != nil { t.Fatalf("Error starting v2: %s", err.Error()) } @@ -153,12 +152,11 @@ func TestStopRestore(t *testing.T) { func TestDoubleStart(t *testing.T) { s := New(context.Background(), inmemory.New(), "/ttl") - err := s.start() + err := s.Start() if err != nil { t.Fatalf("Unable to start scheduler") } - fmt.Printf("%#v", s) - err = s.start() + err = s.Start() if err == nil { t.Fatalf("Scheduler started twice without error") }