Stop proxy scheduler on system exit (#4293)
This commit is contained in:
commit
56a020f7f1
5 changed files with 40 additions and 8 deletions
|
@ -430,6 +430,14 @@ func (app *App) RegisterHealthChecks(healthRegistries ...*health.Registry) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shutdown close the underlying registry
|
||||||
|
func (app *App) Shutdown() error {
|
||||||
|
if r, ok := app.registry.(proxy.Closer); ok {
|
||||||
|
return r.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// register a handler with the application, by route name. The handler will be
|
// register a handler with the application, by route name. The handler will be
|
||||||
// passed through the application filters and context will be constructed at
|
// passed through the application filters and context will be constructed at
|
||||||
// request time.
|
// request time.
|
||||||
|
|
|
@ -211,6 +211,15 @@ func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
|
||||||
return pr.embedded.BlobStatter()
|
return pr.embedded.BlobStatter()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Closer interface {
|
||||||
|
// Close release all resources used by this object
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *proxyingRegistry) Close() error {
|
||||||
|
return pr.scheduler.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
// authChallenger encapsulates a request to the upstream to establish credential challenges
|
// authChallenger encapsulates a request to the upstream to establish credential challenges
|
||||||
type authChallenger interface {
|
type authChallenger interface {
|
||||||
tryEstablishChallenges(context.Context) error
|
tryEstablishChallenges(context.Context) error
|
||||||
|
|
|
@ -206,12 +206,13 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the scheduler.
|
// Stop stops the scheduler.
|
||||||
func (ttles *TTLExpirationScheduler) Stop() {
|
func (ttles *TTLExpirationScheduler) Stop() error {
|
||||||
ttles.Lock()
|
ttles.Lock()
|
||||||
defer ttles.Unlock()
|
defer ttles.Unlock()
|
||||||
|
|
||||||
if err := ttles.writeState(); err != nil {
|
err := ttles.writeState()
|
||||||
dcontext.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
if err != nil {
|
||||||
|
err = fmt.Errorf("error writing scheduler state: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, entry := range ttles.entries {
|
for _, entry := range ttles.entries {
|
||||||
|
@ -221,6 +222,7 @@ func (ttles *TTLExpirationScheduler) Stop() {
|
||||||
close(ttles.doneChan)
|
close(ttles.doneChan)
|
||||||
ttles.saveTimer.Stop()
|
ttles.saveTimer.Stop()
|
||||||
ttles.stopped = true
|
ttles.stopped = true
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ttles *TTLExpirationScheduler) writeState() error {
|
func (ttles *TTLExpirationScheduler) writeState() error {
|
||||||
|
|
|
@ -136,7 +136,12 @@ func TestRestoreOld(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||||
}
|
}
|
||||||
defer s.Stop()
|
defer func(s *TTLExpirationScheduler) {
|
||||||
|
err := s.Stop()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error stopping ttlExpirationScheduler: %s", err)
|
||||||
|
}
|
||||||
|
}(s)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
|
@ -177,7 +182,10 @@ func TestStopRestore(t *testing.T) {
|
||||||
|
|
||||||
// Start and stop before all operations complete
|
// Start and stop before all operations complete
|
||||||
// state will be written to fs
|
// state will be written to fs
|
||||||
s.Stop()
|
err = s.Stop()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf(err.Error())
|
||||||
|
}
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
// v2 will restore state from fs
|
// v2 will restore state from fs
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -312,7 +313,7 @@ func (registry *Registry) ListenAndServe() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setup channel to get notified on SIGTERM signal
|
// setup channel to get notified on SIGTERM signal
|
||||||
signal.Notify(registry.quit, syscall.SIGTERM)
|
signal.Notify(registry.quit, os.Interrupt, syscall.SIGTERM)
|
||||||
serveErr := make(chan error)
|
serveErr := make(chan error)
|
||||||
|
|
||||||
// Start serving in goroutine and listen for stop signal in main thread
|
// Start serving in goroutine and listen for stop signal in main thread
|
||||||
|
@ -332,9 +333,13 @@ func (registry *Registry) ListenAndServe() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown gracefully shuts down the registry's HTTP server.
|
// Shutdown gracefully shuts down the registry's HTTP server and application object.
|
||||||
func (registry *Registry) Shutdown(ctx context.Context) error {
|
func (registry *Registry) Shutdown(ctx context.Context) error {
|
||||||
return registry.server.Shutdown(ctx)
|
err := registry.server.Shutdown(ctx)
|
||||||
|
if appErr := registry.app.Shutdown(); appErr != nil {
|
||||||
|
err = errors.Join(err, appErr)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func configureDebugServer(config *configuration.Configuration) {
|
func configureDebugServer(config *configuration.Configuration) {
|
||||||
|
|
Loading…
Reference in a new issue