package proxy

import (
	"fmt"
	"net/http"
	"net/url"
	"sync"

	"github.com/docker/distribution"
	"github.com/docker/distribution/configuration"
	"github.com/docker/distribution/context"
	"github.com/docker/distribution/reference"
	"github.com/docker/distribution/registry/client"
	"github.com/docker/distribution/registry/client/auth"
	"github.com/docker/distribution/registry/client/transport"
	"github.com/docker/distribution/registry/proxy/scheduler"
	"github.com/docker/distribution/registry/storage"
	"github.com/docker/distribution/registry/storage/driver"
)

// proxyingRegistry fetches content from a remote registry and caches it locally
type proxyingRegistry struct {
	embedded       distribution.Namespace // provides local registry functionality
	scheduler      *scheduler.TTLExpirationScheduler
	remoteURL      string
	authChallenger authChallenger
}

// NewRegistryPullThroughCache creates a registry acting as a pull through cache.
//
// It validates config.RemoteURL, registers expiry callbacks on a TTL scheduler
// (state path "/scheduler-state.json" on the given driver) that delete expired
// blobs and manifests from the supplied local registry, starts the scheduler,
// and configures upstream credentials from config.Username / config.Password.
// Any failure along the way is returned and no registry is produced.
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
	// Parse only to reject a malformed remote URL up front; the parsed value
	// itself is not retained.
	if _, err := url.Parse(config.RemoteURL); err != nil {
		return nil, err
	}

	v := storage.NewVacuum(ctx, driver)
	s := scheduler.New(ctx, driver, "/scheduler-state.json")

	s.OnBlobExpire(func(ref reference.Reference) error {
		// A digest is required to locate the blob, so only canonical
		// references are accepted.
		r, ok := ref.(reference.Canonical)
		if !ok {
			return fmt.Errorf("unexpected reference type : %T", ref)
		}

		repo, err := registry.Repository(ctx, r)
		if err != nil {
			return err
		}

		blobs := repo.Blobs(ctx)

		// Clear the repository reference and descriptor caches
		if err := blobs.Delete(ctx, r.Digest()); err != nil {
			return err
		}

		// Then remove the blob through the vacuum created on the same driver.
		return v.RemoveBlob(r.Digest().String())
	})

	s.OnManifestExpire(func(ref reference.Reference) error {
		r, ok := ref.(reference.Canonical)
		if !ok {
			return fmt.Errorf("unexpected reference type : %T", ref)
		}

		repo, err := registry.Repository(ctx, r)
		if err != nil {
			return err
		}

		manifests, err := repo.Manifests(ctx)
		if err != nil {
			return err
		}

		return manifests.Delete(ctx, r.Digest())
	})

	if err := s.Start(); err != nil {
		return nil, err
	}

	cs, err := configureAuth(config.Username, config.Password)
	if err != nil {
		return nil, err
	}

	return &proxyingRegistry{
		embedded:  registry,
		scheduler: s,
		remoteURL: config.RemoteURL,
		authChallenger: &remoteAuthChallenger{
			remoteURL: config.RemoteURL,
			cm:        auth.NewSimpleChallengeManager(),
			cs:        cs,
		},
	}, nil
}

// Scope reports the scope of this registry, which is always
// distribution.GlobalScope.
func (pr *proxyingRegistry) Scope() distribution.Scope {
	return distribution.GlobalScope
}

// Repositories delegates to the embedded local registry; the remote is not
// consulted.
func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
	return pr.embedded.Repositories(ctx, repos, last)
}

// Repository returns a repository for name whose blob, manifest and tag
// services are each backed by both the local (embedded) repository and a
// client repository for the remote at pr.remoteURL.
//
// Requests to the remote go through a transport that authorizes using this
// registry's challenge manager and a token handler built for name.Name() with
// the "pull" action. An error from obtaining either repository or either
// manifest service is returned as-is.
func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
	c := pr.authChallenger

	tr := transport.NewTransport(http.DefaultTransport,
		auth.NewAuthorizer(c.challengeManager(), auth.NewTokenHandler(http.DefaultTransport, c.credentialStore(), name.Name(), "pull")))

	localRepo, err := pr.embedded.Repository(ctx, name)
	if err != nil {
		return nil, err
	}
	localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
	if err != nil {
		return nil, err
	}

	remoteRepo, err := client.NewRepository(ctx, name, pr.remoteURL, tr)
	if err != nil {
		return nil, err
	}

	remoteManifests, err := remoteRepo.Manifests(ctx)
	if err != nil {
		return nil, err
	}

	return &proxiedRepository{
		blobStore: &proxyBlobStore{
			localStore:     localRepo.Blobs(ctx),
			remoteStore:    remoteRepo.Blobs(ctx),
			scheduler:      pr.scheduler,
			repositoryName: name,
			authChallenger: pr.authChallenger,
		},
		manifests: &proxyManifestStore{
			repositoryName:  name,
			localManifests:  localManifests, // Options?
			remoteManifests: remoteManifests,
			ctx:             ctx,
			scheduler:       pr.scheduler,
			authChallenger:  pr.authChallenger,
		},
		name: name,
		tags: &proxyTagService{
			localTags:      localRepo.Tags(ctx),
			remoteTags:     remoteRepo.Tags(ctx),
			authChallenger: pr.authChallenger,
		},
	}, nil
}

// authChallenger encapsulates a request to the upstream to establish credential challenges
type authChallenger interface {
	tryEstablishChallenges(context.Context) error
	challengeManager() auth.ChallengeManager
	credentialStore() auth.CredentialStore
}

// remoteAuthChallenger is the authChallenger used for the configured remote.
// The embedded mutex is taken by tryEstablishChallenges.
type remoteAuthChallenger struct {
	remoteURL string
	sync.Mutex
	cm auth.ChallengeManager
	cs auth.CredentialStore
}

// credentialStore returns the credential store this challenger was built with.
func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
	return r.cs
}

// challengeManager returns the challenge manager this challenger was built with.
func (r *remoteAuthChallenger) challengeManager() auth.ChallengeManager {
	return r.cm
}

// tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist.
// The receiver's mutex is held for the whole call, so concurrent callers are
// serialized. It returns nil without contacting the upstream when the
// challenge manager already holds at least one challenge for the "/v2/" URL.
func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
	r.Lock()
	defer r.Unlock()

	// NOTE(review): plain concatenation means a configured remote URL ending
	// in "/" produces ".../" + "/v2/" here - confirm whether the URL should be
	// normalized before use.
	remoteURL := r.remoteURL + "/v2/"

	challenges, err := r.cm.GetChallenges(remoteURL)
	if err != nil {
		return err
	}

	if len(challenges) > 0 {
		return nil
	}

	// establish challenge type with upstream
	if err := ping(r.cm, remoteURL, challengeHeader); err != nil {
		return err
	}

	context.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL, r.cm)
	return nil
}

// proxiedRepository uses proxying blob and manifest services to serve content
// locally, or pulling it through from a remote and caching it locally if it doesn't
// already exist
type proxiedRepository struct {
	blobStore distribution.BlobStore
	manifests distribution.ManifestService
	name      reference.Named
	tags      distribution.TagService
}

// Manifests returns the manifest service this repository was constructed
// with. Both ctx and options are ignored.
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
	return pr.manifests, nil
}

// Blobs returns the blob store this repository was constructed with. ctx is
// ignored.
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
	return pr.blobStore
}
func (pr *proxiedRepository) Named() reference.Named { return pr.name } func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService { return pr.tags }