Merge pull request #779 from RichardScothern/pull-through-cache

Add pull through cache ability to the Registry.
This commit is contained in:
Stephen Day 2015-08-04 17:04:56 -07:00
commit 68c0706bac
24 changed files with 1682 additions and 38 deletions

View file

@ -25,6 +25,10 @@ type httpBlobUpload struct {
closed bool
}
func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
panic("Not implemented")
}
func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
if resp.StatusCode == http.StatusNotFound {
return distribution.ErrBlobUploadUnknown

View file

@ -280,14 +280,13 @@ func (ms *manifests) GetByTag(tag string, options ...distribution.ManifestServic
}
if _, ok := ms.etags[tag]; ok {
req.Header.Set("eTag", ms.etags[tag])
req.Header.Set("If-None-Match", ms.etags[tag])
}
resp, err := ms.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotModified {
return nil, nil
} else if SuccessStatus(resp.StatusCode) {

View file

@ -463,7 +463,7 @@ func addTestManifestWithEtag(repo, reference string, content []byte, m *testutil
Method: "GET",
Route: "/v2/" + repo + "/manifests/" + reference,
Headers: http.Header(map[string][]string{
"Etag": {fmt.Sprintf(`"%s"`, dgst)},
"If-None-Match": {fmt.Sprintf(`"%s"`, dgst)},
}),
}

View file

@ -20,6 +20,7 @@ import (
"github.com/docker/distribution/registry/auth"
registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
"github.com/docker/distribution/registry/proxy"
"github.com/docker/distribution/registry/storage"
memorycache "github.com/docker/distribution/registry/storage/cache/memory"
rediscache "github.com/docker/distribution/registry/storage/cache/redis"
@ -55,6 +56,9 @@ type App struct {
}
redis *redis.Pool
// true if this registry is configured as a pull through cache
isCache bool
}
// NewApp takes a configuration and returns a configured app, ready to serve
@ -65,6 +69,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
Config: configuration,
Context: ctx,
router: v2.RouterWithPrefix(configuration.HTTP.Prefix),
isCache: configuration.Proxy.RemoteURL != "",
}
app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id"))
@ -152,10 +157,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.redis == nil {
panic("redis configuration required to use for layerinfo cache")
}
app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled)
app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled, app.isCache)
ctxu.GetLogger(app).Infof("using redis blob descriptor cache")
case "inmemory":
app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled)
app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled, app.isCache)
ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
default:
if v != "" {
@ -166,10 +171,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.registry == nil {
// configure the registry if no cache section is available.
app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled)
app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled, app.isCache)
}
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
app.registry, err = applyRegistryMiddleware(app.Context, app.registry, configuration.Middleware["registry"])
if err != nil {
panic(err)
}
@ -185,6 +190,16 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
ctxu.GetLogger(app).Debugf("configured %q access controller", authType)
}
// configure as a pull through cache
if configuration.Proxy.RemoteURL != "" {
app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, configuration.Proxy)
if err != nil {
panic(err.Error())
}
app.isCache = true
ctxu.GetLogger(app).Info("Registry configured as a proxy cache to ", configuration.Proxy.RemoteURL)
}
return app
}
@ -447,7 +462,7 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
repository,
app.eventBridge(context, r))
context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"])
context.Repository, err = applyRepoMiddleware(context.Context, context.Repository, app.Config.Middleware["repository"])
if err != nil {
ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
@ -668,9 +683,9 @@ func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []a
}
// applyRegistryMiddleware wraps a registry instance with the configured middlewares
func applyRegistryMiddleware(registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
for _, mw := range middlewares {
rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry)
rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry)
if err != nil {
return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
}
@ -681,9 +696,9 @@ func applyRegistryMiddleware(registry distribution.Namespace, middlewares []conf
}
// applyRepoMiddleware wraps a repository with the configured middlewares
func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
for _, mw := range middlewares {
rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository)
rmw, err := repositorymiddleware.Get(ctx, mw.Name, mw.Options, repository)
if err != nil {
return nil, err
}

View file

@ -31,7 +31,7 @@ func TestAppDispatcher(t *testing.T) {
Context: ctx,
router: v2.Router(),
driver: driver,
registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true),
registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true, false),
}
server := httptest.NewServer(app)
router := v2.Router()

View file

@ -4,11 +4,12 @@ import (
"fmt"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
)
// InitFunc is the type of a RegistryMiddleware factory function and is
// used to register the constructor for different RegistryMiddleware backends.
type InitFunc func(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error)
type InitFunc func(ctx context.Context, registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error)
var middlewares map[string]InitFunc
@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error {
}
// Get constructs a RegistryMiddleware with the given options using the named backend.
func Get(name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) {
func Get(ctx context.Context, name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) {
if middlewares != nil {
if initFunc, exists := middlewares[name]; exists {
return initFunc(registry, options)
return initFunc(ctx, registry, options)
}
}

View file

@ -4,11 +4,12 @@ import (
"fmt"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
)
// InitFunc is the type of a RepositoryMiddleware factory function and is
// used to register the constructor for different RepositoryMiddleware backends.
type InitFunc func(repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error)
type InitFunc func(ctx context.Context, repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error)
var middlewares map[string]InitFunc
@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error {
}
// Get constructs a RepositoryMiddleware with the given options using the named backend.
func Get(name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) {
func Get(ctx context.Context, name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) {
if middlewares != nil {
if initFunc, exists := middlewares[name]; exists {
return initFunc(repository, options)
return initFunc(ctx, repository, options)
}
}

54
docs/proxy/proxyauth.go Normal file
View file

@ -0,0 +1,54 @@
package proxy
import (
"net/http"
"net/url"
"github.com/docker/distribution/registry/client/auth"
)
const tokenURL = "https://auth.docker.io/token"
type userpass struct {
username string
password string
}
type credentials struct {
creds map[string]userpass
}
func (c credentials) Basic(u *url.URL) (string, string) {
up := c.creds[u.String()]
return up.username, up.password
}
// ConfigureAuth authorizes with the upstream registry
func ConfigureAuth(remoteURL, username, password string, cm auth.ChallengeManager) (auth.CredentialStore, error) {
if err := ping(cm, remoteURL+"/v2/", "Docker-Distribution-Api-Version"); err != nil {
return nil, err
}
creds := map[string]userpass{
tokenURL: {
username: username,
password: password,
},
}
return credentials{creds: creds}, nil
}
func ping(manager auth.ChallengeManager, endpoint, versionHeader string) error {
resp, err := http.Get(endpoint)
if err != nil {
return err
}
defer resp.Body.Close()
if err := manager.AddResponse(resp); err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,214 @@
package proxy
import (
"io"
"net/http"
"strconv"
"sync"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/proxy/scheduler"
)
// todo(richardscothern): from cache control header or config file
const blobTTL = time.Duration(24 * 7 * time.Hour)
type proxyBlobStore struct {
localStore distribution.BlobStore
remoteStore distribution.BlobService
scheduler *scheduler.TTLExpirationScheduler
}
var _ distribution.BlobStore = proxyBlobStore{}
type inflightBlob struct {
refCount int
bw distribution.BlobWriter
}
// inflight tracks currently downloading blobs
var inflight = make(map[digest.Digest]*inflightBlob)
// mu protects inflight
var mu sync.Mutex
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
w.Header().Set("Content-Type", mediaType)
w.Header().Set("Docker-Content-Digest", digest.String())
w.Header().Set("Etag", digest.String())
}
func (pbs proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
desc, err := pbs.localStore.Stat(ctx, dgst)
if err != nil && err != distribution.ErrBlobUnknown {
return err
}
if err == nil {
proxyMetrics.BlobPush(uint64(desc.Size))
return pbs.localStore.ServeBlob(ctx, w, r, dgst)
}
desc, err = pbs.remoteStore.Stat(ctx, dgst)
if err != nil {
return err
}
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
if err != nil {
return err
}
bw, isNew, cleanup, err := getOrCreateBlobWriter(ctx, pbs.localStore, desc)
if err != nil {
return err
}
defer cleanup()
if isNew {
go func() {
err := streamToStorage(ctx, remoteReader, desc, bw)
if err != nil {
context.GetLogger(ctx).Error(err)
}
proxyMetrics.BlobPull(uint64(desc.Size))
}()
err := streamToClient(ctx, w, desc, bw)
if err != nil {
return err
}
proxyMetrics.BlobPush(uint64(desc.Size))
pbs.scheduler.AddBlob(dgst.String(), blobTTL)
return nil
}
err = streamToClient(ctx, w, desc, bw)
if err != nil {
return err
}
proxyMetrics.BlobPush(uint64(desc.Size))
return nil
}
type cleanupFunc func()
// getOrCreateBlobWriter will track which blobs are currently being downloaded and enable client requesting
// the same blob concurrently to read from the existing stream.
func getOrCreateBlobWriter(ctx context.Context, blobs distribution.BlobService, desc distribution.Descriptor) (distribution.BlobWriter, bool, cleanupFunc, error) {
mu.Lock()
defer mu.Unlock()
dgst := desc.Digest
cleanup := func() {
mu.Lock()
defer mu.Unlock()
inflight[dgst].refCount--
if inflight[dgst].refCount == 0 {
defer delete(inflight, dgst)
_, err := inflight[dgst].bw.Commit(ctx, desc)
if err != nil {
// There is a narrow race here where Commit can be called while this blob's TTL is expiring
// and its being removed from storage. In that case, the client stream will continue
// uninterruped and the blob will be pulled through on the next request, so just log it
context.GetLogger(ctx).Errorf("Error committing blob: %q", err)
}
}
}
var bw distribution.BlobWriter
_, ok := inflight[dgst]
if ok {
bw = inflight[dgst].bw
inflight[dgst].refCount++
return bw, false, cleanup, nil
}
var err error
bw, err = blobs.Create(ctx)
if err != nil {
return nil, false, nil, err
}
inflight[dgst] = &inflightBlob{refCount: 1, bw: bw}
return bw, true, cleanup, nil
}
func streamToStorage(ctx context.Context, remoteReader distribution.ReadSeekCloser, desc distribution.Descriptor, bw distribution.BlobWriter) error {
_, err := io.CopyN(bw, remoteReader, desc.Size)
if err != nil {
return err
}
return nil
}
func streamToClient(ctx context.Context, w http.ResponseWriter, desc distribution.Descriptor, bw distribution.BlobWriter) error {
setResponseHeaders(w, desc.Size, desc.MediaType, desc.Digest)
reader, err := bw.Reader()
if err != nil {
return err
}
defer reader.Close()
teeReader := io.TeeReader(reader, w)
buf := make([]byte, 32768, 32786)
var soFar int64
for {
rd, err := teeReader.Read(buf)
if err == nil || err == io.EOF {
soFar += int64(rd)
if soFar < desc.Size {
// buffer underflow, keep trying
continue
}
return nil
}
return err
}
}
func (pbs proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := pbs.localStore.Stat(ctx, dgst)
if err == nil {
return desc, err
}
if err != distribution.ErrBlobUnknown {
return distribution.Descriptor{}, err
}
return pbs.remoteStore.Stat(ctx, dgst)
}
// Unsupported functions
func (pbs proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}

View file

@ -0,0 +1,231 @@
package proxy
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/proxy/scheduler"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
type statsBlobStore struct {
stats map[string]int
blobs distribution.BlobStore
}
func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
sbs.stats["put"]++
return sbs.blobs.Put(ctx, mediaType, p)
}
func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
sbs.stats["get"]++
return sbs.blobs.Get(ctx, dgst)
}
func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
sbs.stats["create"]++
return sbs.blobs.Create(ctx)
}
func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
sbs.stats["resume"]++
return sbs.blobs.Resume(ctx, id)
}
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
sbs.stats["open"]++
return sbs.blobs.Open(ctx, dgst)
}
func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
sbs.stats["serveblob"]++
return sbs.blobs.ServeBlob(ctx, w, r, dgst)
}
func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
sbs.stats["stat"]++
return sbs.blobs.Stat(ctx, dgst)
}
func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
sbs.stats["delete"]++
return sbs.blobs.Delete(ctx, dgst)
}
type testEnv struct {
inRemote []distribution.Descriptor
store proxyBlobStore
ctx context.Context
}
func (te testEnv) LocalStats() *map[string]int {
ls := te.store.localStore.(statsBlobStore).stats
return &ls
}
func (te testEnv) RemoteStats() *map[string]int {
rs := te.store.remoteStore.(statsBlobStore).stats
return &rs
}
// Populate remote store and record the digests
func makeTestEnv(t *testing.T, name string) testEnv {
ctx := context.Background()
localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true)
localRepo, err := localRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false)
truthRepo, err := truthRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
truthBlobs := statsBlobStore{
stats: make(map[string]int),
blobs: truthRepo.Blobs(ctx),
}
localBlobs := statsBlobStore{
stats: make(map[string]int),
blobs: localRepo.Blobs(ctx),
}
s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
proxyBlobStore := proxyBlobStore{
remoteStore: truthBlobs,
localStore: localBlobs,
scheduler: s,
}
te := testEnv{
store: proxyBlobStore,
ctx: ctx,
}
return te
}
func populate(t *testing.T, te *testEnv, blobCount int) {
var inRemote []distribution.Descriptor
for i := 0; i < blobCount; i++ {
bytes := []byte(fmt.Sprintf("blob%d", i))
desc, err := te.store.remoteStore.Put(te.ctx, "", bytes)
if err != nil {
t.Errorf("Put in store")
}
inRemote = append(inRemote, desc)
}
te.inRemote = inRemote
}
func TestProxyStoreStat(t *testing.T) {
te := makeTestEnv(t, "foo/bar")
remoteBlobCount := 1
populate(t, &te, remoteBlobCount)
localStats := te.LocalStats()
remoteStats := te.RemoteStats()
// Stat - touches both stores
for _, d := range te.inRemote {
_, err := te.store.Stat(te.ctx, d.Digest)
if err != nil {
t.Fatalf("Error stating proxy store")
}
}
if (*localStats)["stat"] != remoteBlobCount {
t.Errorf("Unexpected local stat count")
}
if (*remoteStats)["stat"] != remoteBlobCount {
t.Errorf("Unexpected remote stat count")
}
}
func TestProxyStoreServe(t *testing.T) {
te := makeTestEnv(t, "foo/bar")
remoteBlobCount := 1
populate(t, &te, remoteBlobCount)
localStats := te.LocalStats()
remoteStats := te.RemoteStats()
// Serveblob - pulls through blobs
for _, dr := range te.inRemote {
w := httptest.NewRecorder()
r, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
if err != nil {
t.Fatalf(err.Error())
}
dl, err := digest.FromBytes(w.Body.Bytes())
if err != nil {
t.Fatalf("Error making digest from blob")
}
if dl != dr.Digest {
t.Errorf("Mismatching blob fetch from proxy")
}
}
if (*localStats)["stat"] != remoteBlobCount && (*localStats)["create"] != remoteBlobCount {
t.Fatalf("unexpected local stats")
}
if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount {
t.Fatalf("unexpected local stats")
}
// Serveblob - blobs come from local
for _, dr := range te.inRemote {
w := httptest.NewRecorder()
r, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
if err != nil {
t.Fatalf(err.Error())
}
dl, err := digest.FromBytes(w.Body.Bytes())
if err != nil {
t.Fatalf("Error making digest from blob")
}
if dl != dr.Digest {
t.Errorf("Mismatching blob fetch from proxy")
}
}
// Stat to find local, but no new blobs were created
if (*localStats)["stat"] != remoteBlobCount*2 && (*localStats)["create"] != remoteBlobCount*2 {
t.Fatalf("unexpected local stats")
}
// Remote unchanged
if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount {
fmt.Printf("\tlocal=%#v, \n\tremote=%#v\n", localStats, remoteStats)
t.Fatalf("unexpected local stats")
}
}

View file

@ -0,0 +1,155 @@
package proxy
import (
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/proxy/scheduler"
)
// todo(richardscothern): from cache control header or config
const repositoryTTL = time.Duration(24 * 7 * time.Hour)
type proxyManifestStore struct {
ctx context.Context
localManifests distribution.ManifestService
remoteManifests distribution.ManifestService
repositoryName string
scheduler *scheduler.TTLExpirationScheduler
}
var _ distribution.ManifestService = &proxyManifestStore{}
func (pms proxyManifestStore) Exists(dgst digest.Digest) (bool, error) {
exists, err := pms.localManifests.Exists(dgst)
if err != nil {
return false, err
}
if exists {
return true, nil
}
return pms.remoteManifests.Exists(dgst)
}
func (pms proxyManifestStore) Get(dgst digest.Digest) (*manifest.SignedManifest, error) {
sm, err := pms.localManifests.Get(dgst)
if err == nil {
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
}
sm, err = pms.remoteManifests.Get(dgst)
if err != nil {
return nil, err
}
proxyMetrics.ManifestPull(uint64(len(sm.Raw)))
err = pms.localManifests.Put(sm)
if err != nil {
return nil, err
}
// Schedule the repo for removal
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
// Ensure the manifest blob is cleaned up
pms.scheduler.AddBlob(dgst.String(), repositoryTTL)
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
}
func (pms proxyManifestStore) Tags() ([]string, error) {
return pms.localManifests.Tags()
}
func (pms proxyManifestStore) ExistsByTag(tag string) (bool, error) {
exists, err := pms.localManifests.ExistsByTag(tag)
if err != nil {
return false, err
}
if exists {
return true, nil
}
return pms.remoteManifests.ExistsByTag(tag)
}
func (pms proxyManifestStore) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) {
var localDigest digest.Digest
localManifest, err := pms.localManifests.GetByTag(tag, options...)
switch err.(type) {
case distribution.ErrManifestUnknown, distribution.ErrManifestUnknownRevision:
goto fromremote
case nil:
break
default:
return nil, err
}
localDigest, err = manifestDigest(localManifest)
if err != nil {
return nil, err
}
fromremote:
var sm *manifest.SignedManifest
sm, err = pms.remoteManifests.GetByTag(tag, client.AddEtagToTag(tag, localDigest.String()))
if err != nil {
return nil, err
}
if sm == nil {
context.GetLogger(pms.ctx).Debugf("Local manifest for %q is latest, dgst=%s", tag, localDigest.String())
return localManifest, nil
}
context.GetLogger(pms.ctx).Debugf("Updated manifest for %q, dgst=%s", tag, localDigest.String())
err = pms.localManifests.Put(sm)
if err != nil {
return nil, err
}
dgst, err := manifestDigest(sm)
if err != nil {
return nil, err
}
pms.scheduler.AddBlob(dgst.String(), repositoryTTL)
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
proxyMetrics.ManifestPull(uint64(len(sm.Raw)))
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
}
func manifestDigest(sm *manifest.SignedManifest) (digest.Digest, error) {
payload, err := sm.Payload()
if err != nil {
return "", err
}
dgst, err := digest.FromBytes(payload)
if err != nil {
return "", err
}
return dgst, nil
}
func (pms proxyManifestStore) Put(manifest *manifest.SignedManifest) error {
return v2.ErrorCodeUnsupported
}
func (pms proxyManifestStore) Delete(dgst digest.Digest) error {
return v2.ErrorCodeUnsupported
}

View file

@ -0,0 +1,235 @@
package proxy
import (
"io"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/proxy/scheduler"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
"github.com/docker/libtrust"
)
type statsManifest struct {
manifests distribution.ManifestService
stats map[string]int
}
type manifestStoreTestEnv struct {
manifestDigest digest.Digest // digest of the signed manifest in the local storage
manifests proxyManifestStore
}
func (te manifestStoreTestEnv) LocalStats() *map[string]int {
ls := te.manifests.localManifests.(statsManifest).stats
return &ls
}
func (te manifestStoreTestEnv) RemoteStats() *map[string]int {
rs := te.manifests.remoteManifests.(statsManifest).stats
return &rs
}
func (sm statsManifest) Delete(dgst digest.Digest) error {
sm.stats["delete"]++
return sm.manifests.Delete(dgst)
}
func (sm statsManifest) Exists(dgst digest.Digest) (bool, error) {
sm.stats["exists"]++
return sm.manifests.Exists(dgst)
}
func (sm statsManifest) ExistsByTag(tag string) (bool, error) {
sm.stats["existbytag"]++
return sm.manifests.ExistsByTag(tag)
}
func (sm statsManifest) Get(dgst digest.Digest) (*manifest.SignedManifest, error) {
sm.stats["get"]++
return sm.manifests.Get(dgst)
}
func (sm statsManifest) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) {
sm.stats["getbytag"]++
return sm.manifests.GetByTag(tag, options...)
}
func (sm statsManifest) Put(manifest *manifest.SignedManifest) error {
sm.stats["put"]++
return sm.manifests.Put(manifest)
}
func (sm statsManifest) Tags() ([]string, error) {
sm.stats["tags"]++
return sm.manifests.Tags()
}
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background()
truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false)
truthRepo, err := truthRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
tr, err := truthRepo.Manifests(ctx)
if err != nil {
t.Fatal(err.Error())
}
truthManifests := statsManifest{
manifests: tr,
stats: make(map[string]int),
}
manifestDigest, err := populateRepo(t, ctx, truthRepo, name, tag)
if err != nil {
t.Fatalf(err.Error())
}
localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true)
localRepo, err := localRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
lr, err := localRepo.Manifests(ctx)
if err != nil {
t.Fatal(err.Error())
}
localManifests := statsManifest{
manifests: lr,
stats: make(map[string]int),
}
s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
return &manifestStoreTestEnv{
manifestDigest: manifestDigest,
manifests: proxyManifestStore{
ctx: ctx,
localManifests: localManifests,
remoteManifests: truthManifests,
scheduler: s,
},
}
}
func populateRepo(t *testing.T, ctx context.Context, repository distribution.Repository, name, tag string) (digest.Digest, error) {
m := manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: name,
Tag: tag,
}
for i := 0; i < 2; i++ {
wr, err := repository.Blobs(ctx).Create(ctx)
if err != nil {
t.Fatalf("unexpected error creating test upload: %v", err)
}
rs, ts, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("unexpected error generating test layer file")
}
dgst := digest.Digest(ts)
if _, err := io.Copy(wr, rs); err != nil {
t.Fatalf("unexpected error copying to upload: %v", err)
}
if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}); err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
}
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("unexpected error generating private key: %v", err)
}
sm, err := manifest.Sign(&m, pk)
if err != nil {
t.Fatalf("error signing manifest: %v", err)
}
ms, err := repository.Manifests(ctx)
if err != nil {
t.Fatalf(err.Error())
}
ms.Put(sm)
if err != nil {
t.Fatalf("unexpected errors putting manifest: %v", err)
}
pl, err := sm.Payload()
if err != nil {
t.Fatal(err)
}
return digest.FromBytes(pl)
}
// TestProxyManifests contains basic acceptance tests
// for the pull-through behavior
func TestProxyManifests(t *testing.T) {
name := "foo/bar"
env := newManifestStoreTestEnv(t, name, "latest")
localStats := env.LocalStats()
remoteStats := env.RemoteStats()
// Stat - must check local and remote
exists, err := env.manifests.ExistsByTag("latest")
if err != nil {
t.Fatalf("Error checking existance")
}
if !exists {
t.Errorf("Unexpected non-existant manifest")
}
if (*localStats)["existbytag"] != 1 && (*remoteStats)["existbytag"] != 1 {
t.Errorf("Unexpected exists count")
}
// Get - should succeed and pull manifest into local
_, err = env.manifests.Get(env.manifestDigest)
if err != nil {
t.Fatal(err)
}
if (*localStats)["get"] != 1 && (*remoteStats)["get"] != 1 {
t.Errorf("Unexpected get count")
}
if (*localStats)["put"] != 1 {
t.Errorf("Expected local put")
}
// Stat - should only go to local
exists, err = env.manifests.ExistsByTag("latest")
if err != nil {
t.Fatal(err)
}
if !exists {
t.Errorf("Unexpected non-existant manifest")
}
if (*localStats)["existbytag"] != 2 && (*remoteStats)["existbytag"] != 1 {
t.Errorf("Unexpected exists count")
}
// Get - should get from remote, to test freshness
_, err = env.manifests.Get(env.manifestDigest)
if err != nil {
t.Fatal(err)
}
if (*remoteStats)["get"] != 2 && (*remoteStats)["existsbytag"] != 1 && (*localStats)["put"] != 1 {
t.Errorf("Unexpected get count")
}
}

View file

@ -0,0 +1,74 @@
package proxy
import (
"expvar"
"sync/atomic"
)
// Metrics is used to hold metric counters
// related to the proxy
type Metrics struct {
Requests uint64
Hits uint64
Misses uint64
BytesPulled uint64
BytesPushed uint64
}
type proxyMetricsCollector struct {
blobMetrics Metrics
manifestMetrics Metrics
}
// BlobPull tracks metrics about blobs pulled into the cache
func (pmc *proxyMetricsCollector) BlobPull(bytesPulled uint64) {
atomic.AddUint64(&pmc.blobMetrics.Misses, 1)
atomic.AddUint64(&pmc.blobMetrics.BytesPulled, bytesPulled)
}
// BlobPush tracks metrics about blobs pushed to clients
func (pmc *proxyMetricsCollector) BlobPush(bytesPushed uint64) {
atomic.AddUint64(&pmc.blobMetrics.Requests, 1)
atomic.AddUint64(&pmc.blobMetrics.Hits, 1)
atomic.AddUint64(&pmc.blobMetrics.BytesPushed, bytesPushed)
}
// ManifestPull tracks metrics related to Manifests pulled into the cache
func (pmc *proxyMetricsCollector) ManifestPull(bytesPulled uint64) {
atomic.AddUint64(&pmc.manifestMetrics.Misses, 1)
atomic.AddUint64(&pmc.manifestMetrics.BytesPulled, bytesPulled)
}
// ManifestPush tracks metrics about manifests pushed to clients
func (pmc *proxyMetricsCollector) ManifestPush(bytesPushed uint64) {
atomic.AddUint64(&pmc.manifestMetrics.Requests, 1)
atomic.AddUint64(&pmc.manifestMetrics.Hits, 1)
atomic.AddUint64(&pmc.manifestMetrics.BytesPushed, bytesPushed)
}
// proxyMetrics tracks metrics about the proxy cache. This is
// kept globally and made available via expvar.
var proxyMetrics = &proxyMetricsCollector{}
func init() {
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
pm := registry.(*expvar.Map).Get("proxy")
if pm == nil {
pm = &expvar.Map{}
pm.(*expvar.Map).Init()
registry.(*expvar.Map).Set("proxy", pm)
}
pm.(*expvar.Map).Set("blobs", expvar.Func(func() interface{} {
return proxyMetrics.blobMetrics
}))
pm.(*expvar.Map).Set("manifests", expvar.Func(func() interface{} {
return proxyMetrics.manifestMetrics
}))
}

139
docs/proxy/proxyregistry.go Normal file
View file

@ -0,0 +1,139 @@
package proxy
import (
"net/http"
"net/url"
"github.com/docker/distribution"
"github.com/docker/distribution/configuration"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/distribution/registry/proxy/scheduler"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"
)
// proxyingRegistry fetches content from a remote registry and caches it locally
type proxyingRegistry struct {
embedded distribution.Namespace // provides local registry functionality
scheduler *scheduler.TTLExpirationScheduler
remoteURL string
credentialStore auth.CredentialStore
challengeManager auth.ChallengeManager
}
// NewRegistryPullThroughCache creates a registry acting as a pull through cache
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
_, err := url.Parse(config.RemoteURL)
if err != nil {
return nil, err
}
v := storage.NewVacuum(ctx, driver)
s := scheduler.New(ctx, driver, "/scheduler-state.json")
s.OnBlobExpire(func(digest string) error {
return v.RemoveBlob(digest)
})
s.OnManifestExpire(func(repoName string) error {
return v.RemoveRepository(repoName)
})
err = s.Start()
if err != nil {
return nil, err
}
challengeManager := auth.NewSimpleChallengeManager()
cs, err := ConfigureAuth(config.RemoteURL, config.Username, config.Password, challengeManager)
if err != nil {
return nil, err
}
return &proxyingRegistry{
embedded: registry,
scheduler: s,
challengeManager: challengeManager,
credentialStore: cs,
remoteURL: config.RemoteURL,
}, nil
}
func (pr *proxyingRegistry) Scope() distribution.Scope {
return distribution.GlobalScope
}
func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
return pr.embedded.Repositories(ctx, repos, last)
}
func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distribution.Repository, error) {
tr := transport.NewTransport(http.DefaultTransport,
auth.NewAuthorizer(pr.challengeManager, auth.NewTokenHandler(http.DefaultTransport, pr.credentialStore, name, "pull")))
localRepo, err := pr.embedded.Repository(ctx, name)
if err != nil {
return nil, err
}
localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification)
if err != nil {
return nil, err
}
remoteRepo, err := client.NewRepository(ctx, name, pr.remoteURL, tr)
if err != nil {
return nil, err
}
remoteManifests, err := remoteRepo.Manifests(ctx)
if err != nil {
return nil, err
}
return &proxiedRepository{
blobStore: proxyBlobStore{
localStore: localRepo.Blobs(ctx),
remoteStore: remoteRepo.Blobs(ctx),
scheduler: pr.scheduler,
},
manifests: proxyManifestStore{
repositoryName: name,
localManifests: localManifests, // Options?
remoteManifests: remoteManifests,
ctx: ctx,
scheduler: pr.scheduler,
},
name: name,
signatures: localRepo.Signatures(),
}, nil
}
// proxiedRepository uses proxying blob and manifest services to serve content
// locally, or pulling it through from a remote and caching it locally if it doesn't
// already exist
type proxiedRepository struct {
blobStore distribution.BlobStore
manifests distribution.ManifestService
name string
signatures distribution.SignatureService
}
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
// options
return pr.manifests, nil
}
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
return pr.blobStore
}
func (pr *proxiedRepository) Name() string {
return pr.name
}
func (pr *proxiedRepository) Signatures() distribution.SignatureService {
return pr.signatures
}

View file

@ -0,0 +1,250 @@
package scheduler
import (
"encoding/json"
"fmt"
"time"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage/driver"
)
// onTTLExpiryFunc is called when a repositories' TTL expires
type expiryFunc func(string) error
const (
entryTypeBlob = iota
entryTypeManifest
)
// schedulerEntry represents an entry in the scheduler
// fields are exported for serialization
type schedulerEntry struct {
Key string `json:"Key"`
Expiry time.Time `json:"ExpiryData"`
EntryType int `json:"EntryType"`
}
// New returns a new instance of the scheduler
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
return &TTLExpirationScheduler{
entries: make(map[string]schedulerEntry),
addChan: make(chan schedulerEntry),
stopChan: make(chan bool),
driver: driver,
pathToStateFile: path,
ctx: ctx,
stopped: true,
}
}
// TTLExpirationScheduler is a scheduler used to perform actions
// when TTLs expire
type TTLExpirationScheduler struct {
entries map[string]schedulerEntry
addChan chan schedulerEntry
stopChan chan bool
driver driver.StorageDriver
ctx context.Context
pathToStateFile string
stopped bool
onBlobExpire expiryFunc
onManifestExpire expiryFunc
}
// addChan allows more TTLs to be pushed to the scheduler
type addChan chan schedulerEntry
// stopChan allows the scheduler to be stopped - used for testing.
type stopChan chan bool
// OnBlobExpire is called when a scheduled blob's TTL expires
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
ttles.onBlobExpire = f
}
// OnManifestExpire is called when a scheduled manifest's TTL expires
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
ttles.onManifestExpire = f
}
// AddBlob schedules a blob cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
if ttles.stopped {
return fmt.Errorf("scheduler not started")
}
ttles.add(dgst, ttl, entryTypeBlob)
return nil
}
// AddManifest schedules a manifest cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
if ttles.stopped {
return fmt.Errorf("scheduler not started")
}
ttles.add(repoName, ttl, entryTypeManifest)
return nil
}
// Start starts the scheduler
func (ttles *TTLExpirationScheduler) Start() error {
return ttles.start()
}
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
entry := schedulerEntry{
Key: key,
Expiry: time.Now().Add(ttl),
EntryType: eType,
}
ttles.addChan <- entry
}
func (ttles *TTLExpirationScheduler) stop() {
ttles.stopChan <- true
}
func (ttles *TTLExpirationScheduler) start() error {
err := ttles.readState()
if err != nil {
return err
}
if !ttles.stopped {
return fmt.Errorf("Scheduler already started")
}
context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
ttles.stopped = false
go ttles.mainloop()
return nil
}
// mainloop uses a select statement to listen for events. Most of its time
// is spent in waiting on a TTL to expire but can be interrupted when TTLs
// are added.
func (ttles *TTLExpirationScheduler) mainloop() {
for {
if ttles.stopped {
return
}
nextEntry, ttl := nextExpiringEntry(ttles.entries)
if len(ttles.entries) == 0 {
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...")
} else {
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key)
}
select {
case <-time.After(ttl):
var f expiryFunc
switch nextEntry.EntryType {
case entryTypeBlob:
f = ttles.onBlobExpire
case entryTypeManifest:
f = ttles.onManifestExpire
default:
f = func(repoName string) error {
return fmt.Errorf("Unexpected scheduler entry type")
}
}
if err := f(nextEntry.Key); err != nil {
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err)
}
delete(ttles.entries, nextEntry.Key)
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
case entry := <-ttles.addChan:
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
ttles.entries[entry.Key] = entry
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
break
case <-ttles.stopChan:
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
ttles.stopped = true
}
}
}
func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) {
if len(entries) == 0 {
return nil, 24 * time.Hour
}
// todo:(richardscothern) this is a primitive o(n) algorithm
// but n will never be *that* big and it's all in memory. Investigate
// time.AfterFunc for heap based expiries
first := true
var nextEntry schedulerEntry
for _, entry := range entries {
if first {
nextEntry = entry
first = false
continue
}
if entry.Expiry.Before(nextEntry.Expiry) {
nextEntry = entry
}
}
// Dates may be from the past if the scheduler has
// been restarted, set their ttl to 0
if nextEntry.Expiry.Before(time.Now()) {
nextEntry.Expiry = time.Now()
return &nextEntry, 0
}
return &nextEntry, nextEntry.Expiry.Sub(time.Now())
}
func (ttles *TTLExpirationScheduler) writeState() error {
jsonBytes, err := json.Marshal(ttles.entries)
if err != nil {
return err
}
err = ttles.driver.PutContent(ttles.ctx, ttles.pathToStateFile, jsonBytes)
if err != nil {
return err
}
return nil
}
func (ttles *TTLExpirationScheduler) readState() error {
if _, err := ttles.driver.Stat(ttles.ctx, ttles.pathToStateFile); err != nil {
switch err := err.(type) {
case driver.PathNotFoundError:
return nil
default:
return err
}
}
bytes, err := ttles.driver.GetContent(ttles.ctx, ttles.pathToStateFile)
if err != nil {
return err
}
err = json.Unmarshal(bytes, &ttles.entries)
if err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,165 @@
package scheduler
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
func TestSchedule(t *testing.T) {
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
"testBlob1": true,
"testBlob2": true,
"ch00": true,
}
s := New(context.Background(), inmemory.New(), "/ttl")
deleteFunc := func(repoName string) error {
if len(remainingRepos) == 0 {
t.Fatalf("Incorrect expiry count")
}
_, ok := remainingRepos[repoName]
if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
}
fmt.Println("removing", repoName)
delete(remainingRepos, repoName)
return nil
}
s.onBlobExpire = deleteFunc
err := s.start()
if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
}
s.add("testBlob1", 3*timeUnit, entryTypeBlob)
s.add("testBlob2", 1*timeUnit, entryTypeBlob)
func() {
s.add("ch00", 1*timeUnit, entryTypeBlob)
}()
// Ensure all repos are deleted
<-time.After(50 * timeUnit)
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
}
}
func TestRestoreOld(t *testing.T) {
remainingRepos := map[string]bool{
"testBlob1": true,
"oldRepo": true,
}
deleteFunc := func(repoName string) error {
if repoName == "oldRepo" && len(remainingRepos) == 3 {
t.Errorf("oldRepo should be removed first")
}
_, ok := remainingRepos[repoName]
if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
}
delete(remainingRepos, repoName)
return nil
}
timeUnit := time.Millisecond
serialized, err := json.Marshal(&map[string]schedulerEntry{
"testBlob1": {
Expiry: time.Now().Add(1 * timeUnit),
Key: "testBlob1",
EntryType: 0,
},
"oldRepo": {
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
Key: "oldRepo",
EntryType: 0,
},
})
if err != nil {
t.Fatalf("Error serializing test data: %s", err.Error())
}
ctx := context.Background()
pathToStatFile := "/ttl"
fs := inmemory.New()
err = fs.PutContent(ctx, pathToStatFile, serialized)
if err != nil {
t.Fatal("Unable to write serialized data to fs")
}
s := New(context.Background(), fs, "/ttl")
s.onBlobExpire = deleteFunc
err = s.start()
if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
}
<-time.After(50 * timeUnit)
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
}
}
func TestStopRestore(t *testing.T) {
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
"testBlob1": true,
"testBlob2": true,
}
deleteFunc := func(repoName string) error {
delete(remainingRepos, repoName)
return nil
}
fs := inmemory.New()
pathToStateFile := "/ttl"
s := New(context.Background(), fs, pathToStateFile)
s.onBlobExpire = deleteFunc
err := s.start()
if err != nil {
t.Fatalf(err.Error())
}
s.add("testBlob1", 300*timeUnit, entryTypeBlob)
s.add("testBlob2", 100*timeUnit, entryTypeBlob)
// Start and stop before all operations complete
// state will be written to fs
s.stop()
time.Sleep(10 * time.Millisecond)
// v2 will restore state from fs
s2 := New(context.Background(), fs, pathToStateFile)
s2.onBlobExpire = deleteFunc
err = s2.start()
if err != nil {
t.Fatalf("Error starting v2: %s", err.Error())
}
<-time.After(500 * timeUnit)
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
}
}
func TestDoubleStart(t *testing.T) {
s := New(context.Background(), inmemory.New(), "/ttl")
err := s.start()
if err != nil {
t.Fatalf("Unable to start scheduler")
}
fmt.Printf("%#v", s)
err = s.start()
if err == nil {
t.Fatalf("Scheduler started twice without error")
}
}

View file

@ -33,7 +33,7 @@ func TestSimpleBlobUpload(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true)
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -193,7 +193,7 @@ func TestSimpleBlobUpload(t *testing.T) {
}
// Reuse state to test delete with a delete-disabled registry
registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true)
registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false)
repository, err = registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -212,7 +212,7 @@ func TestSimpleBlobRead(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true)
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -316,7 +316,7 @@ func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true)
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)

View file

@ -31,6 +31,8 @@ type blobWriter struct {
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
// LayerUpload Interface
bufferedFileWriter
resumableDigestEnabled bool
}
var _ distribution.BlobWriter = &blobWriter{}
@ -349,3 +351,29 @@ func (bw *blobWriter) removeResources(ctx context.Context) error {
return nil
}
func (bw *blobWriter) Reader() (io.ReadCloser, error) {
// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
try := 1
for try <= 5 {
_, err := bw.bufferedFileWriter.driver.Stat(bw.ctx, bw.path)
if err == nil {
break
}
switch err.(type) {
case storagedriver.PathNotFoundError:
context.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try)
time.Sleep(1 * time.Second)
try++
default:
return nil, err
}
}
readCloser, err := bw.bufferedFileWriter.driver.ReadStream(bw.ctx, bw.path, 0)
if err != nil {
return nil, err
}
return readCloser, nil
}

View file

@ -24,6 +24,10 @@ import (
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
if !bw.resumableDigestEnabled {
return errResumableDigestNotAvailable
}
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
@ -143,6 +147,10 @@ func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry
}
func (bw *blobWriter) storeHashState(ctx context.Context) error {
if !bw.resumableDigestEnabled {
return errResumableDigestNotAvailable
}
h, ok := bw.digester.Hash().(resumable.Hash)
if !ok {
return errResumableDigestNotAvailable

View file

@ -22,7 +22,7 @@ func setupFS(t *testing.T) *setupEnv {
d := inmemory.New()
c := []byte("")
ctx := context.Background()
registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true)
registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false)
rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{})
repos := []string{

View file

@ -16,11 +16,12 @@ import (
// that grant access to the global blob store.
type linkedBlobStore struct {
*blobStore
blobServer distribution.BlobServer
blobAccessController distribution.BlobDescriptorService
repository distribution.Repository
ctx context.Context // only to be used where context can't come through method args
deleteEnabled bool
blobServer distribution.BlobServer
blobAccessController distribution.BlobDescriptorService
repository distribution.Repository
ctx context.Context // only to be used where context can't come through method args
deleteEnabled bool
resumableDigestEnabled bool
// linkPath allows one to control the repository blob link set to which
// the blob store dispatches. This is required because manifest and layer
@ -189,11 +190,12 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
}
bw := &blobWriter{
blobStore: lbs,
id: uuid,
startedAt: startedAt,
digester: digest.Canonical.New(),
bufferedFileWriter: *fw,
blobStore: lbs,
id: uuid,
startedAt: startedAt,
digester: digest.Canonical.New(),
bufferedFileWriter: *fw,
resumableDigestEnabled: lbs.resumableDigestEnabled,
}
return bw, nil

View file

@ -29,7 +29,7 @@ type manifestStoreTestEnv struct {
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background()
driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true)
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repo, err := registry.Repository(ctx, name)
if err != nil {
@ -348,7 +348,7 @@ func TestManifestStorage(t *testing.T) {
t.Errorf("Deleted manifest get returned non-nil")
}
r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true)
r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false)
repo, err := r.Repository(ctx, env.name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)

View file

@ -16,6 +16,7 @@ type registry struct {
statter distribution.BlobStatter // global statter service.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool
resumableDigestEnabled bool
}
// NewRegistryWithDriver creates a new registry instance from the provided
@ -23,9 +24,9 @@ type registry struct {
// cheap to allocate. If redirect is true, the backend blob server will
// attempt to use (StorageDriver).URLFor to serve all blobs.
//
// TODO(stevvooe): This function signature is getting out of hand. Move to
// TODO(stevvooe): This function signature is getting very out of hand. Move to
// functional options for instance configuration.
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool) distribution.Namespace {
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool, isCache bool) distribution.Namespace {
// create global statter, with cache.
var statter distribution.BlobDescriptorService = &blobStatter{
driver: driver,
@ -52,6 +53,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv
},
blobDescriptorCacheProvider: blobDescriptorCacheProvider,
deleteEnabled: deleteEnabled,
resumableDigestEnabled: !isCache,
}
}

67
docs/storage/vacuum.go Normal file
View file

@ -0,0 +1,67 @@
package storage
import (
"path"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage/driver"
)
// vacuum contains functions for cleaning up repositories and blobs
// These functions will only reliably work on strongly consistent
// storage systems.
// https://en.wikipedia.org/wiki/Consistency_model
// NewVacuum creates a new Vacuum
func NewVacuum(ctx context.Context, driver driver.StorageDriver) Vacuum {
return Vacuum{
ctx: ctx,
driver: driver,
pm: defaultPathMapper,
}
}
// Vacuum removes content from the filesystem
type Vacuum struct {
pm *pathMapper
driver driver.StorageDriver
ctx context.Context
}
// RemoveBlob removes a blob from the filesystem
func (v Vacuum) RemoveBlob(dgst string) error {
d, err := digest.ParseDigest(dgst)
if err != nil {
return err
}
blobPath, err := v.pm.path(blobDataPathSpec{digest: d})
if err != nil {
return err
}
context.GetLogger(v.ctx).Infof("Deleting blob: %s", blobPath)
err = v.driver.Delete(v.ctx, blobPath)
if err != nil {
return err
}
return nil
}
// RemoveRepository removes a repository directory from the
// filesystem
func (v Vacuum) RemoveRepository(repoName string) error {
rootForRepository, err := v.pm.path(repositoriesRootPathSpec{})
if err != nil {
return err
}
repoDir := path.Join(rootForRepository, repoName)
context.GetLogger(v.ctx).Infof("Deleting repo: %s", repoDir)
err = v.driver.Delete(v.ctx, repoDir)
if err != nil {
return err
}
return nil
}