9c88801a12
Back in the before time, the best practices surrounding usage of Context weren't quite worked out. We defined our own type to make usage easier. As this package was used elsewhere, it became more and more challenging to integrate with the forked `Context` type. Now that it is available in the standard library, we can just use that one directly. To make usage more consistent, we now use `dcontext` when referring to the distribution context package. Signed-off-by: Stephen J Day <stephen.day@docker.com>
225 lines
5.6 KiB
Go
225 lines
5.6 KiB
Go
package proxy
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/docker/distribution"
|
|
dcontext "github.com/docker/distribution/context"
|
|
"github.com/docker/distribution/reference"
|
|
"github.com/docker/distribution/registry/proxy/scheduler"
|
|
"github.com/opencontainers/go-digest"
|
|
)
|
|
|
|
// blobTTL is a fixed duration of one week (seven 24-hour days).
//
// todo(richardscothern): from cache control header or config file
const blobTTL = 7 * 24 * time.Hour
|
|
|
|
type proxyBlobStore struct {
|
|
localStore distribution.BlobStore
|
|
remoteStore distribution.BlobService
|
|
scheduler *scheduler.TTLExpirationScheduler
|
|
repositoryName reference.Named
|
|
authChallenger authChallenger
|
|
}
|
|
|
|
// Compile-time check that *proxyBlobStore implements distribution.BlobStore.
var _ distribution.BlobStore = (*proxyBlobStore)(nil)
|
|
|
|
// inflight tracks currently downloading blobs
|
|
var inflight = make(map[digest.Digest]struct{})
|
|
|
|
// mu protects inflight
|
|
var mu sync.Mutex
|
|
|
|
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
|
|
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
|
|
w.Header().Set("Content-Type", mediaType)
|
|
w.Header().Set("Docker-Content-Digest", digest.String())
|
|
w.Header().Set("Etag", digest.String())
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
|
|
desc, err := pbs.remoteStore.Stat(ctx, dgst)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if w, ok := writer.(http.ResponseWriter); ok {
|
|
setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
|
|
}
|
|
|
|
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
defer remoteReader.Close()
|
|
|
|
_, err = io.CopyN(writer, remoteReader, desc.Size)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
proxyMetrics.BlobPush(uint64(desc.Size))
|
|
|
|
return desc, nil
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
|
|
localDesc, err := pbs.localStore.Stat(ctx, dgst)
|
|
if err != nil {
|
|
// Stat can report a zero sized file here if it's checked between creation
|
|
// and population. Return nil error, and continue
|
|
return false, nil
|
|
}
|
|
|
|
if err == nil {
|
|
proxyMetrics.BlobPush(uint64(localDesc.Size))
|
|
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
|
|
}
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
|
|
defer func() {
|
|
mu.Lock()
|
|
delete(inflight, dgst)
|
|
mu.Unlock()
|
|
}()
|
|
|
|
var desc distribution.Descriptor
|
|
var err error
|
|
var bw distribution.BlobWriter
|
|
|
|
bw, err = pbs.localStore.Create(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
desc, err = pbs.copyContent(ctx, dgst, bw)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = bw.Commit(ctx, desc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
|
served, err := pbs.serveLocal(ctx, w, r, dgst)
|
|
if err != nil {
|
|
dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
|
|
return err
|
|
}
|
|
|
|
if served {
|
|
return nil
|
|
}
|
|
|
|
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
mu.Lock()
|
|
_, ok := inflight[dgst]
|
|
if ok {
|
|
mu.Unlock()
|
|
_, err := pbs.copyContent(ctx, dgst, w)
|
|
return err
|
|
}
|
|
inflight[dgst] = struct{}{}
|
|
mu.Unlock()
|
|
|
|
go func(dgst digest.Digest) {
|
|
if err := pbs.storeLocal(ctx, dgst); err != nil {
|
|
dcontext.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
|
|
}
|
|
|
|
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
|
|
if err != nil {
|
|
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
|
|
return
|
|
}
|
|
|
|
pbs.scheduler.AddBlob(blobRef, repositoryTTL)
|
|
}(dgst)
|
|
|
|
_, err = pbs.copyContent(ctx, dgst, w)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
desc, err := pbs.localStore.Stat(ctx, dgst)
|
|
if err == nil {
|
|
return desc, err
|
|
}
|
|
|
|
if err != distribution.ErrBlobUnknown {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
return pbs.remoteStore.Stat(ctx, dgst)
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
|
|
blob, err := pbs.localStore.Get(ctx, dgst)
|
|
if err == nil {
|
|
return blob, nil
|
|
}
|
|
|
|
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
|
return []byte{}, err
|
|
}
|
|
|
|
blob, err = pbs.remoteStore.Get(ctx, dgst)
|
|
if err != nil {
|
|
return []byte{}, err
|
|
}
|
|
|
|
_, err = pbs.localStore.Put(ctx, "", blob)
|
|
if err != nil {
|
|
return []byte{}, err
|
|
}
|
|
return blob, nil
|
|
}
|
|
|
|
// Unsupported functions
|
|
func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
|
|
return distribution.Descriptor{}, distribution.ErrUnsupported
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
|
return nil, distribution.ErrUnsupported
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
|
return nil, distribution.ErrUnsupported
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
return distribution.Descriptor{}, distribution.ErrUnsupported
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
|
return nil, distribution.ErrUnsupported
|
|
}
|
|
|
|
func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
|
return distribution.ErrUnsupported
|
|
}
|