From 94935f39bc96c8aa32634ff0996fd1568e33fccd Mon Sep 17 00:00:00 2001 From: Richard Scothern Date: Wed, 29 Jul 2015 11:12:01 -0700 Subject: [PATCH] Add pull through cache functionality to the Registry which can be configured with a new `proxy` section in the configuration file. Create a new registry type which delegates storage to a proxyBlobStore and proxyManifestStore. These stores will pull through data if not present locally. proxyBlobStore takes care not to write duplicate data to disk. Add a scheduler to cleanup expired content. The scheduler runs as a background goroutine. When a blob or manifest is pulled through from the remote registry, an entry is added to the scheduler with a TTL. When the TTL expires the scheduler calls a pre-specified function to remove the fetched resource. Add token authentication to the registry middleware. Get a token at startup and preload the credential store with the username and password supplied in the config file. Allow resumable digest functionality to be disabled at runtime and disable it when the registry is a pull through cache. Signed-off-by: Richard Scothern --- blobs.go | 3 + cmd/registry/config-cache.yml | 48 ++++ cmd/registry/main.go | 1 + configuration/configuration.go | 14 ++ docs/mirror.md | 62 +++++ notifications/listener_test.go | 2 +- registry/client/blob_writer.go | 4 + registry/client/repository.go | 3 +- registry/client/repository_test.go | 2 +- registry/handlers/app.go | 33 ++- registry/handlers/app_test.go | 2 +- registry/middleware/registry/middleware.go | 7 +- registry/middleware/repository/middleware.go | 7 +- registry/proxy/proxyauth.go | 54 ++++ registry/proxy/proxyblobstore.go | 214 ++++++++++++++++ registry/proxy/proxyblobstore_test.go | 231 +++++++++++++++++ registry/proxy/proxymanifeststore.go | 155 ++++++++++++ registry/proxy/proxymanifeststore_test.go | 235 +++++++++++++++++ registry/proxy/proxymetrics.go | 74 ++++++ registry/proxy/proxyregistry.go | 139 +++++++++++ registry/proxy/scheduler/scheduler.go | 250 +++++++++++++++++++ registry/proxy/scheduler/scheduler_test.go | 165 ++++++++++++ registry/storage/blob_test.go | 8 +- registry/storage/blobwriter.go | 28 +++ registry/storage/blobwriter_resumable.go | 8 + registry/storage/catalog_test.go | 2 +- registry/storage/linkedblobstore.go | 22 +- registry/storage/manifeststore_test.go | 4 +- registry/storage/registry.go | 6 +- registry/storage/vacuum.go | 67 +++++ testutil/handler.go | 2 +- 31 files changed, 1812 insertions(+), 40 deletions(-) create mode 100644 cmd/registry/config-cache.yml create mode 100644 docs/mirror.md create mode 100644 registry/proxy/proxyauth.go create mode 100644 registry/proxy/proxyblobstore.go create mode 100644 registry/proxy/proxyblobstore_test.go create mode 100644 registry/proxy/proxymanifeststore.go create mode 100644 registry/proxy/proxymanifeststore_test.go create mode 100644 registry/proxy/proxymetrics.go create mode 100644 registry/proxy/proxyregistry.go create mode 100644 registry/proxy/scheduler/scheduler.go create mode 100644 registry/proxy/scheduler/scheduler_test.go create mode 100644 registry/storage/vacuum.go diff --git a/blobs.go b/blobs.go index ffec41e8a..556bf93e1 100644 --- a/blobs.go +++ b/blobs.go @@ -183,6 +183,9 @@ type BlobWriter interface { // result in a no-op. This allows use of Cancel in a defer statement, // increasing the assurance that it is correctly called. Cancel(ctx context.Context) error + + // Get a reader to the blob being written by this BlobWriter + Reader() (io.ReadCloser, error) } // BlobService combines the operations to access, read and write blobs. This diff --git a/cmd/registry/config-cache.yml b/cmd/registry/config-cache.yml new file mode 100644 index 000000000..0b524043d --- /dev/null +++ b/cmd/registry/config-cache.yml @@ -0,0 +1,48 @@ +version: 0.1 +log: + level: debug + fields: + service: registry + environment: development +storage: + cache: + blobdescriptor: redis + filesystem: + rootdirectory: /var/lib/registry-cache + maintenance: + uploadpurging: + enabled: false +http: + addr: :5000 + secret: asecretforlocaldevelopment + debug: + addr: localhost:5001 +redis: + addr: localhost:6379 + pool: + maxidle: 16 + maxactive: 64 + idletimeout: 300s + dialtimeout: 10ms + readtimeout: 10ms + writetimeout: 10ms +notifications: + endpoints: + - name: local-8082 + url: http://localhost:5003/callback + headers: + Authorization: [Bearer ] + timeout: 1s + threshold: 10 + backoff: 1s + disabled: true + - name: local-8083 + url: http://localhost:8083/callback + timeout: 1s + threshold: 10 + backoff: 1s + disabled: true +proxy: + remoteurl: https://registry-1.docker.io + username: username + password: password diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 7950a448c..894d18ebd 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -23,6 +23,7 @@ import ( _ "github.com/docker/distribution/registry/auth/token" "github.com/docker/distribution/registry/handlers" "github.com/docker/distribution/registry/listener" + _ "github.com/docker/distribution/registry/proxy" _ "github.com/docker/distribution/registry/storage/driver/azure" _ "github.com/docker/distribution/registry/storage/driver/filesystem" _ "github.com/docker/distribution/registry/storage/driver/inmemory" diff --git a/configuration/configuration.go b/configuration/configuration.go index bb2458a68..502dab3ec 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -128,6 +128,8 @@ type Configuration struct { IdleTimeout time.Duration `yaml:"idletimeout,omitempty"` } `yaml:"pool,omitempty"` } `yaml:"redis,omitempty"` + + Proxy Proxy `yaml:"proxy,omitempty"` } // LogHook is composed of hook Level and Type. @@ -430,6 +432,18 @@ type Middleware struct { Options Parameters `yaml:"options"` } +// Proxy configures the registry as a pull through cache +type Proxy struct { + // RemoteURL is the URL of the remote registry + RemoteURL string `yaml:"remoteurl"` + + // Username of the hub user + Username string `yaml:"username"` + + // Password of the hub user + Password string `yaml:"password"` +} + // Parse parses an input configuration yaml document into a Configuration struct // This should generally be capable of handling old configuration format versions // diff --git a/docs/mirror.md b/docs/mirror.md new file mode 100644 index 000000000..78928401c --- /dev/null +++ b/docs/mirror.md @@ -0,0 +1,62 @@ +# Registry as a pull through cache + +A v2 Registry can be configured as a pull through cache. In this mode a Registry responds to all normal docker pull requests but stores all content locally. + +## Why? + +If you have multiple instances of Docker running in your environment (e.g., multiple physical or virtual machines, all running the Docker daemon), each time one of them requires an image that it doesn’t have it will go out to the internet and fetch it from the public Docker registry. By running a local registry mirror, you can keep most of the image fetch traffic on your local network. + +## How does it work? + +The first time you request an image from your local registry mirror, it pulls the image from the public Docker registry and stores it locally before handing it back to you. On subsequent requests, the local registry mirror is able to serve the image from its own storage. + +## What if the content changes on the Hub? + +When a pull is attempted with a tag, the Registry will check the remote to ensure if it has the latest version of the requested content. If it doesn't it will fetch the latest content and cache it. + +## What about my disk? + +In environments with high churn rates, stale data can build up in the cache. When running as a pull through cache the Registry will periodically remove old content to save disk space. Subsequent requests for removed content will cause a remote fetch and local re-caching. + +To ensure best performance and guarantee correctness the Registry cache should be configured to use the `filesystem` driver for storage. + +## Running a Registry as a pull through cache + +The easiest way to run a registry as a pull through cache is to run the official Registry pull through cache official image. + +Multiple registry caches can be deployed over the same back-end. A single registry cache will ensure that concurrent requests do not pull duplicate data, but this property will not hold true for a registry cache cluster. + +### Configuring the cache + +To configure a Registry to run as a pull through cache, the addition of a `proxy` section is required to the config file. + +In order to access private images on the Docker Hub the username and password can be supplied. + +``` +proxy: + remoteurl: https://registry-1.docker.io + username: [username] + password: [password] +``` + + + +## Configuring the Docker daemon + +You will need to pass the `--registry-mirror` option to your Docker daemon on startup: + +``` +docker --registry-mirror=https:// -d +``` + +For example, if your mirror is serving on http://10.0.0.2:5000, you would run: + +``` +docker --registry-mirror=https://10.0.0.2:5000 -d +``` + +NOTE: Depending on your local host setup, you may be able to add the --registry-mirror options to the `DOCKER_OPTS` variable in `/etc/default/` docker. + + + + diff --git a/notifications/listener_test.go b/notifications/listener_test.go index edbdc858f..ccd845933 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -18,7 +18,7 @@ import ( func TestListener(t *testing.T) { ctx := context.Background() - registry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) tl := &testListener{ ops: make(map[string]int), } diff --git a/registry/client/blob_writer.go b/registry/client/blob_writer.go index 5f6f01f7f..c7eee4e8c 100644 --- a/registry/client/blob_writer.go +++ b/registry/client/blob_writer.go @@ -25,6 +25,10 @@ type httpBlobUpload struct { closed bool } +func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) { + panic("Not implemented") +} + func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error { if resp.StatusCode == http.StatusNotFound { return distribution.ErrBlobUploadUnknown diff --git a/registry/client/repository.go b/registry/client/repository.go index d0079f092..c1e8e07f1 100644 --- a/registry/client/repository.go +++ b/registry/client/repository.go @@ -280,14 +280,13 @@ func (ms *manifests) GetByTag(tag string, options ...distribution.ManifestServic } if _, ok := ms.etags[tag]; ok { - req.Header.Set("eTag", ms.etags[tag]) + req.Header.Set("If-None-Match", ms.etags[tag]) } resp, err := ms.client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - if resp.StatusCode == http.StatusNotModified { return nil, nil } else if SuccessStatus(resp.StatusCode) { diff --git a/registry/client/repository_test.go b/registry/client/repository_test.go index 7219fff19..26201763c 100644 --- a/registry/client/repository_test.go +++ b/registry/client/repository_test.go @@ -463,7 +463,7 @@ func addTestManifestWithEtag(repo, reference string, content []byte, m *testutil Method: "GET", Route: "/v2/" + repo + "/manifests/" + reference, Headers: http.Header(map[string][]string{ - "Etag": {fmt.Sprintf(`"%s"`, dgst)}, + "If-None-Match": {fmt.Sprintf(`"%s"`, dgst)}, }), } diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 1fcf13fc9..f60290d09 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -20,6 +20,7 @@ import ( "github.com/docker/distribution/registry/auth" registrymiddleware "github.com/docker/distribution/registry/middleware/registry" repositorymiddleware "github.com/docker/distribution/registry/middleware/repository" + "github.com/docker/distribution/registry/proxy" "github.com/docker/distribution/registry/storage" memorycache "github.com/docker/distribution/registry/storage/cache/memory" rediscache "github.com/docker/distribution/registry/storage/cache/redis" @@ -55,6 +56,9 @@ type App struct { } redis *redis.Pool + + // true if this registry is configured as a pull through cache + isCache bool } // NewApp takes a configuration and returns a configured app, ready to serve @@ -65,6 +69,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App Config: configuration, Context: ctx, router: v2.RouterWithPrefix(configuration.HTTP.Prefix), + isCache: configuration.Proxy.RemoteURL != "", } app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id")) @@ -152,10 +157,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App if app.redis == nil { panic("redis configuration required to use for layerinfo cache") } - app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled) + app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled, app.isCache) ctxu.GetLogger(app).Infof("using redis blob descriptor cache") case "inmemory": - app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled) + app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled, app.isCache) ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache") default: if v != "" { @@ -166,10 +171,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App if app.registry == nil { // configure the registry if no cache section is available. - app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled) + app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled, app.isCache) } - app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) + app.registry, err = applyRegistryMiddleware(app.Context, app.registry, configuration.Middleware["registry"]) if err != nil { panic(err) } @@ -185,6 +190,16 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App ctxu.GetLogger(app).Debugf("configured %q access controller", authType) } + // configure as a pull through cache + if configuration.Proxy.RemoteURL != "" { + app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, configuration.Proxy) + if err != nil { + panic(err.Error()) + } + app.isCache = true + ctxu.GetLogger(app).Info("Registry configured as a proxy cache to ", configuration.Proxy.RemoteURL) + } + return app } @@ -447,7 +462,7 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { repository, app.eventBridge(context, r)) - context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"]) + context.Repository, err = applyRepoMiddleware(context.Context, context.Repository, app.Config.Middleware["repository"]) if err != nil { ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err) context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) @@ -668,9 +683,9 @@ func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []a } // applyRegistryMiddleware wraps a registry instance with the configured middlewares -func applyRegistryMiddleware(registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) { +func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) { for _, mw := range middlewares { - rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry) + rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry) if err != nil { return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err) } @@ -681,9 +696,9 @@ func applyRegistryMiddleware(registry distribution.Namespace, middlewares []conf } // applyRepoMiddleware wraps a repository with the configured middlewares -func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) { +func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) { for _, mw := range middlewares { - rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository) + rmw, err := repositorymiddleware.Get(ctx, mw.Name, mw.Options, repository) if err != nil { return nil, err } diff --git a/registry/handlers/app_test.go b/registry/handlers/app_test.go index 84d842e3d..6f597527f 100644 --- a/registry/handlers/app_test.go +++ b/registry/handlers/app_test.go @@ -31,7 +31,7 @@ func TestAppDispatcher(t *testing.T) { Context: ctx, router: v2.Router(), driver: driver, - registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true), + registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true, false), } server := httptest.NewServer(app) router := v2.Router() diff --git a/registry/middleware/registry/middleware.go b/registry/middleware/registry/middleware.go index 048603b87..7535c6db5 100644 --- a/registry/middleware/registry/middleware.go +++ b/registry/middleware/registry/middleware.go @@ -4,11 +4,12 @@ import ( "fmt" "github.com/docker/distribution" + "github.com/docker/distribution/context" ) // InitFunc is the type of a RegistryMiddleware factory function and is // used to register the constructor for different RegistryMiddleware backends. -type InitFunc func(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) +type InitFunc func(ctx context.Context, registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) var middlewares map[string]InitFunc @@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error { } // Get constructs a RegistryMiddleware with the given options using the named backend. -func Get(name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) { +func Get(ctx context.Context, name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) { if middlewares != nil { if initFunc, exists := middlewares[name]; exists { - return initFunc(registry, options) + return initFunc(ctx, registry, options) } } diff --git a/registry/middleware/repository/middleware.go b/registry/middleware/repository/middleware.go index d6330fc40..27b42aecf 100644 --- a/registry/middleware/repository/middleware.go +++ b/registry/middleware/repository/middleware.go @@ -4,11 +4,12 @@ import ( "fmt" "github.com/docker/distribution" + "github.com/docker/distribution/context" ) // InitFunc is the type of a RepositoryMiddleware factory function and is // used to register the constructor for different RepositoryMiddleware backends. -type InitFunc func(repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error) +type InitFunc func(ctx context.Context, repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error) var middlewares map[string]InitFunc @@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error { } // Get constructs a RepositoryMiddleware with the given options using the named backend. -func Get(name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) { +func Get(ctx context.Context, name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) { if middlewares != nil { if initFunc, exists := middlewares[name]; exists { - return initFunc(repository, options) + return initFunc(ctx, repository, options) } } diff --git a/registry/proxy/proxyauth.go b/registry/proxy/proxyauth.go new file mode 100644 index 000000000..e4bec75a5 --- /dev/null +++ b/registry/proxy/proxyauth.go @@ -0,0 +1,54 @@ +package proxy + +import ( + "net/http" + "net/url" + + "github.com/docker/distribution/registry/client/auth" +) + +const tokenURL = "https://auth.docker.io/token" + +type userpass struct { + username string + password string +} + +type credentials struct { + creds map[string]userpass +} + +func (c credentials) Basic(u *url.URL) (string, string) { + up := c.creds[u.String()] + + return up.username, up.password +} + +// ConfigureAuth authorizes with the upstream registry +func ConfigureAuth(remoteURL, username, password string, cm auth.ChallengeManager) (auth.CredentialStore, error) { + if err := ping(cm, remoteURL+"/v2/", "Docker-Distribution-Api-Version"); err != nil { + return nil, err + } + + creds := map[string]userpass{ + tokenURL: { + username: username, + password: password, + }, + } + return credentials{creds: creds}, nil +} + +func ping(manager auth.ChallengeManager, endpoint, versionHeader string) error { + resp, err := http.Get(endpoint) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := manager.AddResponse(resp); err != nil { + return err + } + + return nil +} diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go new file mode 100644 index 000000000..b480a1112 --- /dev/null +++ b/registry/proxy/proxyblobstore.go @@ -0,0 +1,214 @@ +package proxy + +import ( + "io" + "net/http" + "strconv" + "sync" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/proxy/scheduler" +) + +// todo(richardscothern): from cache control header or config file +const blobTTL = time.Duration(24 * 7 * time.Hour) + +type proxyBlobStore struct { + localStore distribution.BlobStore + remoteStore distribution.BlobService + scheduler *scheduler.TTLExpirationScheduler +} + +var _ distribution.BlobStore = proxyBlobStore{} + +type inflightBlob struct { + refCount int + bw distribution.BlobWriter +} + +// inflight tracks currently downloading blobs +var inflight = make(map[digest.Digest]*inflightBlob) + +// mu protects inflight +var mu sync.Mutex + +func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) { + w.Header().Set("Content-Length", strconv.FormatInt(length, 10)) + w.Header().Set("Content-Type", mediaType) + w.Header().Set("Docker-Content-Digest", digest.String()) + w.Header().Set("Etag", digest.String()) +} + +func (pbs proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + desc, err := pbs.localStore.Stat(ctx, dgst) + if err != nil && err != distribution.ErrBlobUnknown { + return err + } + + if err == nil { + proxyMetrics.BlobPush(uint64(desc.Size)) + return pbs.localStore.ServeBlob(ctx, w, r, dgst) + } + + desc, err = pbs.remoteStore.Stat(ctx, dgst) + if err != nil { + return err + } + + remoteReader, err := pbs.remoteStore.Open(ctx, dgst) + if err != nil { + return err + } + + bw, isNew, cleanup, err := getOrCreateBlobWriter(ctx, pbs.localStore, desc) + if err != nil { + return err + } + defer cleanup() + + if isNew { + go func() { + err := streamToStorage(ctx, remoteReader, desc, bw) + if err != nil { + context.GetLogger(ctx).Error(err) + } + + proxyMetrics.BlobPull(uint64(desc.Size)) + }() + err := streamToClient(ctx, w, desc, bw) + if err != nil { + return err + } + + proxyMetrics.BlobPush(uint64(desc.Size)) + pbs.scheduler.AddBlob(dgst.String(), blobTTL) + return nil + } + + err = streamToClient(ctx, w, desc, bw) + if err != nil { + return err + } + proxyMetrics.BlobPush(uint64(desc.Size)) + return nil +} + +type cleanupFunc func() + +// getOrCreateBlobWriter will track which blobs are currently being downloaded and enable client requesting +// the same blob concurrently to read from the existing stream. +func getOrCreateBlobWriter(ctx context.Context, blobs distribution.BlobService, desc distribution.Descriptor) (distribution.BlobWriter, bool, cleanupFunc, error) { + mu.Lock() + defer mu.Unlock() + dgst := desc.Digest + + cleanup := func() { + mu.Lock() + defer mu.Unlock() + inflight[dgst].refCount-- + + if inflight[dgst].refCount == 0 { + defer delete(inflight, dgst) + _, err := inflight[dgst].bw.Commit(ctx, desc) + if err != nil { + // There is a narrow race here where Commit can be called while this blob's TTL is expiring + // and its being removed from storage. In that case, the client stream will continue + // uninterruped and the blob will be pulled through on the next request, so just log it + context.GetLogger(ctx).Errorf("Error committing blob: %q", err) + } + + } + } + + var bw distribution.BlobWriter + _, ok := inflight[dgst] + if ok { + bw = inflight[dgst].bw + inflight[dgst].refCount++ + return bw, false, cleanup, nil + } + + var err error + bw, err = blobs.Create(ctx) + if err != nil { + return nil, false, nil, err + } + + inflight[dgst] = &inflightBlob{refCount: 1, bw: bw} + return bw, true, cleanup, nil +} + +func streamToStorage(ctx context.Context, remoteReader distribution.ReadSeekCloser, desc distribution.Descriptor, bw distribution.BlobWriter) error { + _, err := io.CopyN(bw, remoteReader, desc.Size) + if err != nil { + return err + } + + return nil +} + +func streamToClient(ctx context.Context, w http.ResponseWriter, desc distribution.Descriptor, bw distribution.BlobWriter) error { + setResponseHeaders(w, desc.Size, desc.MediaType, desc.Digest) + + reader, err := bw.Reader() + if err != nil { + return err + } + defer reader.Close() + teeReader := io.TeeReader(reader, w) + buf := make([]byte, 32768, 32786) + var soFar int64 + for { + rd, err := teeReader.Read(buf) + if err == nil || err == io.EOF { + soFar += int64(rd) + if soFar < desc.Size { + // buffer underflow, keep trying + continue + } + return nil + } + return err + } +} + +func (pbs proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + desc, err := pbs.localStore.Stat(ctx, dgst) + if err == nil { + return desc, err + } + + if err != distribution.ErrBlobUnknown { + return distribution.Descriptor{}, err + } + + return pbs.remoteStore.Stat(ctx, dgst) +} + +// Unsupported functions +func (pbs proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { + return distribution.Descriptor{}, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { + return nil, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { + return nil, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + return nil, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + return nil, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { + return distribution.ErrUnsupported +} diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go new file mode 100644 index 000000000..65d5f9228 --- /dev/null +++ b/registry/proxy/proxyblobstore_test.go @@ -0,0 +1,231 @@ +package proxy + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/proxy/scheduler" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver/inmemory" +) + +type statsBlobStore struct { + stats map[string]int + blobs distribution.BlobStore +} + +func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { + sbs.stats["put"]++ + return sbs.blobs.Put(ctx, mediaType, p) +} + +func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + sbs.stats["get"]++ + return sbs.blobs.Get(ctx, dgst) +} + +func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { + sbs.stats["create"]++ + return sbs.blobs.Create(ctx) +} + +func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { + sbs.stats["resume"]++ + return sbs.blobs.Resume(ctx, id) +} + +func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + sbs.stats["open"]++ + return sbs.blobs.Open(ctx, dgst) +} + +func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + sbs.stats["serveblob"]++ + return sbs.blobs.ServeBlob(ctx, w, r, dgst) +} + +func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + sbs.stats["stat"]++ + return sbs.blobs.Stat(ctx, dgst) +} + +func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { + sbs.stats["delete"]++ + return sbs.blobs.Delete(ctx, dgst) +} + +type testEnv struct { + inRemote []distribution.Descriptor + store proxyBlobStore + ctx context.Context +} + +func (te testEnv) LocalStats() *map[string]int { + ls := te.store.localStore.(statsBlobStore).stats + return &ls +} + +func (te testEnv) RemoteStats() *map[string]int { + rs := te.store.remoteStore.(statsBlobStore).stats + return &rs +} + +// Populate remote store and record the digests +func makeTestEnv(t *testing.T, name string) testEnv { + ctx := context.Background() + + localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true) + localRepo, err := localRegistry.Repository(ctx, name) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + + truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false) + truthRepo, err := truthRegistry.Repository(ctx, name) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + + truthBlobs := statsBlobStore{ + stats: make(map[string]int), + blobs: truthRepo.Blobs(ctx), + } + + localBlobs := statsBlobStore{ + stats: make(map[string]int), + blobs: localRepo.Blobs(ctx), + } + + s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json") + + proxyBlobStore := proxyBlobStore{ + remoteStore: truthBlobs, + localStore: localBlobs, + scheduler: s, + } + + te := testEnv{ + store: proxyBlobStore, + ctx: ctx, + } + return te +} + +func populate(t *testing.T, te *testEnv, blobCount int) { + var inRemote []distribution.Descriptor + for i := 0; i < blobCount; i++ { + bytes := []byte(fmt.Sprintf("blob%d", i)) + + desc, err := te.store.remoteStore.Put(te.ctx, "", bytes) + if err != nil { + t.Errorf("Put in store") + } + inRemote = append(inRemote, desc) + } + + te.inRemote = inRemote + +} + +func TestProxyStoreStat(t *testing.T) { + te := makeTestEnv(t, "foo/bar") + remoteBlobCount := 1 + populate(t, &te, remoteBlobCount) + + localStats := te.LocalStats() + remoteStats := te.RemoteStats() + + // Stat - touches both stores + for _, d := range te.inRemote { + _, err := te.store.Stat(te.ctx, d.Digest) + if err != nil { + t.Fatalf("Error stating proxy store") + } + } + + if (*localStats)["stat"] != remoteBlobCount { + t.Errorf("Unexpected local stat count") + } + + if (*remoteStats)["stat"] != remoteBlobCount { + t.Errorf("Unexpected remote stat count") + } +} + +func TestProxyStoreServe(t *testing.T) { + te := makeTestEnv(t, "foo/bar") + remoteBlobCount := 1 + populate(t, &te, remoteBlobCount) + + localStats := te.LocalStats() + remoteStats := te.RemoteStats() + + // Serveblob - pulls through blobs + for _, dr := range te.inRemote { + w := httptest.NewRecorder() + r, err := http.NewRequest("GET", "", nil) + if err != nil { + t.Fatal(err) + } + + err = te.store.ServeBlob(te.ctx, w, r, dr.Digest) + if err != nil { + t.Fatalf(err.Error()) + } + + dl, err := digest.FromBytes(w.Body.Bytes()) + if err != nil { + t.Fatalf("Error making digest from blob") + } + if dl != dr.Digest { + t.Errorf("Mismatching blob fetch from proxy") + } + } + + if (*localStats)["stat"] != remoteBlobCount && (*localStats)["create"] != remoteBlobCount { + t.Fatalf("unexpected local stats") + } + if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount { + t.Fatalf("unexpected local stats") + } + + // Serveblob - blobs come from local + for _, dr := range te.inRemote { + w := httptest.NewRecorder() + r, err := http.NewRequest("GET", "", nil) + if err != nil { + t.Fatal(err) + } + + err = te.store.ServeBlob(te.ctx, w, r, dr.Digest) + if err != nil { + t.Fatalf(err.Error()) + } + + dl, err := digest.FromBytes(w.Body.Bytes()) + if err != nil { + t.Fatalf("Error making digest from blob") + } + if dl != dr.Digest { + t.Errorf("Mismatching blob fetch from proxy") + } + } + + // Stat to find local, but no new blobs were created + if (*localStats)["stat"] != remoteBlobCount*2 && (*localStats)["create"] != remoteBlobCount*2 { + t.Fatalf("unexpected local stats") + } + + // Remote unchanged + if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount { + fmt.Printf("\tlocal=%#v, \n\tremote=%#v\n", localStats, remoteStats) + t.Fatalf("unexpected local stats") + } + +} diff --git a/registry/proxy/proxymanifeststore.go b/registry/proxy/proxymanifeststore.go new file mode 100644 index 000000000..5b79c8ce1 --- /dev/null +++ b/registry/proxy/proxymanifeststore.go @@ -0,0 +1,155 @@ +package proxy + +import ( + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/proxy/scheduler" +) + +// todo(richardscothern): from cache control header or config +const repositoryTTL = time.Duration(24 * 7 * time.Hour) + +type proxyManifestStore struct { + ctx context.Context + localManifests distribution.ManifestService + remoteManifests distribution.ManifestService + repositoryName string + scheduler *scheduler.TTLExpirationScheduler +} + +var _ distribution.ManifestService = &proxyManifestStore{} + +func (pms proxyManifestStore) Exists(dgst digest.Digest) (bool, error) { + exists, err := pms.localManifests.Exists(dgst) + if err != nil { + return false, err + } + if exists { + return true, nil + } + + return pms.remoteManifests.Exists(dgst) +} + +func (pms proxyManifestStore) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { + sm, err := pms.localManifests.Get(dgst) + if err == nil { + proxyMetrics.ManifestPush(uint64(len(sm.Raw))) + return sm, err + } + + sm, err = pms.remoteManifests.Get(dgst) + if err != nil { + return nil, err + } + + proxyMetrics.ManifestPull(uint64(len(sm.Raw))) + err = pms.localManifests.Put(sm) + if err != nil { + return nil, err + } + + // Schedule the repo for removal + pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL) + + // Ensure the manifest blob is cleaned up + pms.scheduler.AddBlob(dgst.String(), repositoryTTL) + + proxyMetrics.ManifestPush(uint64(len(sm.Raw))) + + return sm, err +} + +func (pms proxyManifestStore) Tags() ([]string, error) { + return pms.localManifests.Tags() +} + +func (pms proxyManifestStore) ExistsByTag(tag string) (bool, error) { + exists, err := pms.localManifests.ExistsByTag(tag) + if err != nil { + return false, err + } + if exists { + return true, nil + } + + return pms.remoteManifests.ExistsByTag(tag) +} + +func (pms proxyManifestStore) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) { + var localDigest digest.Digest + + localManifest, err := pms.localManifests.GetByTag(tag, options...) + switch err.(type) { + case distribution.ErrManifestUnknown, distribution.ErrManifestUnknownRevision: + goto fromremote + case nil: + break + default: + return nil, err + } + + localDigest, err = manifestDigest(localManifest) + if err != nil { + return nil, err + } + +fromremote: + var sm *manifest.SignedManifest + sm, err = pms.remoteManifests.GetByTag(tag, client.AddEtagToTag(tag, localDigest.String())) + if err != nil { + return nil, err + } + + if sm == nil { + context.GetLogger(pms.ctx).Debugf("Local manifest for %q is latest, dgst=%s", tag, localDigest.String()) + return localManifest, nil + } + context.GetLogger(pms.ctx).Debugf("Updated manifest for %q, dgst=%s", tag, localDigest.String()) + + err = pms.localManifests.Put(sm) + if err != nil { + return nil, err + } + + dgst, err := manifestDigest(sm) + if err != nil { + return nil, err + } + pms.scheduler.AddBlob(dgst.String(), repositoryTTL) + pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL) + + proxyMetrics.ManifestPull(uint64(len(sm.Raw))) + proxyMetrics.ManifestPush(uint64(len(sm.Raw))) + + return sm, err +} + +func manifestDigest(sm *manifest.SignedManifest) (digest.Digest, error) { + payload, err := sm.Payload() + if err != nil { + return "", err + + } + + dgst, err := digest.FromBytes(payload) + if err != nil { + return "", err + } + + return dgst, nil +} + +func (pms proxyManifestStore) Put(manifest *manifest.SignedManifest) error { + return v2.ErrorCodeUnsupported +} + +func (pms proxyManifestStore) Delete(dgst digest.Digest) error { + return v2.ErrorCodeUnsupported +} diff --git a/registry/proxy/proxymanifeststore_test.go b/registry/proxy/proxymanifeststore_test.go new file mode 100644 index 000000000..7b9b8091c --- /dev/null +++ b/registry/proxy/proxymanifeststore_test.go @@ -0,0 +1,235 @@ +package proxy + +import ( + "io" + "testing" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/registry/proxy/scheduler" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" +) + +type statsManifest struct { + manifests distribution.ManifestService + stats map[string]int +} + +type manifestStoreTestEnv struct { + manifestDigest digest.Digest // digest of the signed manifest in the local storage + manifests proxyManifestStore +} + +func (te manifestStoreTestEnv) LocalStats() *map[string]int { + ls := te.manifests.localManifests.(statsManifest).stats + return &ls +} + +func (te manifestStoreTestEnv) RemoteStats() *map[string]int { + rs := te.manifests.remoteManifests.(statsManifest).stats + return &rs +} + +func (sm statsManifest) Delete(dgst digest.Digest) error { + sm.stats["delete"]++ + return sm.manifests.Delete(dgst) +} + +func (sm statsManifest) Exists(dgst digest.Digest) (bool, error) { + sm.stats["exists"]++ + return sm.manifests.Exists(dgst) +} + +func (sm statsManifest) ExistsByTag(tag string) (bool, error) { + sm.stats["existbytag"]++ + return sm.manifests.ExistsByTag(tag) +} + +func (sm statsManifest) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { + sm.stats["get"]++ + return sm.manifests.Get(dgst) +} + +func (sm statsManifest) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) { + sm.stats["getbytag"]++ + return sm.manifests.GetByTag(tag, options...) +} + +func (sm statsManifest) Put(manifest *manifest.SignedManifest) error { + sm.stats["put"]++ + return sm.manifests.Put(manifest) +} + +func (sm statsManifest) Tags() ([]string, error) { + sm.stats["tags"]++ + return sm.manifests.Tags() +} + +func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { + ctx := context.Background() + truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false) + truthRepo, err := truthRegistry.Repository(ctx, name) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + tr, err := truthRepo.Manifests(ctx) + if err != nil { + t.Fatal(err.Error()) + } + truthManifests := statsManifest{ + manifests: tr, + stats: make(map[string]int), + } + + manifestDigest, err := populateRepo(t, ctx, truthRepo, name, tag) + if err != nil { + t.Fatalf(err.Error()) + } + + localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true) + localRepo, err := localRegistry.Repository(ctx, name) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + lr, err := localRepo.Manifests(ctx) + if err != nil { + t.Fatal(err.Error()) + } + + localManifests := statsManifest{ + manifests: lr, + stats: make(map[string]int), + } + + s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json") + return &manifestStoreTestEnv{ + manifestDigest: manifestDigest, + manifests: proxyManifestStore{ + ctx: ctx, + localManifests: localManifests, + remoteManifests: truthManifests, + scheduler: s, + }, + } +} + +func populateRepo(t *testing.T, ctx context.Context, repository distribution.Repository, name, tag string) (digest.Digest, error) { + m := manifest.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: name, + Tag: tag, + } + + for i := 0; i < 2; i++ { + wr, err := repository.Blobs(ctx).Create(ctx) + if err != nil { + t.Fatalf("unexpected error creating test upload: %v", err) + } + + rs, ts, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("unexpected error generating test layer file") + } + dgst := digest.Digest(ts) + if _, err := io.Copy(wr, rs); err != nil { + t.Fatalf("unexpected error copying to upload: %v", err) + } + + if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}); err != nil { + t.Fatalf("unexpected error finishing upload: %v", err) + } + } + + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("unexpected error generating private key: %v", err) + } + + sm, err := manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("error signing manifest: %v", err) + } + + ms, err := repository.Manifests(ctx) + if err != nil { + t.Fatalf(err.Error()) + } + ms.Put(sm) + if err != nil { + t.Fatalf("unexpected errors putting manifest: %v", err) + } + pl, err := sm.Payload() + if err != nil { + t.Fatal(err) + } + return digest.FromBytes(pl) +} + +// TestProxyManifests contains basic acceptance tests +// for the pull-through behavior +func TestProxyManifests(t *testing.T) { + name := "foo/bar" + env := newManifestStoreTestEnv(t, name, "latest") + + localStats := env.LocalStats() + remoteStats := env.RemoteStats() + + // Stat - must check local and remote + exists, err := env.manifests.ExistsByTag("latest") + if err != nil { + t.Fatalf("Error checking existance") + } + if !exists { + t.Errorf("Unexpected non-existant manifest") + } + + if (*localStats)["existbytag"] != 1 && (*remoteStats)["existbytag"] != 1 { + t.Errorf("Unexpected exists count") + } + + // Get - should succeed and pull manifest into local + _, err = env.manifests.Get(env.manifestDigest) + if err != nil { + t.Fatal(err) + } + if (*localStats)["get"] != 1 && (*remoteStats)["get"] != 1 { + t.Errorf("Unexpected get count") + } + + if (*localStats)["put"] != 1 { + t.Errorf("Expected local put") + } + + // Stat - should only go to local + exists, err = env.manifests.ExistsByTag("latest") + if err != nil { + t.Fatal(err) + } + if !exists { + t.Errorf("Unexpected non-existant manifest") + } + + if (*localStats)["existbytag"] != 2 && (*remoteStats)["existbytag"] != 1 { + t.Errorf("Unexpected exists count") + + } + + // Get - should get from remote, to test freshness + _, err = env.manifests.Get(env.manifestDigest) + if err != nil { + t.Fatal(err) + } + + if (*remoteStats)["get"] != 2 && (*remoteStats)["existsbytag"] != 1 && (*localStats)["put"] != 1 { + t.Errorf("Unexpected get count") + } + +} diff --git a/registry/proxy/proxymetrics.go b/registry/proxy/proxymetrics.go new file mode 100644 index 000000000..d3d84d786 --- /dev/null +++ b/registry/proxy/proxymetrics.go @@ -0,0 +1,74 @@ +package proxy + +import ( + "expvar" + "sync/atomic" +) + +// Metrics is used to hold metric counters +// related to the proxy +type Metrics struct { + Requests uint64 + Hits uint64 + Misses uint64 + BytesPulled uint64 + BytesPushed uint64 +} + +type proxyMetricsCollector struct { + blobMetrics Metrics + manifestMetrics Metrics +} + +// BlobPull tracks metrics about blobs pulled into the cache +func (pmc *proxyMetricsCollector) BlobPull(bytesPulled uint64) { + atomic.AddUint64(&pmc.blobMetrics.Misses, 1) + atomic.AddUint64(&pmc.blobMetrics.BytesPulled, bytesPulled) +} + +// BlobPush tracks metrics about blobs pushed to clients +func (pmc *proxyMetricsCollector) BlobPush(bytesPushed uint64) { + atomic.AddUint64(&pmc.blobMetrics.Requests, 1) + atomic.AddUint64(&pmc.blobMetrics.Hits, 1) + atomic.AddUint64(&pmc.blobMetrics.BytesPushed, bytesPushed) +} + +// ManifestPull tracks metrics related to Manifests pulled into the cache +func (pmc *proxyMetricsCollector) ManifestPull(bytesPulled uint64) { + atomic.AddUint64(&pmc.manifestMetrics.Misses, 1) + atomic.AddUint64(&pmc.manifestMetrics.BytesPulled, bytesPulled) +} + +// ManifestPush tracks metrics about manifests pushed to clients +func (pmc *proxyMetricsCollector) ManifestPush(bytesPushed uint64) { + atomic.AddUint64(&pmc.manifestMetrics.Requests, 1) + atomic.AddUint64(&pmc.manifestMetrics.Hits, 1) + atomic.AddUint64(&pmc.manifestMetrics.BytesPushed, bytesPushed) +} + +// proxyMetrics tracks metrics about the proxy cache. This is +// kept globally and made available via expvar. +var proxyMetrics = &proxyMetricsCollector{} + +func init() { + registry := expvar.Get("registry") + if registry == nil { + registry = expvar.NewMap("registry") + } + + pm := registry.(*expvar.Map).Get("proxy") + if pm == nil { + pm = &expvar.Map{} + pm.(*expvar.Map).Init() + registry.(*expvar.Map).Set("proxy", pm) + } + + pm.(*expvar.Map).Set("blobs", expvar.Func(func() interface{} { + return proxyMetrics.blobMetrics + })) + + pm.(*expvar.Map).Set("manifests", expvar.Func(func() interface{} { + return proxyMetrics.manifestMetrics + })) + +} diff --git a/registry/proxy/proxyregistry.go b/registry/proxy/proxyregistry.go new file mode 100644 index 000000000..e9dec2f70 --- /dev/null +++ b/registry/proxy/proxyregistry.go @@ -0,0 +1,139 @@ +package proxy + +import ( + "net/http" + "net/url" + + "github.com/docker/distribution" + "github.com/docker/distribution/configuration" + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/client/auth" + "github.com/docker/distribution/registry/client/transport" + "github.com/docker/distribution/registry/proxy/scheduler" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/driver" +) + +// proxyingRegistry fetches content from a remote registry and caches it locally +type proxyingRegistry struct { + embedded distribution.Namespace // provides local registry functionality + + scheduler *scheduler.TTLExpirationScheduler + + remoteURL string + credentialStore auth.CredentialStore + challengeManager auth.ChallengeManager +} + +// NewRegistryPullThroughCache creates a registry acting as a pull through cache +func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) { + _, err := url.Parse(config.RemoteURL) + if err != nil { + return nil, err + } + + v := storage.NewVacuum(ctx, driver) + + s := scheduler.New(ctx, driver, "/scheduler-state.json") + s.OnBlobExpire(func(digest string) error { + return v.RemoveBlob(digest) + }) + s.OnManifestExpire(func(repoName string) error { + return v.RemoveRepository(repoName) + }) + err = s.Start() + if err != nil { + return nil, err + } + + challengeManager := auth.NewSimpleChallengeManager() + cs, err := ConfigureAuth(config.RemoteURL, config.Username, config.Password, challengeManager) + if err != nil { + return nil, err + } + + return &proxyingRegistry{ + embedded: registry, + scheduler: s, + challengeManager: challengeManager, + credentialStore: cs, + remoteURL: config.RemoteURL, + }, nil +} + +func (pr *proxyingRegistry) Scope() distribution.Scope { + return distribution.GlobalScope +} + +func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) { + return pr.embedded.Repositories(ctx, repos, last) +} + +func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distribution.Repository, error) { + tr := transport.NewTransport(http.DefaultTransport, + auth.NewAuthorizer(pr.challengeManager, auth.NewTokenHandler(http.DefaultTransport, pr.credentialStore, name, "pull"))) + + localRepo, err := pr.embedded.Repository(ctx, name) + if err != nil { + return nil, err + } + localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification) + if err != nil { + return nil, err + } + + remoteRepo, err := client.NewRepository(ctx, name, pr.remoteURL, tr) + if err != nil { + return nil, err + } + + remoteManifests, err := remoteRepo.Manifests(ctx) + if err != nil { + return nil, err + } + + return &proxiedRepository{ + blobStore: proxyBlobStore{ + localStore: localRepo.Blobs(ctx), + remoteStore: remoteRepo.Blobs(ctx), + scheduler: pr.scheduler, + }, + manifests: proxyManifestStore{ + repositoryName: name, + localManifests: localManifests, // Options? + remoteManifests: remoteManifests, + ctx: ctx, + scheduler: pr.scheduler, + }, + name: name, + signatures: localRepo.Signatures(), + }, nil +} + +// proxiedRepository uses proxying blob and manifest services to serve content +// locally, or pulling it through from a remote and caching it locally if it doesn't +// already exist +type proxiedRepository struct { + blobStore distribution.BlobStore + manifests distribution.ManifestService + name string + signatures distribution.SignatureService +} + +func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { + // options + return pr.manifests, nil +} + +func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore { + return pr.blobStore +} + +func (pr *proxiedRepository) Name() string { + return pr.name +} + +func (pr *proxiedRepository) Signatures() distribution.SignatureService { + return pr.signatures +} diff --git a/registry/proxy/scheduler/scheduler.go b/registry/proxy/scheduler/scheduler.go new file mode 100644 index 000000000..056b148ad --- /dev/null +++ b/registry/proxy/scheduler/scheduler.go @@ -0,0 +1,250 @@ +package scheduler + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/storage/driver" +) + +// onTTLExpiryFunc is called when a repositories' TTL expires +type expiryFunc func(string) error + +const ( + entryTypeBlob = iota + entryTypeManifest +) + +// schedulerEntry represents an entry in the scheduler +// fields are exported for serialization +type schedulerEntry struct { + Key string `json:"Key"` + Expiry time.Time `json:"ExpiryData"` + EntryType int `json:"EntryType"` +} + +// New returns a new instance of the scheduler +func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler { + return &TTLExpirationScheduler{ + entries: make(map[string]schedulerEntry), + addChan: make(chan schedulerEntry), + stopChan: make(chan bool), + driver: driver, + pathToStateFile: path, + ctx: ctx, + stopped: true, + } +} + +// TTLExpirationScheduler is a scheduler used to perform actions +// when TTLs expire +type TTLExpirationScheduler struct { + entries map[string]schedulerEntry + addChan chan schedulerEntry + stopChan chan bool + + driver driver.StorageDriver + ctx context.Context + pathToStateFile string + + stopped bool + + onBlobExpire expiryFunc + onManifestExpire expiryFunc +} + +// addChan allows more TTLs to be pushed to the scheduler +type addChan chan schedulerEntry + +// stopChan allows the scheduler to be stopped - used for testing. +type stopChan chan bool + +// OnBlobExpire is called when a scheduled blob's TTL expires +func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) { + ttles.onBlobExpire = f +} + +// OnManifestExpire is called when a scheduled manifest's TTL expires +func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) { + ttles.onManifestExpire = f +} + +// AddBlob schedules a blob cleanup after ttl expires +func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error { + if ttles.stopped { + return fmt.Errorf("scheduler not started") + } + ttles.add(dgst, ttl, entryTypeBlob) + return nil +} + +// AddManifest schedules a manifest cleanup after ttl expires +func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error { + if ttles.stopped { + return fmt.Errorf("scheduler not started") + } + + ttles.add(repoName, ttl, entryTypeManifest) + return nil +} + +// Start starts the scheduler +func (ttles *TTLExpirationScheduler) Start() error { + return ttles.start() +} + +func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) { + entry := schedulerEntry{ + Key: key, + Expiry: time.Now().Add(ttl), + EntryType: eType, + } + ttles.addChan <- entry +} + +func (ttles *TTLExpirationScheduler) stop() { + ttles.stopChan <- true +} + +func (ttles *TTLExpirationScheduler) start() error { + err := ttles.readState() + if err != nil { + return err + } + + if !ttles.stopped { + return fmt.Errorf("Scheduler already started") + } + + context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...") + ttles.stopped = false + go ttles.mainloop() + + return nil +} + +// mainloop uses a select statement to listen for events. Most of its time +// is spent in waiting on a TTL to expire but can be interrupted when TTLs +// are added. +func (ttles *TTLExpirationScheduler) mainloop() { + for { + if ttles.stopped { + return + } + + nextEntry, ttl := nextExpiringEntry(ttles.entries) + if len(ttles.entries) == 0 { + context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...") + } else { + context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key) + } + + select { + case <-time.After(ttl): + var f expiryFunc + + switch nextEntry.EntryType { + case entryTypeBlob: + f = ttles.onBlobExpire + case entryTypeManifest: + f = ttles.onManifestExpire + default: + f = func(repoName string) error { + return fmt.Errorf("Unexpected scheduler entry type") + } + } + + if err := f(nextEntry.Key); err != nil { + context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err) + } + + delete(ttles.entries, nextEntry.Key) + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } + case entry := <-ttles.addChan: + context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now())) + ttles.entries[entry.Key] = entry + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } + break + + case <-ttles.stopChan: + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } + ttles.stopped = true + } + } +} + +func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) { + if len(entries) == 0 { + return nil, 24 * time.Hour + } + + // todo:(richardscothern) this is a primitive o(n) algorithm + // but n will never be *that* big and it's all in memory. Investigate + // time.AfterFunc for heap based expiries + + first := true + var nextEntry schedulerEntry + for _, entry := range entries { + if first { + nextEntry = entry + first = false + continue + } + if entry.Expiry.Before(nextEntry.Expiry) { + nextEntry = entry + } + } + + // Dates may be from the past if the scheduler has + // been restarted, set their ttl to 0 + if nextEntry.Expiry.Before(time.Now()) { + nextEntry.Expiry = time.Now() + return &nextEntry, 0 + } + + return &nextEntry, nextEntry.Expiry.Sub(time.Now()) +} + +func (ttles *TTLExpirationScheduler) writeState() error { + jsonBytes, err := json.Marshal(ttles.entries) + if err != nil { + return err + } + + err = ttles.driver.PutContent(ttles.ctx, ttles.pathToStateFile, jsonBytes) + if err != nil { + return err + } + return nil +} + +func (ttles *TTLExpirationScheduler) readState() error { + if _, err := ttles.driver.Stat(ttles.ctx, ttles.pathToStateFile); err != nil { + switch err := err.(type) { + case driver.PathNotFoundError: + return nil + default: + return err + } + } + + bytes, err := ttles.driver.GetContent(ttles.ctx, ttles.pathToStateFile) + if err != nil { + return err + } + + err = json.Unmarshal(bytes, &ttles.entries) + if err != nil { + return err + } + + return nil +} diff --git a/registry/proxy/scheduler/scheduler_test.go b/registry/proxy/scheduler/scheduler_test.go new file mode 100644 index 000000000..fb5479f01 --- /dev/null +++ b/registry/proxy/scheduler/scheduler_test.go @@ -0,0 +1,165 @@ +package scheduler + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/storage/driver/inmemory" +) + +func TestSchedule(t *testing.T) { + timeUnit := time.Millisecond + remainingRepos := map[string]bool{ + "testBlob1": true, + "testBlob2": true, + "ch00": true, + } + + s := New(context.Background(), inmemory.New(), "/ttl") + deleteFunc := func(repoName string) error { + if len(remainingRepos) == 0 { + t.Fatalf("Incorrect expiry count") + } + _, ok := remainingRepos[repoName] + if !ok { + t.Fatalf("Trying to remove nonexistant repo: %s", repoName) + } + fmt.Println("removing", repoName) + delete(remainingRepos, repoName) + + return nil + } + s.onBlobExpire = deleteFunc + err := s.start() + if err != nil { + t.Fatalf("Error starting ttlExpirationScheduler: %s", err) + } + + s.add("testBlob1", 3*timeUnit, entryTypeBlob) + s.add("testBlob2", 1*timeUnit, entryTypeBlob) + + func() { + s.add("ch00", 1*timeUnit, entryTypeBlob) + + }() + + // Ensure all repos are deleted + <-time.After(50 * timeUnit) + if len(remainingRepos) != 0 { + t.Fatalf("Repositories remaining: %#v", remainingRepos) + } +} + +func TestRestoreOld(t *testing.T) { + remainingRepos := map[string]bool{ + "testBlob1": true, + "oldRepo": true, + } + + deleteFunc := func(repoName string) error { + if repoName == "oldRepo" && len(remainingRepos) == 3 { + t.Errorf("oldRepo should be removed first") + } + _, ok := remainingRepos[repoName] + if !ok { + t.Fatalf("Trying to remove nonexistant repo: %s", repoName) + } + delete(remainingRepos, repoName) + return nil + } + + timeUnit := time.Millisecond + serialized, err := json.Marshal(&map[string]schedulerEntry{ + "testBlob1": { + Expiry: time.Now().Add(1 * timeUnit), + Key: "testBlob1", + EntryType: 0, + }, + "oldRepo": { + Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first + Key: "oldRepo", + EntryType: 0, + }, + }) + if err != nil { + t.Fatalf("Error serializing test data: %s", err.Error()) + } + + ctx := context.Background() + pathToStatFile := "/ttl" + fs := inmemory.New() + err = fs.PutContent(ctx, pathToStatFile, serialized) + if err != nil { + t.Fatal("Unable to write serialized data to fs") + } + s := New(context.Background(), fs, "/ttl") + s.onBlobExpire = deleteFunc + err = s.start() + if err != nil { + t.Fatalf("Error starting ttlExpirationScheduler: %s", err) + } + + <-time.After(50 * timeUnit) + if len(remainingRepos) != 0 { + t.Fatalf("Repositories remaining: %#v", remainingRepos) + } +} + +func TestStopRestore(t *testing.T) { + timeUnit := time.Millisecond + remainingRepos := map[string]bool{ + "testBlob1": true, + "testBlob2": true, + } + deleteFunc := func(repoName string) error { + delete(remainingRepos, repoName) + return nil + } + + fs := inmemory.New() + pathToStateFile := "/ttl" + s := New(context.Background(), fs, pathToStateFile) + s.onBlobExpire = deleteFunc + + err := s.start() + if err != nil { + t.Fatalf(err.Error()) + } + s.add("testBlob1", 300*timeUnit, entryTypeBlob) + s.add("testBlob2", 100*timeUnit, entryTypeBlob) + + // Start and stop before all operations complete + // state will be written to fs + s.stop() + time.Sleep(10 * time.Millisecond) + + // v2 will restore state from fs + s2 := New(context.Background(), fs, pathToStateFile) + s2.onBlobExpire = deleteFunc + err = s2.start() + if err != nil { + t.Fatalf("Error starting v2: %s", err.Error()) + } + + <-time.After(500 * timeUnit) + if len(remainingRepos) != 0 { + t.Fatalf("Repositories remaining: %#v", remainingRepos) + } + +} + +func TestDoubleStart(t *testing.T) { + s := New(context.Background(), inmemory.New(), "/ttl") + err := s.start() + if err != nil { + t.Fatalf("Unable to start scheduler") + } + fmt.Printf("%#v", s) + err = s.start() + if err == nil { + t.Fatalf("Scheduler started twice without error") + } +} diff --git a/registry/storage/blob_test.go b/registry/storage/blob_test.go index 7719bab17..a0020ed8d 100644 --- a/registry/storage/blob_test.go +++ b/registry/storage/blob_test.go @@ -33,7 +33,7 @@ func TestSimpleBlobUpload(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -193,7 +193,7 @@ func TestSimpleBlobUpload(t *testing.T) { } // Reuse state to test delete with a delete-disabled registry - registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) + registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false) repository, err = registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -212,7 +212,7 @@ func TestSimpleBlobRead(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -316,7 +316,7 @@ func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) diff --git a/registry/storage/blobwriter.go b/registry/storage/blobwriter.go index 50da7699d..2142c37fd 100644 --- a/registry/storage/blobwriter.go +++ b/registry/storage/blobwriter.go @@ -31,6 +31,8 @@ type blobWriter struct { // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy // LayerUpload Interface bufferedFileWriter + + resumableDigestEnabled bool } var _ distribution.BlobWriter = &blobWriter{} @@ -349,3 +351,29 @@ func (bw *blobWriter) removeResources(ctx context.Context) error { return nil } + +func (bw *blobWriter) Reader() (io.ReadCloser, error) { + // todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4 + try := 1 + for try <= 5 { + _, err := bw.bufferedFileWriter.driver.Stat(bw.ctx, bw.path) + if err == nil { + break + } + switch err.(type) { + case storagedriver.PathNotFoundError: + context.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try) + time.Sleep(1 * time.Second) + try++ + default: + return nil, err + } + } + + readCloser, err := bw.bufferedFileWriter.driver.ReadStream(bw.ctx, bw.path, 0) + if err != nil { + return nil, err + } + + return readCloser, nil +} diff --git a/registry/storage/blobwriter_resumable.go b/registry/storage/blobwriter_resumable.go index c2ab21239..a26ac2cce 100644 --- a/registry/storage/blobwriter_resumable.go +++ b/registry/storage/blobwriter_resumable.go @@ -24,6 +24,10 @@ import ( // offset. Any unhashed bytes remaining less than the given offset are hashed // from the content uploaded so far. func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error { + if !bw.resumableDigestEnabled { + return errResumableDigestNotAvailable + } + if offset < 0 { return fmt.Errorf("cannot resume hash at negative offset: %d", offset) } @@ -143,6 +147,10 @@ func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry } func (bw *blobWriter) storeHashState(ctx context.Context) error { + if !bw.resumableDigestEnabled { + return errResumableDigestNotAvailable + } + h, ok := bw.digester.Hash().(resumable.Hash) if !ok { return errResumableDigestNotAvailable diff --git a/registry/storage/catalog_test.go b/registry/storage/catalog_test.go index 862777aae..1a1dbac58 100644 --- a/registry/storage/catalog_test.go +++ b/registry/storage/catalog_test.go @@ -22,7 +22,7 @@ func setupFS(t *testing.T) *setupEnv { d := inmemory.New() c := []byte("") ctx := context.Background() - registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) + registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false) rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{}) repos := []string{ diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go index e7a98bbbc..2ba62a958 100644 --- a/registry/storage/linkedblobstore.go +++ b/registry/storage/linkedblobstore.go @@ -16,11 +16,12 @@ import ( // that grant access to the global blob store. type linkedBlobStore struct { *blobStore - blobServer distribution.BlobServer - blobAccessController distribution.BlobDescriptorService - repository distribution.Repository - ctx context.Context // only to be used where context can't come through method args - deleteEnabled bool + blobServer distribution.BlobServer + blobAccessController distribution.BlobDescriptorService + repository distribution.Repository + ctx context.Context // only to be used where context can't come through method args + deleteEnabled bool + resumableDigestEnabled bool // linkPath allows one to control the repository blob link set to which // the blob store dispatches. This is required because manifest and layer @@ -189,11 +190,12 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string } bw := &blobWriter{ - blobStore: lbs, - id: uuid, - startedAt: startedAt, - digester: digest.Canonical.New(), - bufferedFileWriter: *fw, + blobStore: lbs, + id: uuid, + startedAt: startedAt, + digester: digest.Canonical.New(), + bufferedFileWriter: *fw, + resumableDigestEnabled: lbs.resumableDigestEnabled, } return bw, nil diff --git a/registry/storage/manifeststore_test.go b/registry/storage/manifeststore_test.go index 5bbbd4a2c..a4ce9149f 100644 --- a/registry/storage/manifeststore_test.go +++ b/registry/storage/manifeststore_test.go @@ -29,7 +29,7 @@ type manifestStoreTestEnv struct { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { ctx := context.Background() driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repo, err := registry.Repository(ctx, name) if err != nil { @@ -348,7 +348,7 @@ func TestManifestStorage(t *testing.T) { t.Errorf("Deleted manifest get returned non-nil") } - r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) + r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false) repo, err := r.Repository(ctx, env.name) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) diff --git a/registry/storage/registry.go b/registry/storage/registry.go index 8149be115..c5058b801 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -16,6 +16,7 @@ type registry struct { statter distribution.BlobStatter // global statter service. blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider deleteEnabled bool + resumableDigestEnabled bool } // NewRegistryWithDriver creates a new registry instance from the provided @@ -23,9 +24,9 @@ type registry struct { // cheap to allocate. If redirect is true, the backend blob server will // attempt to use (StorageDriver).URLFor to serve all blobs. // -// TODO(stevvooe): This function signature is getting out of hand. Move to +// TODO(stevvooe): This function signature is getting very out of hand. Move to // functional options for instance configuration. -func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool) distribution.Namespace { +func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool, isCache bool) distribution.Namespace { // create global statter, with cache. var statter distribution.BlobDescriptorService = &blobStatter{ driver: driver, @@ -52,6 +53,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv }, blobDescriptorCacheProvider: blobDescriptorCacheProvider, deleteEnabled: deleteEnabled, + resumableDigestEnabled: !isCache, } } diff --git a/registry/storage/vacuum.go b/registry/storage/vacuum.go new file mode 100644 index 000000000..46b8096b3 --- /dev/null +++ b/registry/storage/vacuum.go @@ -0,0 +1,67 @@ +package storage + +import ( + "path" + + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/storage/driver" +) + +// vacuum contains functions for cleaning up repositories and blobs +// These functions will only reliably work on strongly consistent +// storage systems. +// https://en.wikipedia.org/wiki/Consistency_model + +// NewVacuum creates a new Vacuum +func NewVacuum(ctx context.Context, driver driver.StorageDriver) Vacuum { + return Vacuum{ + ctx: ctx, + driver: driver, + pm: defaultPathMapper, + } +} + +// Vacuum removes content from the filesystem +type Vacuum struct { + pm *pathMapper + driver driver.StorageDriver + ctx context.Context +} + +// RemoveBlob removes a blob from the filesystem +func (v Vacuum) RemoveBlob(dgst string) error { + d, err := digest.ParseDigest(dgst) + if err != nil { + return err + } + + blobPath, err := v.pm.path(blobDataPathSpec{digest: d}) + if err != nil { + return err + } + context.GetLogger(v.ctx).Infof("Deleting blob: %s", blobPath) + err = v.driver.Delete(v.ctx, blobPath) + if err != nil { + return err + } + + return nil +} + +// RemoveRepository removes a repository directory from the +// filesystem +func (v Vacuum) RemoveRepository(repoName string) error { + rootForRepository, err := v.pm.path(repositoriesRootPathSpec{}) + if err != nil { + return err + } + repoDir := path.Join(rootForRepository, repoName) + context.GetLogger(v.ctx).Infof("Deleting repo: %s", repoDir) + err = v.driver.Delete(v.ctx, repoDir) + if err != nil { + return err + } + + return nil +} diff --git a/testutil/handler.go b/testutil/handler.go index cf5c1b7e2..00cd8a6ac 100644 --- a/testutil/handler.go +++ b/testutil/handler.go @@ -122,7 +122,7 @@ func (app *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Add headers of interest here for k, v := range r.Header { - if k == "Etag" { + if k == "If-None-Match" { request.Headers[k] = v } }