Add pull through cache functionality to the Registry which can be configured

with a new `proxy` section in the configuration file.

Create a new registry type which delegates storage to a proxyBlobStore
and proxyManifestStore.  These stores will pull through data if not present
locally.  proxyBlobStore takes care not to write duplicate data to disk.

Add a scheduler to cleanup expired content. The scheduler runs as a background
goroutine.  When a blob or manifest is pulled through from the remote registry,
an entry is added to the scheduler with a TTL.  When the TTL expires the
scheduler calls a pre-specified function to remove the fetched resource.

Add token authentication to the registry middleware.  Get a token at startup
and preload the credential store with the username and password supplied in the
config file.

Allow resumable digest functionality to be disabled at runtime and disable
it when the registry is a pull through cache.

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
This commit is contained in:
Richard Scothern 2015-07-29 11:12:01 -07:00
parent 9dac3cc571
commit d1cb12fa3d
24 changed files with 1682 additions and 38 deletions

View file

@ -25,6 +25,10 @@ type httpBlobUpload struct {
closed bool closed bool
} }
func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
panic("Not implemented")
}
func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error { func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
if resp.StatusCode == http.StatusNotFound { if resp.StatusCode == http.StatusNotFound {
return distribution.ErrBlobUploadUnknown return distribution.ErrBlobUploadUnknown

View file

@ -280,14 +280,13 @@ func (ms *manifests) GetByTag(tag string, options ...distribution.ManifestServic
} }
if _, ok := ms.etags[tag]; ok { if _, ok := ms.etags[tag]; ok {
req.Header.Set("eTag", ms.etags[tag]) req.Header.Set("If-None-Match", ms.etags[tag])
} }
resp, err := ms.client.Do(req) resp, err := ms.client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode == http.StatusNotModified { if resp.StatusCode == http.StatusNotModified {
return nil, nil return nil, nil
} else if SuccessStatus(resp.StatusCode) { } else if SuccessStatus(resp.StatusCode) {

View file

@ -463,7 +463,7 @@ func addTestManifestWithEtag(repo, reference string, content []byte, m *testutil
Method: "GET", Method: "GET",
Route: "/v2/" + repo + "/manifests/" + reference, Route: "/v2/" + repo + "/manifests/" + reference,
Headers: http.Header(map[string][]string{ Headers: http.Header(map[string][]string{
"Etag": {fmt.Sprintf(`"%s"`, dgst)}, "If-None-Match": {fmt.Sprintf(`"%s"`, dgst)},
}), }),
} }

View file

@ -20,6 +20,7 @@ import (
"github.com/docker/distribution/registry/auth" "github.com/docker/distribution/registry/auth"
registrymiddleware "github.com/docker/distribution/registry/middleware/registry" registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository" repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
"github.com/docker/distribution/registry/proxy"
"github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage"
memorycache "github.com/docker/distribution/registry/storage/cache/memory" memorycache "github.com/docker/distribution/registry/storage/cache/memory"
rediscache "github.com/docker/distribution/registry/storage/cache/redis" rediscache "github.com/docker/distribution/registry/storage/cache/redis"
@ -55,6 +56,9 @@ type App struct {
} }
redis *redis.Pool redis *redis.Pool
// true if this registry is configured as a pull through cache
isCache bool
} }
// NewApp takes a configuration and returns a configured app, ready to serve // NewApp takes a configuration and returns a configured app, ready to serve
@ -65,6 +69,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
Config: configuration, Config: configuration,
Context: ctx, Context: ctx,
router: v2.RouterWithPrefix(configuration.HTTP.Prefix), router: v2.RouterWithPrefix(configuration.HTTP.Prefix),
isCache: configuration.Proxy.RemoteURL != "",
} }
app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id")) app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id"))
@ -152,10 +157,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.redis == nil { if app.redis == nil {
panic("redis configuration required to use for layerinfo cache") panic("redis configuration required to use for layerinfo cache")
} }
app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled) app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled, app.isCache)
ctxu.GetLogger(app).Infof("using redis blob descriptor cache") ctxu.GetLogger(app).Infof("using redis blob descriptor cache")
case "inmemory": case "inmemory":
app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled) app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled, app.isCache)
ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache") ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
default: default:
if v != "" { if v != "" {
@ -166,10 +171,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.registry == nil { if app.registry == nil {
// configure the registry if no cache section is available. // configure the registry if no cache section is available.
app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled) app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled, app.isCache)
} }
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) app.registry, err = applyRegistryMiddleware(app.Context, app.registry, configuration.Middleware["registry"])
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -185,6 +190,16 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
ctxu.GetLogger(app).Debugf("configured %q access controller", authType) ctxu.GetLogger(app).Debugf("configured %q access controller", authType)
} }
// configure as a pull through cache
if configuration.Proxy.RemoteURL != "" {
app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, configuration.Proxy)
if err != nil {
panic(err.Error())
}
app.isCache = true
ctxu.GetLogger(app).Info("Registry configured as a proxy cache to ", configuration.Proxy.RemoteURL)
}
return app return app
} }
@ -447,7 +462,7 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
repository, repository,
app.eventBridge(context, r)) app.eventBridge(context, r))
context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"]) context.Repository, err = applyRepoMiddleware(context.Context, context.Repository, app.Config.Middleware["repository"])
if err != nil { if err != nil {
ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err) ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
@ -668,9 +683,9 @@ func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []a
} }
// applyRegistryMiddleware wraps a registry instance with the configured middlewares // applyRegistryMiddleware wraps a registry instance with the configured middlewares
func applyRegistryMiddleware(registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) { func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
for _, mw := range middlewares { for _, mw := range middlewares {
rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry) rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err) return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
} }
@ -681,9 +696,9 @@ func applyRegistryMiddleware(registry distribution.Namespace, middlewares []conf
} }
// applyRepoMiddleware wraps a repository with the configured middlewares // applyRepoMiddleware wraps a repository with the configured middlewares
func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) { func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
for _, mw := range middlewares { for _, mw := range middlewares {
rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository) rmw, err := repositorymiddleware.Get(ctx, mw.Name, mw.Options, repository)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -31,7 +31,7 @@ func TestAppDispatcher(t *testing.T) {
Context: ctx, Context: ctx,
router: v2.Router(), router: v2.Router(),
driver: driver, driver: driver,
registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true), registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true, false),
} }
server := httptest.NewServer(app) server := httptest.NewServer(app)
router := v2.Router() router := v2.Router()

View file

@ -4,11 +4,12 @@ import (
"fmt" "fmt"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/docker/distribution/context"
) )
// InitFunc is the type of a RegistryMiddleware factory function and is // InitFunc is the type of a RegistryMiddleware factory function and is
// used to register the constructor for different RegistryMiddleware backends. // used to register the constructor for different RegistryMiddleware backends.
type InitFunc func(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) type InitFunc func(ctx context.Context, registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error)
var middlewares map[string]InitFunc var middlewares map[string]InitFunc
@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error {
} }
// Get constructs a RegistryMiddleware with the given options using the named backend. // Get constructs a RegistryMiddleware with the given options using the named backend.
func Get(name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) { func Get(ctx context.Context, name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) {
if middlewares != nil { if middlewares != nil {
if initFunc, exists := middlewares[name]; exists { if initFunc, exists := middlewares[name]; exists {
return initFunc(registry, options) return initFunc(ctx, registry, options)
} }
} }

View file

@ -4,11 +4,12 @@ import (
"fmt" "fmt"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/docker/distribution/context"
) )
// InitFunc is the type of a RepositoryMiddleware factory function and is // InitFunc is the type of a RepositoryMiddleware factory function and is
// used to register the constructor for different RepositoryMiddleware backends. // used to register the constructor for different RepositoryMiddleware backends.
type InitFunc func(repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error) type InitFunc func(ctx context.Context, repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error)
var middlewares map[string]InitFunc var middlewares map[string]InitFunc
@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error {
} }
// Get constructs a RepositoryMiddleware with the given options using the named backend. // Get constructs a RepositoryMiddleware with the given options using the named backend.
func Get(name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) { func Get(ctx context.Context, name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) {
if middlewares != nil { if middlewares != nil {
if initFunc, exists := middlewares[name]; exists { if initFunc, exists := middlewares[name]; exists {
return initFunc(repository, options) return initFunc(ctx, repository, options)
} }
} }

54
docs/proxy/proxyauth.go Normal file
View file

@ -0,0 +1,54 @@
package proxy
import (
"net/http"
"net/url"
"github.com/docker/distribution/registry/client/auth"
)
const tokenURL = "https://auth.docker.io/token"
type userpass struct {
username string
password string
}
type credentials struct {
creds map[string]userpass
}
func (c credentials) Basic(u *url.URL) (string, string) {
up := c.creds[u.String()]
return up.username, up.password
}
// ConfigureAuth authorizes with the upstream registry
func ConfigureAuth(remoteURL, username, password string, cm auth.ChallengeManager) (auth.CredentialStore, error) {
if err := ping(cm, remoteURL+"/v2/", "Docker-Distribution-Api-Version"); err != nil {
return nil, err
}
creds := map[string]userpass{
tokenURL: {
username: username,
password: password,
},
}
return credentials{creds: creds}, nil
}
func ping(manager auth.ChallengeManager, endpoint, versionHeader string) error {
resp, err := http.Get(endpoint)
if err != nil {
return err
}
defer resp.Body.Close()
if err := manager.AddResponse(resp); err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,214 @@
package proxy
import (
"io"
"net/http"
"strconv"
"sync"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/proxy/scheduler"
)
// todo(richardscothern): from cache control header or config file
const blobTTL = time.Duration(24 * 7 * time.Hour)
type proxyBlobStore struct {
localStore distribution.BlobStore
remoteStore distribution.BlobService
scheduler *scheduler.TTLExpirationScheduler
}
var _ distribution.BlobStore = proxyBlobStore{}
type inflightBlob struct {
refCount int
bw distribution.BlobWriter
}
// inflight tracks currently downloading blobs
var inflight = make(map[digest.Digest]*inflightBlob)
// mu protects inflight
var mu sync.Mutex
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
w.Header().Set("Content-Type", mediaType)
w.Header().Set("Docker-Content-Digest", digest.String())
w.Header().Set("Etag", digest.String())
}
func (pbs proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
desc, err := pbs.localStore.Stat(ctx, dgst)
if err != nil && err != distribution.ErrBlobUnknown {
return err
}
if err == nil {
proxyMetrics.BlobPush(uint64(desc.Size))
return pbs.localStore.ServeBlob(ctx, w, r, dgst)
}
desc, err = pbs.remoteStore.Stat(ctx, dgst)
if err != nil {
return err
}
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
if err != nil {
return err
}
bw, isNew, cleanup, err := getOrCreateBlobWriter(ctx, pbs.localStore, desc)
if err != nil {
return err
}
defer cleanup()
if isNew {
go func() {
err := streamToStorage(ctx, remoteReader, desc, bw)
if err != nil {
context.GetLogger(ctx).Error(err)
}
proxyMetrics.BlobPull(uint64(desc.Size))
}()
err := streamToClient(ctx, w, desc, bw)
if err != nil {
return err
}
proxyMetrics.BlobPush(uint64(desc.Size))
pbs.scheduler.AddBlob(dgst.String(), blobTTL)
return nil
}
err = streamToClient(ctx, w, desc, bw)
if err != nil {
return err
}
proxyMetrics.BlobPush(uint64(desc.Size))
return nil
}
type cleanupFunc func()
// getOrCreateBlobWriter will track which blobs are currently being downloaded and enable client requesting
// the same blob concurrently to read from the existing stream.
func getOrCreateBlobWriter(ctx context.Context, blobs distribution.BlobService, desc distribution.Descriptor) (distribution.BlobWriter, bool, cleanupFunc, error) {
mu.Lock()
defer mu.Unlock()
dgst := desc.Digest
cleanup := func() {
mu.Lock()
defer mu.Unlock()
inflight[dgst].refCount--
if inflight[dgst].refCount == 0 {
defer delete(inflight, dgst)
_, err := inflight[dgst].bw.Commit(ctx, desc)
if err != nil {
// There is a narrow race here where Commit can be called while this blob's TTL is expiring
// and its being removed from storage. In that case, the client stream will continue
// uninterruped and the blob will be pulled through on the next request, so just log it
context.GetLogger(ctx).Errorf("Error committing blob: %q", err)
}
}
}
var bw distribution.BlobWriter
_, ok := inflight[dgst]
if ok {
bw = inflight[dgst].bw
inflight[dgst].refCount++
return bw, false, cleanup, nil
}
var err error
bw, err = blobs.Create(ctx)
if err != nil {
return nil, false, nil, err
}
inflight[dgst] = &inflightBlob{refCount: 1, bw: bw}
return bw, true, cleanup, nil
}
func streamToStorage(ctx context.Context, remoteReader distribution.ReadSeekCloser, desc distribution.Descriptor, bw distribution.BlobWriter) error {
_, err := io.CopyN(bw, remoteReader, desc.Size)
if err != nil {
return err
}
return nil
}
func streamToClient(ctx context.Context, w http.ResponseWriter, desc distribution.Descriptor, bw distribution.BlobWriter) error {
setResponseHeaders(w, desc.Size, desc.MediaType, desc.Digest)
reader, err := bw.Reader()
if err != nil {
return err
}
defer reader.Close()
teeReader := io.TeeReader(reader, w)
buf := make([]byte, 32768, 32786)
var soFar int64
for {
rd, err := teeReader.Read(buf)
if err == nil || err == io.EOF {
soFar += int64(rd)
if soFar < desc.Size {
// buffer underflow, keep trying
continue
}
return nil
}
return err
}
}
func (pbs proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := pbs.localStore.Stat(ctx, dgst)
if err == nil {
return desc, err
}
if err != distribution.ErrBlobUnknown {
return distribution.Descriptor{}, err
}
return pbs.remoteStore.Stat(ctx, dgst)
}
// Unsupported functions
func (pbs proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
return nil, distribution.ErrUnsupported
}
func (pbs proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}

View file

@ -0,0 +1,231 @@
package proxy
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/proxy/scheduler"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
type statsBlobStore struct {
stats map[string]int
blobs distribution.BlobStore
}
func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
sbs.stats["put"]++
return sbs.blobs.Put(ctx, mediaType, p)
}
func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
sbs.stats["get"]++
return sbs.blobs.Get(ctx, dgst)
}
func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
sbs.stats["create"]++
return sbs.blobs.Create(ctx)
}
func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
sbs.stats["resume"]++
return sbs.blobs.Resume(ctx, id)
}
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
sbs.stats["open"]++
return sbs.blobs.Open(ctx, dgst)
}
func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
sbs.stats["serveblob"]++
return sbs.blobs.ServeBlob(ctx, w, r, dgst)
}
func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
sbs.stats["stat"]++
return sbs.blobs.Stat(ctx, dgst)
}
func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
sbs.stats["delete"]++
return sbs.blobs.Delete(ctx, dgst)
}
type testEnv struct {
inRemote []distribution.Descriptor
store proxyBlobStore
ctx context.Context
}
func (te testEnv) LocalStats() *map[string]int {
ls := te.store.localStore.(statsBlobStore).stats
return &ls
}
func (te testEnv) RemoteStats() *map[string]int {
rs := te.store.remoteStore.(statsBlobStore).stats
return &rs
}
// Populate remote store and record the digests
func makeTestEnv(t *testing.T, name string) testEnv {
ctx := context.Background()
localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true)
localRepo, err := localRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false)
truthRepo, err := truthRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
truthBlobs := statsBlobStore{
stats: make(map[string]int),
blobs: truthRepo.Blobs(ctx),
}
localBlobs := statsBlobStore{
stats: make(map[string]int),
blobs: localRepo.Blobs(ctx),
}
s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
proxyBlobStore := proxyBlobStore{
remoteStore: truthBlobs,
localStore: localBlobs,
scheduler: s,
}
te := testEnv{
store: proxyBlobStore,
ctx: ctx,
}
return te
}
func populate(t *testing.T, te *testEnv, blobCount int) {
var inRemote []distribution.Descriptor
for i := 0; i < blobCount; i++ {
bytes := []byte(fmt.Sprintf("blob%d", i))
desc, err := te.store.remoteStore.Put(te.ctx, "", bytes)
if err != nil {
t.Errorf("Put in store")
}
inRemote = append(inRemote, desc)
}
te.inRemote = inRemote
}
func TestProxyStoreStat(t *testing.T) {
te := makeTestEnv(t, "foo/bar")
remoteBlobCount := 1
populate(t, &te, remoteBlobCount)
localStats := te.LocalStats()
remoteStats := te.RemoteStats()
// Stat - touches both stores
for _, d := range te.inRemote {
_, err := te.store.Stat(te.ctx, d.Digest)
if err != nil {
t.Fatalf("Error stating proxy store")
}
}
if (*localStats)["stat"] != remoteBlobCount {
t.Errorf("Unexpected local stat count")
}
if (*remoteStats)["stat"] != remoteBlobCount {
t.Errorf("Unexpected remote stat count")
}
}
func TestProxyStoreServe(t *testing.T) {
te := makeTestEnv(t, "foo/bar")
remoteBlobCount := 1
populate(t, &te, remoteBlobCount)
localStats := te.LocalStats()
remoteStats := te.RemoteStats()
// Serveblob - pulls through blobs
for _, dr := range te.inRemote {
w := httptest.NewRecorder()
r, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
if err != nil {
t.Fatalf(err.Error())
}
dl, err := digest.FromBytes(w.Body.Bytes())
if err != nil {
t.Fatalf("Error making digest from blob")
}
if dl != dr.Digest {
t.Errorf("Mismatching blob fetch from proxy")
}
}
if (*localStats)["stat"] != remoteBlobCount && (*localStats)["create"] != remoteBlobCount {
t.Fatalf("unexpected local stats")
}
if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount {
t.Fatalf("unexpected local stats")
}
// Serveblob - blobs come from local
for _, dr := range te.inRemote {
w := httptest.NewRecorder()
r, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
if err != nil {
t.Fatalf(err.Error())
}
dl, err := digest.FromBytes(w.Body.Bytes())
if err != nil {
t.Fatalf("Error making digest from blob")
}
if dl != dr.Digest {
t.Errorf("Mismatching blob fetch from proxy")
}
}
// Stat to find local, but no new blobs were created
if (*localStats)["stat"] != remoteBlobCount*2 && (*localStats)["create"] != remoteBlobCount*2 {
t.Fatalf("unexpected local stats")
}
// Remote unchanged
if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount {
fmt.Printf("\tlocal=%#v, \n\tremote=%#v\n", localStats, remoteStats)
t.Fatalf("unexpected local stats")
}
}

View file

@ -0,0 +1,155 @@
package proxy
import (
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/proxy/scheduler"
)
// todo(richardscothern): from cache control header or config
const repositoryTTL = time.Duration(24 * 7 * time.Hour)
type proxyManifestStore struct {
ctx context.Context
localManifests distribution.ManifestService
remoteManifests distribution.ManifestService
repositoryName string
scheduler *scheduler.TTLExpirationScheduler
}
var _ distribution.ManifestService = &proxyManifestStore{}
func (pms proxyManifestStore) Exists(dgst digest.Digest) (bool, error) {
exists, err := pms.localManifests.Exists(dgst)
if err != nil {
return false, err
}
if exists {
return true, nil
}
return pms.remoteManifests.Exists(dgst)
}
func (pms proxyManifestStore) Get(dgst digest.Digest) (*manifest.SignedManifest, error) {
sm, err := pms.localManifests.Get(dgst)
if err == nil {
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
}
sm, err = pms.remoteManifests.Get(dgst)
if err != nil {
return nil, err
}
proxyMetrics.ManifestPull(uint64(len(sm.Raw)))
err = pms.localManifests.Put(sm)
if err != nil {
return nil, err
}
// Schedule the repo for removal
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
// Ensure the manifest blob is cleaned up
pms.scheduler.AddBlob(dgst.String(), repositoryTTL)
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
}
func (pms proxyManifestStore) Tags() ([]string, error) {
return pms.localManifests.Tags()
}
func (pms proxyManifestStore) ExistsByTag(tag string) (bool, error) {
exists, err := pms.localManifests.ExistsByTag(tag)
if err != nil {
return false, err
}
if exists {
return true, nil
}
return pms.remoteManifests.ExistsByTag(tag)
}
func (pms proxyManifestStore) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) {
var localDigest digest.Digest
localManifest, err := pms.localManifests.GetByTag(tag, options...)
switch err.(type) {
case distribution.ErrManifestUnknown, distribution.ErrManifestUnknownRevision:
goto fromremote
case nil:
break
default:
return nil, err
}
localDigest, err = manifestDigest(localManifest)
if err != nil {
return nil, err
}
fromremote:
var sm *manifest.SignedManifest
sm, err = pms.remoteManifests.GetByTag(tag, client.AddEtagToTag(tag, localDigest.String()))
if err != nil {
return nil, err
}
if sm == nil {
context.GetLogger(pms.ctx).Debugf("Local manifest for %q is latest, dgst=%s", tag, localDigest.String())
return localManifest, nil
}
context.GetLogger(pms.ctx).Debugf("Updated manifest for %q, dgst=%s", tag, localDigest.String())
err = pms.localManifests.Put(sm)
if err != nil {
return nil, err
}
dgst, err := manifestDigest(sm)
if err != nil {
return nil, err
}
pms.scheduler.AddBlob(dgst.String(), repositoryTTL)
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
proxyMetrics.ManifestPull(uint64(len(sm.Raw)))
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
}
func manifestDigest(sm *manifest.SignedManifest) (digest.Digest, error) {
payload, err := sm.Payload()
if err != nil {
return "", err
}
dgst, err := digest.FromBytes(payload)
if err != nil {
return "", err
}
return dgst, nil
}
func (pms proxyManifestStore) Put(manifest *manifest.SignedManifest) error {
return v2.ErrorCodeUnsupported
}
func (pms proxyManifestStore) Delete(dgst digest.Digest) error {
return v2.ErrorCodeUnsupported
}

View file

@ -0,0 +1,235 @@
package proxy
import (
"io"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/proxy/scheduler"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
"github.com/docker/libtrust"
)
type statsManifest struct {
manifests distribution.ManifestService
stats map[string]int
}
type manifestStoreTestEnv struct {
manifestDigest digest.Digest // digest of the signed manifest in the local storage
manifests proxyManifestStore
}
func (te manifestStoreTestEnv) LocalStats() *map[string]int {
ls := te.manifests.localManifests.(statsManifest).stats
return &ls
}
func (te manifestStoreTestEnv) RemoteStats() *map[string]int {
rs := te.manifests.remoteManifests.(statsManifest).stats
return &rs
}
func (sm statsManifest) Delete(dgst digest.Digest) error {
sm.stats["delete"]++
return sm.manifests.Delete(dgst)
}
func (sm statsManifest) Exists(dgst digest.Digest) (bool, error) {
sm.stats["exists"]++
return sm.manifests.Exists(dgst)
}
func (sm statsManifest) ExistsByTag(tag string) (bool, error) {
sm.stats["existbytag"]++
return sm.manifests.ExistsByTag(tag)
}
func (sm statsManifest) Get(dgst digest.Digest) (*manifest.SignedManifest, error) {
sm.stats["get"]++
return sm.manifests.Get(dgst)
}
func (sm statsManifest) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) {
sm.stats["getbytag"]++
return sm.manifests.GetByTag(tag, options...)
}
func (sm statsManifest) Put(manifest *manifest.SignedManifest) error {
sm.stats["put"]++
return sm.manifests.Put(manifest)
}
func (sm statsManifest) Tags() ([]string, error) {
sm.stats["tags"]++
return sm.manifests.Tags()
}
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background()
truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false)
truthRepo, err := truthRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
tr, err := truthRepo.Manifests(ctx)
if err != nil {
t.Fatal(err.Error())
}
truthManifests := statsManifest{
manifests: tr,
stats: make(map[string]int),
}
manifestDigest, err := populateRepo(t, ctx, truthRepo, name, tag)
if err != nil {
t.Fatalf(err.Error())
}
localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true)
localRepo, err := localRegistry.Repository(ctx, name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
lr, err := localRepo.Manifests(ctx)
if err != nil {
t.Fatal(err.Error())
}
localManifests := statsManifest{
manifests: lr,
stats: make(map[string]int),
}
s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
return &manifestStoreTestEnv{
manifestDigest: manifestDigest,
manifests: proxyManifestStore{
ctx: ctx,
localManifests: localManifests,
remoteManifests: truthManifests,
scheduler: s,
},
}
}
func populateRepo(t *testing.T, ctx context.Context, repository distribution.Repository, name, tag string) (digest.Digest, error) {
m := manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: name,
Tag: tag,
}
for i := 0; i < 2; i++ {
wr, err := repository.Blobs(ctx).Create(ctx)
if err != nil {
t.Fatalf("unexpected error creating test upload: %v", err)
}
rs, ts, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("unexpected error generating test layer file")
}
dgst := digest.Digest(ts)
if _, err := io.Copy(wr, rs); err != nil {
t.Fatalf("unexpected error copying to upload: %v", err)
}
if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}); err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
}
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("unexpected error generating private key: %v", err)
}
sm, err := manifest.Sign(&m, pk)
if err != nil {
t.Fatalf("error signing manifest: %v", err)
}
ms, err := repository.Manifests(ctx)
if err != nil {
t.Fatalf(err.Error())
}
ms.Put(sm)
if err != nil {
t.Fatalf("unexpected errors putting manifest: %v", err)
}
pl, err := sm.Payload()
if err != nil {
t.Fatal(err)
}
return digest.FromBytes(pl)
}
// TestProxyManifests contains basic acceptance tests
// for the pull-through behavior
func TestProxyManifests(t *testing.T) {
name := "foo/bar"
env := newManifestStoreTestEnv(t, name, "latest")
localStats := env.LocalStats()
remoteStats := env.RemoteStats()
// Stat - must check local and remote
exists, err := env.manifests.ExistsByTag("latest")
if err != nil {
t.Fatalf("Error checking existance")
}
if !exists {
t.Errorf("Unexpected non-existant manifest")
}
if (*localStats)["existbytag"] != 1 && (*remoteStats)["existbytag"] != 1 {
t.Errorf("Unexpected exists count")
}
// Get - should succeed and pull manifest into local
_, err = env.manifests.Get(env.manifestDigest)
if err != nil {
t.Fatal(err)
}
if (*localStats)["get"] != 1 && (*remoteStats)["get"] != 1 {
t.Errorf("Unexpected get count")
}
if (*localStats)["put"] != 1 {
t.Errorf("Expected local put")
}
// Stat - should only go to local
exists, err = env.manifests.ExistsByTag("latest")
if err != nil {
t.Fatal(err)
}
if !exists {
t.Errorf("Unexpected non-existant manifest")
}
if (*localStats)["existbytag"] != 2 && (*remoteStats)["existbytag"] != 1 {
t.Errorf("Unexpected exists count")
}
// Get - should get from remote, to test freshness
_, err = env.manifests.Get(env.manifestDigest)
if err != nil {
t.Fatal(err)
}
if (*remoteStats)["get"] != 2 && (*remoteStats)["existsbytag"] != 1 && (*localStats)["put"] != 1 {
t.Errorf("Unexpected get count")
}
}

View file

@ -0,0 +1,74 @@
package proxy
import (
"expvar"
"sync/atomic"
)
// Metrics is used to hold metric counters
// related to the proxy
type Metrics struct {
Requests uint64
Hits uint64
Misses uint64
BytesPulled uint64
BytesPushed uint64
}
type proxyMetricsCollector struct {
blobMetrics Metrics
manifestMetrics Metrics
}
// BlobPull tracks metrics about blobs pulled into the cache
func (pmc *proxyMetricsCollector) BlobPull(bytesPulled uint64) {
atomic.AddUint64(&pmc.blobMetrics.Misses, 1)
atomic.AddUint64(&pmc.blobMetrics.BytesPulled, bytesPulled)
}
// BlobPush tracks metrics about blobs pushed to clients
func (pmc *proxyMetricsCollector) BlobPush(bytesPushed uint64) {
atomic.AddUint64(&pmc.blobMetrics.Requests, 1)
atomic.AddUint64(&pmc.blobMetrics.Hits, 1)
atomic.AddUint64(&pmc.blobMetrics.BytesPushed, bytesPushed)
}
// ManifestPull tracks metrics related to Manifests pulled into the cache
func (pmc *proxyMetricsCollector) ManifestPull(bytesPulled uint64) {
atomic.AddUint64(&pmc.manifestMetrics.Misses, 1)
atomic.AddUint64(&pmc.manifestMetrics.BytesPulled, bytesPulled)
}
// ManifestPush tracks metrics about manifests pushed to clients
func (pmc *proxyMetricsCollector) ManifestPush(bytesPushed uint64) {
atomic.AddUint64(&pmc.manifestMetrics.Requests, 1)
atomic.AddUint64(&pmc.manifestMetrics.Hits, 1)
atomic.AddUint64(&pmc.manifestMetrics.BytesPushed, bytesPushed)
}
// proxyMetrics tracks metrics about the proxy cache. This is
// kept globally and made available via expvar.
var proxyMetrics = &proxyMetricsCollector{}
func init() {
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
pm := registry.(*expvar.Map).Get("proxy")
if pm == nil {
pm = &expvar.Map{}
pm.(*expvar.Map).Init()
registry.(*expvar.Map).Set("proxy", pm)
}
pm.(*expvar.Map).Set("blobs", expvar.Func(func() interface{} {
return proxyMetrics.blobMetrics
}))
pm.(*expvar.Map).Set("manifests", expvar.Func(func() interface{} {
return proxyMetrics.manifestMetrics
}))
}

139
docs/proxy/proxyregistry.go Normal file
View file

@ -0,0 +1,139 @@
package proxy
import (
"net/http"
"net/url"
"github.com/docker/distribution"
"github.com/docker/distribution/configuration"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/distribution/registry/proxy/scheduler"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"
)
// proxyingRegistry fetches content from a remote registry and caches it locally
type proxyingRegistry struct {
embedded distribution.Namespace // provides local registry functionality
scheduler *scheduler.TTLExpirationScheduler
remoteURL string
credentialStore auth.CredentialStore
challengeManager auth.ChallengeManager
}
// NewRegistryPullThroughCache creates a registry acting as a pull through cache
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
_, err := url.Parse(config.RemoteURL)
if err != nil {
return nil, err
}
v := storage.NewVacuum(ctx, driver)
s := scheduler.New(ctx, driver, "/scheduler-state.json")
s.OnBlobExpire(func(digest string) error {
return v.RemoveBlob(digest)
})
s.OnManifestExpire(func(repoName string) error {
return v.RemoveRepository(repoName)
})
err = s.Start()
if err != nil {
return nil, err
}
challengeManager := auth.NewSimpleChallengeManager()
cs, err := ConfigureAuth(config.RemoteURL, config.Username, config.Password, challengeManager)
if err != nil {
return nil, err
}
return &proxyingRegistry{
embedded: registry,
scheduler: s,
challengeManager: challengeManager,
credentialStore: cs,
remoteURL: config.RemoteURL,
}, nil
}
func (pr *proxyingRegistry) Scope() distribution.Scope {
return distribution.GlobalScope
}
func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
return pr.embedded.Repositories(ctx, repos, last)
}
func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distribution.Repository, error) {
tr := transport.NewTransport(http.DefaultTransport,
auth.NewAuthorizer(pr.challengeManager, auth.NewTokenHandler(http.DefaultTransport, pr.credentialStore, name, "pull")))
localRepo, err := pr.embedded.Repository(ctx, name)
if err != nil {
return nil, err
}
localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification)
if err != nil {
return nil, err
}
remoteRepo, err := client.NewRepository(ctx, name, pr.remoteURL, tr)
if err != nil {
return nil, err
}
remoteManifests, err := remoteRepo.Manifests(ctx)
if err != nil {
return nil, err
}
return &proxiedRepository{
blobStore: proxyBlobStore{
localStore: localRepo.Blobs(ctx),
remoteStore: remoteRepo.Blobs(ctx),
scheduler: pr.scheduler,
},
manifests: proxyManifestStore{
repositoryName: name,
localManifests: localManifests, // Options?
remoteManifests: remoteManifests,
ctx: ctx,
scheduler: pr.scheduler,
},
name: name,
signatures: localRepo.Signatures(),
}, nil
}
// proxiedRepository uses proxying blob and manifest services to serve content
// locally, or pulling it through from a remote and caching it locally if it doesn't
// already exist
type proxiedRepository struct {
blobStore distribution.BlobStore
manifests distribution.ManifestService
name string
signatures distribution.SignatureService
}
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
// options
return pr.manifests, nil
}
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
return pr.blobStore
}
func (pr *proxiedRepository) Name() string {
return pr.name
}
func (pr *proxiedRepository) Signatures() distribution.SignatureService {
return pr.signatures
}

View file

@ -0,0 +1,250 @@
package scheduler
import (
"encoding/json"
"fmt"
"time"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage/driver"
)
// onTTLExpiryFunc is called when a repositories' TTL expires
type expiryFunc func(string) error
const (
entryTypeBlob = iota
entryTypeManifest
)
// schedulerEntry represents an entry in the scheduler
// fields are exported for serialization
type schedulerEntry struct {
Key string `json:"Key"`
Expiry time.Time `json:"ExpiryData"`
EntryType int `json:"EntryType"`
}
// New returns a new instance of the scheduler
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
return &TTLExpirationScheduler{
entries: make(map[string]schedulerEntry),
addChan: make(chan schedulerEntry),
stopChan: make(chan bool),
driver: driver,
pathToStateFile: path,
ctx: ctx,
stopped: true,
}
}
// TTLExpirationScheduler is a scheduler used to perform actions
// when TTLs expire
type TTLExpirationScheduler struct {
entries map[string]schedulerEntry
addChan chan schedulerEntry
stopChan chan bool
driver driver.StorageDriver
ctx context.Context
pathToStateFile string
stopped bool
onBlobExpire expiryFunc
onManifestExpire expiryFunc
}
// addChan allows more TTLs to be pushed to the scheduler
type addChan chan schedulerEntry
// stopChan allows the scheduler to be stopped - used for testing.
type stopChan chan bool
// OnBlobExpire is called when a scheduled blob's TTL expires
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
ttles.onBlobExpire = f
}
// OnManifestExpire is called when a scheduled manifest's TTL expires
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
ttles.onManifestExpire = f
}
// AddBlob schedules a blob cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
if ttles.stopped {
return fmt.Errorf("scheduler not started")
}
ttles.add(dgst, ttl, entryTypeBlob)
return nil
}
// AddManifest schedules a manifest cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
if ttles.stopped {
return fmt.Errorf("scheduler not started")
}
ttles.add(repoName, ttl, entryTypeManifest)
return nil
}
// Start starts the scheduler
func (ttles *TTLExpirationScheduler) Start() error {
return ttles.start()
}
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
entry := schedulerEntry{
Key: key,
Expiry: time.Now().Add(ttl),
EntryType: eType,
}
ttles.addChan <- entry
}
func (ttles *TTLExpirationScheduler) stop() {
ttles.stopChan <- true
}
func (ttles *TTLExpirationScheduler) start() error {
err := ttles.readState()
if err != nil {
return err
}
if !ttles.stopped {
return fmt.Errorf("Scheduler already started")
}
context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
ttles.stopped = false
go ttles.mainloop()
return nil
}
// mainloop uses a select statement to listen for events. Most of its time
// is spent in waiting on a TTL to expire but can be interrupted when TTLs
// are added.
func (ttles *TTLExpirationScheduler) mainloop() {
for {
if ttles.stopped {
return
}
nextEntry, ttl := nextExpiringEntry(ttles.entries)
if len(ttles.entries) == 0 {
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...")
} else {
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key)
}
select {
case <-time.After(ttl):
var f expiryFunc
switch nextEntry.EntryType {
case entryTypeBlob:
f = ttles.onBlobExpire
case entryTypeManifest:
f = ttles.onManifestExpire
default:
f = func(repoName string) error {
return fmt.Errorf("Unexpected scheduler entry type")
}
}
if err := f(nextEntry.Key); err != nil {
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err)
}
delete(ttles.entries, nextEntry.Key)
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
case entry := <-ttles.addChan:
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
ttles.entries[entry.Key] = entry
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
break
case <-ttles.stopChan:
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
ttles.stopped = true
}
}
}
func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) {
if len(entries) == 0 {
return nil, 24 * time.Hour
}
// todo:(richardscothern) this is a primitive o(n) algorithm
// but n will never be *that* big and it's all in memory. Investigate
// time.AfterFunc for heap based expiries
first := true
var nextEntry schedulerEntry
for _, entry := range entries {
if first {
nextEntry = entry
first = false
continue
}
if entry.Expiry.Before(nextEntry.Expiry) {
nextEntry = entry
}
}
// Dates may be from the past if the scheduler has
// been restarted, set their ttl to 0
if nextEntry.Expiry.Before(time.Now()) {
nextEntry.Expiry = time.Now()
return &nextEntry, 0
}
return &nextEntry, nextEntry.Expiry.Sub(time.Now())
}
func (ttles *TTLExpirationScheduler) writeState() error {
jsonBytes, err := json.Marshal(ttles.entries)
if err != nil {
return err
}
err = ttles.driver.PutContent(ttles.ctx, ttles.pathToStateFile, jsonBytes)
if err != nil {
return err
}
return nil
}
func (ttles *TTLExpirationScheduler) readState() error {
if _, err := ttles.driver.Stat(ttles.ctx, ttles.pathToStateFile); err != nil {
switch err := err.(type) {
case driver.PathNotFoundError:
return nil
default:
return err
}
}
bytes, err := ttles.driver.GetContent(ttles.ctx, ttles.pathToStateFile)
if err != nil {
return err
}
err = json.Unmarshal(bytes, &ttles.entries)
if err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,165 @@
package scheduler
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
func TestSchedule(t *testing.T) {
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
"testBlob1": true,
"testBlob2": true,
"ch00": true,
}
s := New(context.Background(), inmemory.New(), "/ttl")
deleteFunc := func(repoName string) error {
if len(remainingRepos) == 0 {
t.Fatalf("Incorrect expiry count")
}
_, ok := remainingRepos[repoName]
if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
}
fmt.Println("removing", repoName)
delete(remainingRepos, repoName)
return nil
}
s.onBlobExpire = deleteFunc
err := s.start()
if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
}
s.add("testBlob1", 3*timeUnit, entryTypeBlob)
s.add("testBlob2", 1*timeUnit, entryTypeBlob)
func() {
s.add("ch00", 1*timeUnit, entryTypeBlob)
}()
// Ensure all repos are deleted
<-time.After(50 * timeUnit)
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
}
}
func TestRestoreOld(t *testing.T) {
remainingRepos := map[string]bool{
"testBlob1": true,
"oldRepo": true,
}
deleteFunc := func(repoName string) error {
if repoName == "oldRepo" && len(remainingRepos) == 3 {
t.Errorf("oldRepo should be removed first")
}
_, ok := remainingRepos[repoName]
if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
}
delete(remainingRepos, repoName)
return nil
}
timeUnit := time.Millisecond
serialized, err := json.Marshal(&map[string]schedulerEntry{
"testBlob1": {
Expiry: time.Now().Add(1 * timeUnit),
Key: "testBlob1",
EntryType: 0,
},
"oldRepo": {
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
Key: "oldRepo",
EntryType: 0,
},
})
if err != nil {
t.Fatalf("Error serializing test data: %s", err.Error())
}
ctx := context.Background()
pathToStatFile := "/ttl"
fs := inmemory.New()
err = fs.PutContent(ctx, pathToStatFile, serialized)
if err != nil {
t.Fatal("Unable to write serialized data to fs")
}
s := New(context.Background(), fs, "/ttl")
s.onBlobExpire = deleteFunc
err = s.start()
if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
}
<-time.After(50 * timeUnit)
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
}
}
func TestStopRestore(t *testing.T) {
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
"testBlob1": true,
"testBlob2": true,
}
deleteFunc := func(repoName string) error {
delete(remainingRepos, repoName)
return nil
}
fs := inmemory.New()
pathToStateFile := "/ttl"
s := New(context.Background(), fs, pathToStateFile)
s.onBlobExpire = deleteFunc
err := s.start()
if err != nil {
t.Fatalf(err.Error())
}
s.add("testBlob1", 300*timeUnit, entryTypeBlob)
s.add("testBlob2", 100*timeUnit, entryTypeBlob)
// Start and stop before all operations complete
// state will be written to fs
s.stop()
time.Sleep(10 * time.Millisecond)
// v2 will restore state from fs
s2 := New(context.Background(), fs, pathToStateFile)
s2.onBlobExpire = deleteFunc
err = s2.start()
if err != nil {
t.Fatalf("Error starting v2: %s", err.Error())
}
<-time.After(500 * timeUnit)
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
}
}
func TestDoubleStart(t *testing.T) {
s := New(context.Background(), inmemory.New(), "/ttl")
err := s.start()
if err != nil {
t.Fatalf("Unable to start scheduler")
}
fmt.Printf("%#v", s)
err = s.start()
if err == nil {
t.Fatalf("Scheduler started twice without error")
}
}

View file

@ -33,7 +33,7 @@ func TestSimpleBlobUpload(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -193,7 +193,7 @@ func TestSimpleBlobUpload(t *testing.T) {
} }
// Reuse state to test delete with a delete-disabled registry // Reuse state to test delete with a delete-disabled registry
registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false)
repository, err = registry.Repository(ctx, imageName) repository, err = registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -212,7 +212,7 @@ func TestSimpleBlobRead(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -316,7 +316,7 @@ func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)

View file

@ -31,6 +31,8 @@ type blobWriter struct {
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
// LayerUpload Interface // LayerUpload Interface
bufferedFileWriter bufferedFileWriter
resumableDigestEnabled bool
} }
var _ distribution.BlobWriter = &blobWriter{} var _ distribution.BlobWriter = &blobWriter{}
@ -349,3 +351,29 @@ func (bw *blobWriter) removeResources(ctx context.Context) error {
return nil return nil
} }
func (bw *blobWriter) Reader() (io.ReadCloser, error) {
// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
try := 1
for try <= 5 {
_, err := bw.bufferedFileWriter.driver.Stat(bw.ctx, bw.path)
if err == nil {
break
}
switch err.(type) {
case storagedriver.PathNotFoundError:
context.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try)
time.Sleep(1 * time.Second)
try++
default:
return nil, err
}
}
readCloser, err := bw.bufferedFileWriter.driver.ReadStream(bw.ctx, bw.path, 0)
if err != nil {
return nil, err
}
return readCloser, nil
}

View file

@ -24,6 +24,10 @@ import (
// offset. Any unhashed bytes remaining less than the given offset are hashed // offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far. // from the content uploaded so far.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error { func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
if !bw.resumableDigestEnabled {
return errResumableDigestNotAvailable
}
if offset < 0 { if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset) return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
} }
@ -143,6 +147,10 @@ func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry
} }
func (bw *blobWriter) storeHashState(ctx context.Context) error { func (bw *blobWriter) storeHashState(ctx context.Context) error {
if !bw.resumableDigestEnabled {
return errResumableDigestNotAvailable
}
h, ok := bw.digester.Hash().(resumable.Hash) h, ok := bw.digester.Hash().(resumable.Hash)
if !ok { if !ok {
return errResumableDigestNotAvailable return errResumableDigestNotAvailable

View file

@ -22,7 +22,7 @@ func setupFS(t *testing.T) *setupEnv {
d := inmemory.New() d := inmemory.New()
c := []byte("") c := []byte("")
ctx := context.Background() ctx := context.Background()
registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false)
rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{}) rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{})
repos := []string{ repos := []string{

View file

@ -21,6 +21,7 @@ type linkedBlobStore struct {
repository distribution.Repository repository distribution.Repository
ctx context.Context // only to be used where context can't come through method args ctx context.Context // only to be used where context can't come through method args
deleteEnabled bool deleteEnabled bool
resumableDigestEnabled bool
// linkPath allows one to control the repository blob link set to which // linkPath allows one to control the repository blob link set to which
// the blob store dispatches. This is required because manifest and layer // the blob store dispatches. This is required because manifest and layer
@ -194,6 +195,7 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
startedAt: startedAt, startedAt: startedAt,
digester: digest.Canonical.New(), digester: digest.Canonical.New(),
bufferedFileWriter: *fw, bufferedFileWriter: *fw,
resumableDigestEnabled: lbs.resumableDigestEnabled,
} }
return bw, nil return bw, nil

View file

@ -29,7 +29,7 @@ type manifestStoreTestEnv struct {
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background() ctx := context.Background()
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false)
repo, err := registry.Repository(ctx, name) repo, err := registry.Repository(ctx, name)
if err != nil { if err != nil {
@ -348,7 +348,7 @@ func TestManifestStorage(t *testing.T) {
t.Errorf("Deleted manifest get returned non-nil") t.Errorf("Deleted manifest get returned non-nil")
} }
r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false)
repo, err := r.Repository(ctx, env.name) repo, err := r.Repository(ctx, env.name)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)

View file

@ -16,6 +16,7 @@ type registry struct {
statter distribution.BlobStatter // global statter service. statter distribution.BlobStatter // global statter service.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool deleteEnabled bool
resumableDigestEnabled bool
} }
// NewRegistryWithDriver creates a new registry instance from the provided // NewRegistryWithDriver creates a new registry instance from the provided
@ -23,9 +24,9 @@ type registry struct {
// cheap to allocate. If redirect is true, the backend blob server will // cheap to allocate. If redirect is true, the backend blob server will
// attempt to use (StorageDriver).URLFor to serve all blobs. // attempt to use (StorageDriver).URLFor to serve all blobs.
// //
// TODO(stevvooe): This function signature is getting out of hand. Move to // TODO(stevvooe): This function signature is getting very out of hand. Move to
// functional options for instance configuration. // functional options for instance configuration.
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool) distribution.Namespace { func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool, isCache bool) distribution.Namespace {
// create global statter, with cache. // create global statter, with cache.
var statter distribution.BlobDescriptorService = &blobStatter{ var statter distribution.BlobDescriptorService = &blobStatter{
driver: driver, driver: driver,
@ -52,6 +53,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv
}, },
blobDescriptorCacheProvider: blobDescriptorCacheProvider, blobDescriptorCacheProvider: blobDescriptorCacheProvider,
deleteEnabled: deleteEnabled, deleteEnabled: deleteEnabled,
resumableDigestEnabled: !isCache,
} }
} }

67
docs/storage/vacuum.go Normal file
View file

@ -0,0 +1,67 @@
package storage
import (
"path"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage/driver"
)
// vacuum contains functions for cleaning up repositories and blobs
// These functions will only reliably work on strongly consistent
// storage systems.
// https://en.wikipedia.org/wiki/Consistency_model
// NewVacuum creates a new Vacuum
func NewVacuum(ctx context.Context, driver driver.StorageDriver) Vacuum {
return Vacuum{
ctx: ctx,
driver: driver,
pm: defaultPathMapper,
}
}
// Vacuum removes content from the filesystem
type Vacuum struct {
pm *pathMapper
driver driver.StorageDriver
ctx context.Context
}
// RemoveBlob removes a blob from the filesystem
func (v Vacuum) RemoveBlob(dgst string) error {
d, err := digest.ParseDigest(dgst)
if err != nil {
return err
}
blobPath, err := v.pm.path(blobDataPathSpec{digest: d})
if err != nil {
return err
}
context.GetLogger(v.ctx).Infof("Deleting blob: %s", blobPath)
err = v.driver.Delete(v.ctx, blobPath)
if err != nil {
return err
}
return nil
}
// RemoveRepository removes a repository directory from the
// filesystem
func (v Vacuum) RemoveRepository(repoName string) error {
rootForRepository, err := v.pm.path(repositoriesRootPathSpec{})
if err != nil {
return err
}
repoDir := path.Join(rootForRepository, repoName)
context.GetLogger(v.ctx).Infof("Deleting repo: %s", repoDir)
err = v.driver.Delete(v.ctx, repoDir)
if err != nil {
return err
}
return nil
}