251 lines
6.2 KiB
Go
251 lines
6.2 KiB
Go
|
package scheduler
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/docker/distribution/context"
|
||
|
"github.com/docker/distribution/registry/storage/driver"
|
||
|
)
|
||
|
|
||
|
// onTTLExpiryFunc is called when a repositories' TTL expires
|
||
|
type expiryFunc func(string) error
|
||
|
|
||
|
const (
|
||
|
entryTypeBlob = iota
|
||
|
entryTypeManifest
|
||
|
)
|
||
|
|
||
|
// schedulerEntry represents an entry in the scheduler
|
||
|
// fields are exported for serialization
|
||
|
type schedulerEntry struct {
|
||
|
Key string `json:"Key"`
|
||
|
Expiry time.Time `json:"ExpiryData"`
|
||
|
EntryType int `json:"EntryType"`
|
||
|
}
|
||
|
|
||
|
// New returns a new instance of the scheduler
|
||
|
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
|
||
|
return &TTLExpirationScheduler{
|
||
|
entries: make(map[string]schedulerEntry),
|
||
|
addChan: make(chan schedulerEntry),
|
||
|
stopChan: make(chan bool),
|
||
|
driver: driver,
|
||
|
pathToStateFile: path,
|
||
|
ctx: ctx,
|
||
|
stopped: true,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// TTLExpirationScheduler is a scheduler used to perform actions
|
||
|
// when TTLs expire
|
||
|
type TTLExpirationScheduler struct {
|
||
|
entries map[string]schedulerEntry
|
||
|
addChan chan schedulerEntry
|
||
|
stopChan chan bool
|
||
|
|
||
|
driver driver.StorageDriver
|
||
|
ctx context.Context
|
||
|
pathToStateFile string
|
||
|
|
||
|
stopped bool
|
||
|
|
||
|
onBlobExpire expiryFunc
|
||
|
onManifestExpire expiryFunc
|
||
|
}
|
||
|
|
||
|
// addChan allows more TTLs to be pushed to the scheduler
|
||
|
type addChan chan schedulerEntry
|
||
|
|
||
|
// stopChan allows the scheduler to be stopped - used for testing.
|
||
|
type stopChan chan bool
|
||
|
|
||
|
// OnBlobExpire is called when a scheduled blob's TTL expires
|
||
|
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
|
||
|
ttles.onBlobExpire = f
|
||
|
}
|
||
|
|
||
|
// OnManifestExpire is called when a scheduled manifest's TTL expires
|
||
|
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
|
||
|
ttles.onManifestExpire = f
|
||
|
}
|
||
|
|
||
|
// AddBlob schedules a blob cleanup after ttl expires
|
||
|
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
|
||
|
if ttles.stopped {
|
||
|
return fmt.Errorf("scheduler not started")
|
||
|
}
|
||
|
ttles.add(dgst, ttl, entryTypeBlob)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// AddManifest schedules a manifest cleanup after ttl expires
|
||
|
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
|
||
|
if ttles.stopped {
|
||
|
return fmt.Errorf("scheduler not started")
|
||
|
}
|
||
|
|
||
|
ttles.add(repoName, ttl, entryTypeManifest)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Start starts the scheduler
|
||
|
func (ttles *TTLExpirationScheduler) Start() error {
|
||
|
return ttles.start()
|
||
|
}
|
||
|
|
||
|
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
|
||
|
entry := schedulerEntry{
|
||
|
Key: key,
|
||
|
Expiry: time.Now().Add(ttl),
|
||
|
EntryType: eType,
|
||
|
}
|
||
|
ttles.addChan <- entry
|
||
|
}
|
||
|
|
||
|
func (ttles *TTLExpirationScheduler) stop() {
|
||
|
ttles.stopChan <- true
|
||
|
}
|
||
|
|
||
|
func (ttles *TTLExpirationScheduler) start() error {
|
||
|
err := ttles.readState()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if !ttles.stopped {
|
||
|
return fmt.Errorf("Scheduler already started")
|
||
|
}
|
||
|
|
||
|
context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
|
||
|
ttles.stopped = false
|
||
|
go ttles.mainloop()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// mainloop uses a select statement to listen for events. Most of its time
|
||
|
// is spent in waiting on a TTL to expire but can be interrupted when TTLs
|
||
|
// are added.
|
||
|
func (ttles *TTLExpirationScheduler) mainloop() {
|
||
|
for {
|
||
|
if ttles.stopped {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
nextEntry, ttl := nextExpiringEntry(ttles.entries)
|
||
|
if len(ttles.entries) == 0 {
|
||
|
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...")
|
||
|
} else {
|
||
|
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key)
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-time.After(ttl):
|
||
|
var f expiryFunc
|
||
|
|
||
|
switch nextEntry.EntryType {
|
||
|
case entryTypeBlob:
|
||
|
f = ttles.onBlobExpire
|
||
|
case entryTypeManifest:
|
||
|
f = ttles.onManifestExpire
|
||
|
default:
|
||
|
f = func(repoName string) error {
|
||
|
return fmt.Errorf("Unexpected scheduler entry type")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := f(nextEntry.Key); err != nil {
|
||
|
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err)
|
||
|
}
|
||
|
|
||
|
delete(ttles.entries, nextEntry.Key)
|
||
|
if err := ttles.writeState(); err != nil {
|
||
|
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||
|
}
|
||
|
case entry := <-ttles.addChan:
|
||
|
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
|
||
|
ttles.entries[entry.Key] = entry
|
||
|
if err := ttles.writeState(); err != nil {
|
||
|
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||
|
}
|
||
|
break
|
||
|
|
||
|
case <-ttles.stopChan:
|
||
|
if err := ttles.writeState(); err != nil {
|
||
|
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||
|
}
|
||
|
ttles.stopped = true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) {
|
||
|
if len(entries) == 0 {
|
||
|
return nil, 24 * time.Hour
|
||
|
}
|
||
|
|
||
|
// todo:(richardscothern) this is a primitive o(n) algorithm
|
||
|
// but n will never be *that* big and it's all in memory. Investigate
|
||
|
// time.AfterFunc for heap based expiries
|
||
|
|
||
|
first := true
|
||
|
var nextEntry schedulerEntry
|
||
|
for _, entry := range entries {
|
||
|
if first {
|
||
|
nextEntry = entry
|
||
|
first = false
|
||
|
continue
|
||
|
}
|
||
|
if entry.Expiry.Before(nextEntry.Expiry) {
|
||
|
nextEntry = entry
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Dates may be from the past if the scheduler has
|
||
|
// been restarted, set their ttl to 0
|
||
|
if nextEntry.Expiry.Before(time.Now()) {
|
||
|
nextEntry.Expiry = time.Now()
|
||
|
return &nextEntry, 0
|
||
|
}
|
||
|
|
||
|
return &nextEntry, nextEntry.Expiry.Sub(time.Now())
|
||
|
}
|
||
|
|
||
|
func (ttles *TTLExpirationScheduler) writeState() error {
|
||
|
jsonBytes, err := json.Marshal(ttles.entries)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = ttles.driver.PutContent(ttles.ctx, ttles.pathToStateFile, jsonBytes)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ttles *TTLExpirationScheduler) readState() error {
|
||
|
if _, err := ttles.driver.Stat(ttles.ctx, ttles.pathToStateFile); err != nil {
|
||
|
switch err := err.(type) {
|
||
|
case driver.PathNotFoundError:
|
||
|
return nil
|
||
|
default:
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
bytes, err := ttles.driver.GetContent(ttles.ctx, ttles.pathToStateFile)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = json.Unmarshal(bytes, &ttles.entries)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|