cc23fdacff
Our registry client is not currently in a good place to be used as the reference OCI Distribution client implementation. But the registry proxy currently depends on it. Make the registry client internal to the distribution application to remove it from the API surface area (and any implied compatibility promises) of distribution/v3@v3.0.0 without breaking the proxy. Signed-off-by: Cory Snider <csnider@mirantis.com>
285 lines
7.3 KiB
Go
285 lines
7.3 KiB
Go
package proxy
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/distribution/distribution/v3"
|
|
"github.com/distribution/distribution/v3/configuration"
|
|
dcontext "github.com/distribution/distribution/v3/context"
|
|
"github.com/distribution/distribution/v3/internal/client"
|
|
"github.com/distribution/distribution/v3/internal/client/auth"
|
|
"github.com/distribution/distribution/v3/internal/client/auth/challenge"
|
|
"github.com/distribution/distribution/v3/internal/client/transport"
|
|
"github.com/distribution/distribution/v3/registry/proxy/scheduler"
|
|
"github.com/distribution/distribution/v3/registry/storage"
|
|
"github.com/distribution/distribution/v3/registry/storage/driver"
|
|
"github.com/distribution/reference"
|
|
)
|
|
|
|
var repositoryTTL = 24 * 7 * time.Hour
|
|
|
|
// proxyingRegistry fetches content from a remote registry and caches it locally
|
|
type proxyingRegistry struct {
|
|
embedded distribution.Namespace // provides local registry functionality
|
|
scheduler *scheduler.TTLExpirationScheduler
|
|
ttl *time.Duration
|
|
remoteURL url.URL
|
|
authChallenger authChallenger
|
|
}
|
|
|
|
// NewRegistryPullThroughCache creates a registry acting as a pull through cache
|
|
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
|
|
remoteURL, err := url.Parse(config.RemoteURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
v := storage.NewVacuum(ctx, driver)
|
|
|
|
var s *scheduler.TTLExpirationScheduler
|
|
var ttl *time.Duration
|
|
if config.TTL == nil {
|
|
// Default TTL is 7 days
|
|
ttl = &repositoryTTL
|
|
} else if *config.TTL > 0 {
|
|
ttl = config.TTL
|
|
} else {
|
|
// TTL is disabled, never expire
|
|
ttl = nil
|
|
}
|
|
|
|
if ttl != nil {
|
|
s = scheduler.New(ctx, driver, "/scheduler-state.json")
|
|
s.OnBlobExpire(func(ref reference.Reference) error {
|
|
var r reference.Canonical
|
|
var ok bool
|
|
if r, ok = ref.(reference.Canonical); !ok {
|
|
return fmt.Errorf("unexpected reference type : %T", ref)
|
|
}
|
|
|
|
repo, err := registry.Repository(ctx, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
blobs := repo.Blobs(ctx)
|
|
|
|
// Clear the repository reference and descriptor caches
|
|
err = blobs.Delete(ctx, r.Digest())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = v.RemoveBlob(r.Digest().String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
s.OnManifestExpire(func(ref reference.Reference) error {
|
|
var r reference.Canonical
|
|
var ok bool
|
|
if r, ok = ref.(reference.Canonical); !ok {
|
|
return fmt.Errorf("unexpected reference type : %T", ref)
|
|
}
|
|
|
|
repo, err := registry.Repository(ctx, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
manifests, err := repo.Manifests(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = manifests.Delete(ctx, r.Digest())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
err = s.Start()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
cs, err := configureAuth(config.Username, config.Password, config.RemoteURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &proxyingRegistry{
|
|
embedded: registry,
|
|
scheduler: s,
|
|
ttl: ttl,
|
|
remoteURL: *remoteURL,
|
|
authChallenger: &remoteAuthChallenger{
|
|
remoteURL: *remoteURL,
|
|
cm: challenge.NewSimpleManager(),
|
|
cs: cs,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (pr *proxyingRegistry) Scope() distribution.Scope {
|
|
return distribution.GlobalScope
|
|
}
|
|
|
|
func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
|
|
return pr.embedded.Repositories(ctx, repos, last)
|
|
}
|
|
|
|
func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
|
|
c := pr.authChallenger
|
|
|
|
tkopts := auth.TokenHandlerOptions{
|
|
Transport: http.DefaultTransport,
|
|
Credentials: c.credentialStore(),
|
|
Scopes: []auth.Scope{
|
|
auth.RepositoryScope{
|
|
Repository: name.Name(),
|
|
Actions: []string{"pull"},
|
|
},
|
|
},
|
|
Logger: dcontext.GetLogger(ctx),
|
|
}
|
|
|
|
tr := transport.NewTransport(http.DefaultTransport,
|
|
auth.NewAuthorizer(c.challengeManager(),
|
|
auth.NewTokenHandlerWithOptions(tkopts)))
|
|
|
|
localRepo, err := pr.embedded.Repository(ctx, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
remoteRepo, err := client.NewRepository(name, pr.remoteURL.String(), tr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
remoteManifests, err := remoteRepo.Manifests(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &proxiedRepository{
|
|
blobStore: &proxyBlobStore{
|
|
localStore: localRepo.Blobs(ctx),
|
|
remoteStore: remoteRepo.Blobs(ctx),
|
|
scheduler: pr.scheduler,
|
|
ttl: pr.ttl,
|
|
repositoryName: name,
|
|
authChallenger: pr.authChallenger,
|
|
},
|
|
manifests: &proxyManifestStore{
|
|
repositoryName: name,
|
|
localManifests: localManifests, // Options?
|
|
remoteManifests: remoteManifests,
|
|
ctx: ctx,
|
|
scheduler: pr.scheduler,
|
|
ttl: pr.ttl,
|
|
authChallenger: pr.authChallenger,
|
|
},
|
|
name: name,
|
|
tags: &proxyTagService{
|
|
localTags: localRepo.Tags(ctx),
|
|
remoteTags: remoteRepo.Tags(ctx),
|
|
authChallenger: pr.authChallenger,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (pr *proxyingRegistry) Blobs() distribution.BlobEnumerator {
|
|
return pr.embedded.Blobs()
|
|
}
|
|
|
|
func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
|
|
return pr.embedded.BlobStatter()
|
|
}
|
|
|
|
// authChallenger encapsulates a request to the upstream to establish credential challenges
|
|
type authChallenger interface {
|
|
tryEstablishChallenges(context.Context) error
|
|
challengeManager() challenge.Manager
|
|
credentialStore() auth.CredentialStore
|
|
}
|
|
|
|
type remoteAuthChallenger struct {
|
|
remoteURL url.URL
|
|
sync.Mutex
|
|
cm challenge.Manager
|
|
cs auth.CredentialStore
|
|
}
|
|
|
|
func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
|
|
return r.cs
|
|
}
|
|
|
|
func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
|
|
return r.cm
|
|
}
|
|
|
|
// tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist
|
|
func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
remoteURL := r.remoteURL
|
|
remoteURL.Path = "/v2/"
|
|
challenges, err := r.cm.GetChallenges(remoteURL)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(challenges) > 0 {
|
|
return nil
|
|
}
|
|
|
|
// establish challenge type with upstream
|
|
if err := ping(r.cm, remoteURL.String(), challengeHeader); err != nil {
|
|
return err
|
|
}
|
|
|
|
dcontext.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL, r.cm)
|
|
return nil
|
|
}
|
|
|
|
// proxiedRepository uses proxying blob and manifest services to serve content
|
|
// locally, or pulling it through from a remote and caching it locally if it doesn't
|
|
// already exist
|
|
type proxiedRepository struct {
|
|
blobStore distribution.BlobStore
|
|
manifests distribution.ManifestService
|
|
name reference.Named
|
|
tags distribution.TagService
|
|
}
|
|
|
|
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
|
|
return pr.manifests, nil
|
|
}
|
|
|
|
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
|
|
return pr.blobStore
|
|
}
|
|
|
|
func (pr *proxiedRepository) Named() reference.Named {
|
|
return pr.name
|
|
}
|
|
|
|
func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
|
|
return pr.tags
|
|
}
|