distribution/registry/proxy/proxyregistry.go
Sebastiaan van Stijn 3dda067747
deprecate reference package, migrate to github.com/distribution/reference
This integrates the new module, which was extracted from this repository
at commit b9b19409cf458dcb9e1253ff44ba75bd0620faa6;

    # install filter-repo (https://github.com/newren/git-filter-repo/blob/main/INSTALL.md)
    brew install git-filter-repo

    # create a temporary clone of docker
    cd ~/Projects
    git clone https://github.com/distribution/distribution.git reference
    cd reference

    # commit taken from
    git rev-parse --verify HEAD
    b9b19409cf

    # remove all code, except for general files, 'reference/', and rename to /
    git filter-repo \
      --path .github/workflows/codeql-analysis.yml \
      --path .github/workflows/fossa.yml \
      --path .golangci.yml \
      --path distribution-logo.svg \
      --path CODE-OF-CONDUCT.md \
      --path CONTRIBUTING.md \
      --path GOVERNANCE.md \
      --path README.md \
      --path LICENSE \
      --path MAINTAINERS \
      --path-glob 'reference/*.*' \
      --path-rename reference/:

    # initialize go.mod
    go mod init github.com/distribution/reference
    go mod tidy -go=1.20

This commit is based on 152af63ec5 in the main branch,
but adjusted for the 2.8 branch, with some differences:

- the Sort functions have not been kept, as they were not part of the v2 package,
  and introduced in 1052518d9f
- the ParseAnyReferenceWithSet and ShortIdentifierRegexp were kept (but deprecated)
  as removing happened in 6d4f62d7fd, which is not
  in the 2.8 branch.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2023-09-22 13:25:01 +02:00

263 lines
6.7 KiB
Go

package proxy
import (
"context"
"fmt"
"net/http"
"net/url"
"sync"
"github.com/distribution/reference"
"github.com/docker/distribution"
"github.com/docker/distribution/configuration"
dcontext "github.com/docker/distribution/context"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/distribution/registry/client/auth/challenge"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/distribution/registry/proxy/scheduler"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"
)
// proxyingRegistry fetches content from a remote registry and caches it locally
type proxyingRegistry struct {
embedded distribution.Namespace // provides local registry functionality
scheduler *scheduler.TTLExpirationScheduler
remoteURL url.URL
authChallenger authChallenger
}
// NewRegistryPullThroughCache creates a registry acting as a pull through cache
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
remoteURL, err := url.Parse(config.RemoteURL)
if err != nil {
return nil, err
}
v := storage.NewVacuum(ctx, driver)
s := scheduler.New(ctx, driver, "/scheduler-state.json")
s.OnBlobExpire(func(ref reference.Reference) error {
var r reference.Canonical
var ok bool
if r, ok = ref.(reference.Canonical); !ok {
return fmt.Errorf("unexpected reference type : %T", ref)
}
repo, err := registry.Repository(ctx, r)
if err != nil {
return err
}
blobs := repo.Blobs(ctx)
// Clear the repository reference and descriptor caches
err = blobs.Delete(ctx, r.Digest())
if err != nil {
return err
}
err = v.RemoveBlob(r.Digest().String())
if err != nil {
return err
}
return nil
})
s.OnManifestExpire(func(ref reference.Reference) error {
var r reference.Canonical
var ok bool
if r, ok = ref.(reference.Canonical); !ok {
return fmt.Errorf("unexpected reference type : %T", ref)
}
repo, err := registry.Repository(ctx, r)
if err != nil {
return err
}
manifests, err := repo.Manifests(ctx)
if err != nil {
return err
}
err = manifests.Delete(ctx, r.Digest())
if err != nil {
return err
}
return nil
})
err = s.Start()
if err != nil {
return nil, err
}
cs, err := configureAuth(config.Username, config.Password, config.RemoteURL)
if err != nil {
return nil, err
}
return &proxyingRegistry{
embedded: registry,
scheduler: s,
remoteURL: *remoteURL,
authChallenger: &remoteAuthChallenger{
remoteURL: *remoteURL,
cm: challenge.NewSimpleManager(),
cs: cs,
},
}, nil
}
func (pr *proxyingRegistry) Scope() distribution.Scope {
return distribution.GlobalScope
}
func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
return pr.embedded.Repositories(ctx, repos, last)
}
func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
c := pr.authChallenger
tkopts := auth.TokenHandlerOptions{
Transport: http.DefaultTransport,
Credentials: c.credentialStore(),
Scopes: []auth.Scope{
auth.RepositoryScope{
Repository: name.Name(),
Actions: []string{"pull"},
},
},
Logger: dcontext.GetLogger(ctx),
}
tr := transport.NewTransport(http.DefaultTransport,
auth.NewAuthorizer(c.challengeManager(),
auth.NewTokenHandlerWithOptions(tkopts)))
localRepo, err := pr.embedded.Repository(ctx, name)
if err != nil {
return nil, err
}
localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
if err != nil {
return nil, err
}
remoteRepo, err := client.NewRepository(name, pr.remoteURL.String(), tr)
if err != nil {
return nil, err
}
remoteManifests, err := remoteRepo.Manifests(ctx)
if err != nil {
return nil, err
}
return &proxiedRepository{
blobStore: &proxyBlobStore{
localStore: localRepo.Blobs(ctx),
remoteStore: remoteRepo.Blobs(ctx),
scheduler: pr.scheduler,
repositoryName: name,
authChallenger: pr.authChallenger,
},
manifests: &proxyManifestStore{
repositoryName: name,
localManifests: localManifests, // Options?
remoteManifests: remoteManifests,
ctx: ctx,
scheduler: pr.scheduler,
authChallenger: pr.authChallenger,
},
name: name,
tags: &proxyTagService{
localTags: localRepo.Tags(ctx),
remoteTags: remoteRepo.Tags(ctx),
authChallenger: pr.authChallenger,
},
}, nil
}
func (pr *proxyingRegistry) Blobs() distribution.BlobEnumerator {
return pr.embedded.Blobs()
}
func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
return pr.embedded.BlobStatter()
}
// authChallenger encapsulates a request to the upstream to establish credential challenges
type authChallenger interface {
tryEstablishChallenges(context.Context) error
challengeManager() challenge.Manager
credentialStore() auth.CredentialStore
}
type remoteAuthChallenger struct {
remoteURL url.URL
sync.Mutex
cm challenge.Manager
cs auth.CredentialStore
}
func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
return r.cs
}
func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
return r.cm
}
// tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist
func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
r.Lock()
defer r.Unlock()
remoteURL := r.remoteURL
remoteURL.Path = "/v2/"
challenges, err := r.cm.GetChallenges(remoteURL)
if err != nil {
return err
}
if len(challenges) > 0 {
return nil
}
// establish challenge type with upstream
if err := ping(r.cm, remoteURL.String(), challengeHeader); err != nil {
return err
}
dcontext.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL, r.cm)
return nil
}
// proxiedRepository uses proxying blob and manifest services to serve content
// locally, or pulling it through from a remote and caching it locally if it doesn't
// already exist
type proxiedRepository struct {
blobStore distribution.BlobStore
manifests distribution.ManifestService
name reference.Named
tags distribution.TagService
}
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
return pr.manifests, nil
}
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
return pr.blobStore
}
func (pr *proxiedRepository) Named() reference.Named {
return pr.name
}
func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
return pr.tags
}