From 33428c37e1075630825801591eb138de72acf0d3 Mon Sep 17 00:00:00 2001 From: Richard Scothern Date: Fri, 23 Oct 2015 15:25:42 -0700 Subject: [PATCH] Buffer writing the scheduler entry state to disk by periodically checking for changes to the entries index and saving it to the filesystem. Signed-off-by: Richard Scothern --- registry/proxy/scheduler/scheduler.go | 45 ++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/registry/proxy/scheduler/scheduler.go b/registry/proxy/scheduler/scheduler.go index 6af777cc4..e91920a1d 100644 --- a/registry/proxy/scheduler/scheduler.go +++ b/registry/proxy/scheduler/scheduler.go @@ -16,6 +16,7 @@ type expiryFunc func(string) error const ( entryTypeBlob = iota entryTypeManifest + indexSaveFrequency = 5 * time.Second ) // schedulerEntry represents an entry in the scheduler @@ -36,6 +37,8 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi pathToStateFile: path, ctx: ctx, stopped: true, + doneChan: make(chan struct{}), + saveTimer: time.NewTicker(indexSaveFrequency), } } @@ -54,6 +57,10 @@ type TTLExpirationScheduler struct { onBlobExpire expiryFunc onManifestExpire expiryFunc + + indexDirty bool + saveTimer *time.Ticker + doneChan chan struct{} } // OnBlobExpire is called when a scheduled blob's TTL expires @@ -119,6 +126,31 @@ func (ttles *TTLExpirationScheduler) Start() error { entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now())) } + // Start a ticker to periodically save the entries index + + go func() { + for { + select { + case <-ttles.saveTimer.C: + if !ttles.indexDirty { + continue + } + + ttles.Lock() + err := ttles.writeState() + if err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } else { + ttles.indexDirty = false + } + ttles.Unlock() + + case <-ttles.doneChan: + return + } + } + }() + return nil } @@ -134,10 +166,7 @@ func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType in } ttles.entries[key] = entry entry.timer = ttles.startTimer(entry, ttl) - - if err := ttles.writeState(); err != nil { - context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) - } + ttles.indexDirty = true } func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer { @@ -163,9 +192,7 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time. } delete(ttles.entries, entry.Key) - if err := ttles.writeState(); err != nil { - context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) - } + ttles.indexDirty = true }) } @@ -181,6 +208,9 @@ func (ttles *TTLExpirationScheduler) Stop() { for _, entry := range ttles.entries { entry.timer.Stop() } + + close(ttles.doneChan) + ttles.saveTimer.Stop() ttles.stopped = true } @@ -194,6 +224,7 @@ func (ttles *TTLExpirationScheduler) writeState() error { if err != nil { return err } + return nil }