distribution/registry/proxy/proxyregistry.go
Cory Snider d0f5aa670b Move context package internal
Our context package predates the establishment of current best practices
regarding context usage and it shows. It encourages bad practices such
as using contexts to propagate non-request-scoped values like the
application version and using string-typed keys for context values. Move
the package internal to remove it from the API surface of
distribution/v3@v3.0.0 so we are free to iterate on it without being
constrained by compatibility.

Signed-off-by: Cory Snider <csnider@mirantis.com>
2023-10-27 10:58:37 -04:00

285 lines
7.3 KiB
Go

package proxy
import (
"context"
"fmt"
"net/http"
"net/url"
"sync"
"time"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/configuration"
"github.com/distribution/distribution/v3/internal/client"
"github.com/distribution/distribution/v3/internal/client/auth"
"github.com/distribution/distribution/v3/internal/client/auth/challenge"
"github.com/distribution/distribution/v3/internal/client/transport"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/proxy/scheduler"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
)
// repositoryTTL is the default time-to-live (7 days) used for cached content
// when the proxy configuration does not set a TTL.
var repositoryTTL = 24 * 7 * time.Hour
// proxyingRegistry fetches content from a remote registry and caches it locally
type proxyingRegistry struct {
embedded distribution.Namespace // provides local registry functionality
scheduler *scheduler.TTLExpirationScheduler // expiry scheduler; nil when TTL is disabled
ttl *time.Duration // lifetime of cached content; nil means never expire
remoteURL url.URL // parsed URL of the remote (upstream) registry
authChallenger authChallenger // supplies the challenge manager and credentials for the remote
}
// NewRegistryPullThroughCache creates a registry acting as a pull through cache.
//
// registry is the local namespace used to store and serve cached content,
// driver is the storage driver backing both the vacuum and the expiration
// scheduler state, and config carries the remote URL, credentials and TTL.
//
// TTL handling:
//   - config.TTL == nil: the default repositoryTTL is used.
//   - *config.TTL > 0:   the configured value is used.
//   - otherwise:         expiry is disabled and no scheduler is created.
//
// An error is returned if the remote URL cannot be parsed, if credentials
// cannot be configured, or if the expiration scheduler fails to start.
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
	remoteURL, err := url.Parse(config.RemoteURL)
	if err != nil {
		return nil, err
	}

	// Resolve upstream credentials before the scheduler is created and
	// started: if this step fails we return without leaving a running
	// scheduler behind that nobody holds a reference to.
	cs, err := configureAuth(config.Username, config.Password, config.RemoteURL)
	if err != nil {
		return nil, err
	}

	v := storage.NewVacuum(ctx, driver)

	var ttl *time.Duration
	switch {
	case config.TTL == nil:
		// Default TTL is 7 days
		ttl = &repositoryTTL
	case *config.TTL > 0:
		ttl = config.TTL
	default:
		// TTL is disabled, never expire: ttl stays nil.
	}

	var s *scheduler.TTLExpirationScheduler
	if ttl != nil {
		s = scheduler.New(ctx, driver, "/scheduler-state.json")

		// On blob expiry, drop the blob from the local repository and then
		// remove its data through the vacuum.
		s.OnBlobExpire(func(ref reference.Reference) error {
			r, ok := ref.(reference.Canonical)
			if !ok {
				return fmt.Errorf("unexpected reference type : %T", ref)
			}

			repo, err := registry.Repository(ctx, r)
			if err != nil {
				return err
			}

			// Clear the repository reference and descriptor caches
			if err := repo.Blobs(ctx).Delete(ctx, r.Digest()); err != nil {
				return err
			}

			return v.RemoveBlob(r.Digest().String())
		})

		// On manifest expiry, delete the manifest from the local repository.
		s.OnManifestExpire(func(ref reference.Reference) error {
			r, ok := ref.(reference.Canonical)
			if !ok {
				return fmt.Errorf("unexpected reference type : %T", ref)
			}

			repo, err := registry.Repository(ctx, r)
			if err != nil {
				return err
			}

			manifests, err := repo.Manifests(ctx)
			if err != nil {
				return err
			}

			return manifests.Delete(ctx, r.Digest())
		})

		if err := s.Start(); err != nil {
			return nil, err
		}
	}

	return &proxyingRegistry{
		embedded:  registry,
		scheduler: s,
		ttl:       ttl,
		remoteURL: *remoteURL,
		authChallenger: &remoteAuthChallenger{
			remoteURL: *remoteURL,
			cm:        challenge.NewSimpleManager(),
			cs:        cs,
		},
	}, nil
}
// Scope always reports distribution.GlobalScope for the proxying registry.
func (pr *proxyingRegistry) Scope() distribution.Scope {
return distribution.GlobalScope
}
// Repositories delegates to the embedded local namespace; the remote
// registry is not consulted here.
func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
return pr.embedded.Repositories(ctx, repos, last)
}
// Repository returns a repository for name that pairs the local (embedded)
// repository with a client for the same repository on the remote registry.
//
// The remote client uses a transport that authorizes requests through the
// registry's auth challenger, requesting only the "pull" action on name.
// The local manifest service is opened with layer verification skipped.
// An error from opening either repository or either manifest service is
// returned unchanged.
func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
	challenger := pr.authChallenger

	// Token handler options: pull-only scope on the requested repository.
	tokenOpts := auth.TokenHandlerOptions{
		Transport:   http.DefaultTransport,
		Credentials: challenger.credentialStore(),
		Scopes: []auth.Scope{
			auth.RepositoryScope{
				Repository: name.Name(),
				Actions:    []string{"pull"},
			},
		},
		Logger: dcontext.GetLogger(ctx),
	}

	authorizer := auth.NewAuthorizer(
		challenger.challengeManager(),
		auth.NewTokenHandlerWithOptions(tokenOpts),
	)
	upstreamTransport := transport.NewTransport(http.DefaultTransport, authorizer)

	// Local side first, then the remote side.
	local, err := pr.embedded.Repository(ctx, name)
	if err != nil {
		return nil, err
	}

	localManifests, err := local.Manifests(ctx, storage.SkipLayerVerification())
	if err != nil {
		return nil, err
	}

	remote, err := client.NewRepository(name, pr.remoteURL.String(), upstreamTransport)
	if err != nil {
		return nil, err
	}

	remoteManifests, err := remote.Manifests(ctx)
	if err != nil {
		return nil, err
	}

	blobs := &proxyBlobStore{
		localStore:     local.Blobs(ctx),
		remoteStore:    remote.Blobs(ctx),
		scheduler:      pr.scheduler,
		ttl:            pr.ttl,
		repositoryName: name,
		authChallenger: pr.authChallenger,
	}

	manifests := &proxyManifestStore{
		repositoryName:  name,
		localManifests:  localManifests, // Options?
		remoteManifests: remoteManifests,
		ctx:             ctx,
		scheduler:       pr.scheduler,
		ttl:             pr.ttl,
		authChallenger:  pr.authChallenger,
	}

	tags := &proxyTagService{
		localTags:      local.Tags(ctx),
		remoteTags:     remote.Tags(ctx),
		authChallenger: pr.authChallenger,
	}

	return &proxiedRepository{
		blobStore: blobs,
		manifests: manifests,
		name:      name,
		tags:      tags,
	}, nil
}
// Blobs returns the blob enumerator of the embedded local namespace.
func (pr *proxyingRegistry) Blobs() distribution.BlobEnumerator {
return pr.embedded.Blobs()
}
// BlobStatter returns the blob statter of the embedded local namespace.
func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
return pr.embedded.BlobStatter()
}
// authChallenger encapsulates a request to the upstream to establish credential challenges
type authChallenger interface {
// tryEstablishChallenges attempts to establish auth challenges with the
// upstream and reports any error encountered while doing so.
tryEstablishChallenges(context.Context) error
// challengeManager returns the challenge manager used when authorizing
// requests to the upstream.
challengeManager() challenge.Manager
// credentialStore returns the credential store used when authorizing
// requests to the upstream.
credentialStore() auth.CredentialStore
}
// remoteAuthChallenger is the authChallenger used for the remote registry
// configured in NewRegistryPullThroughCache.
type remoteAuthChallenger struct {
remoteURL url.URL // URL of the remote registry
sync.Mutex // held for the duration of tryEstablishChallenges
cm challenge.Manager // challenge manager queried and populated for the remote
cs auth.CredentialStore // credentials configured for the remote
}
// credentialStore returns the credential store held by the challenger.
func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
return r.cs
}
// challengeManager returns the challenge manager held by the challenger.
func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
return r.cm
}
// tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist.
//
// The challenger's mutex is held for the whole call, so concurrent callers
// are serialized. If the challenge manager already has challenges for the
// upstream's "/v2/" endpoint nothing further is done; otherwise the upstream
// is pinged so the challenge manager can record its challenges. Errors from
// the challenge manager lookup or from the ping are returned unchanged.
func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
	r.Lock()
	defer r.Unlock()

	// Work on a copy so the stored URL keeps its original path.
	remoteURL := r.remoteURL
	remoteURL.Path = "/v2/"
	challenges, err := r.cm.GetChallenges(remoteURL)
	if err != nil {
		return err
	}

	if len(challenges) > 0 {
		return nil
	}

	// establish challenge type with upstream
	if err := ping(r.cm, remoteURL.String(), challengeHeader); err != nil {
		return err
	}

	// remoteURL is a url.URL value, and String is declared on *url.URL, so
	// passing the value itself to %s would print the raw struct fields
	// rather than the URL. Redacted renders the URL as text and masks any
	// password in the userinfo so it does not end up in the logs.
	dcontext.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL.Redacted(), r.cm)
	return nil
}
// proxiedRepository uses proxying blob and manifest services to serve content
// locally, or pulling it through from a remote and caching it locally if it doesn't
// already exist
type proxiedRepository struct {
blobStore distribution.BlobStore // returned by Blobs
manifests distribution.ManifestService // returned by Manifests
name reference.Named // returned by Named
tags distribution.TagService // returned by Tags
}
// Manifests returns the manifest service the repository was built with.
// The ctx and options arguments are ignored, and the error is always nil.
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
return pr.manifests, nil
}
// Blobs returns the blob store the repository was built with; ctx is ignored.
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
return pr.blobStore
}
// Named returns the name of the repository.
func (pr *proxiedRepository) Named() reference.Named {
return pr.name
}
// Tags returns the tag service the repository was built with; ctx is ignored.
func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
return pr.tags
}