package proxy import ( "context" "io" "net/http" "strconv" "sync" "time" "github.com/opencontainers/go-digest" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/internal/dcontext" "github.com/distribution/distribution/v3/registry/proxy/scheduler" "github.com/distribution/reference" ) type proxyBlobStore struct { localStore distribution.BlobStore remoteStore distribution.BlobService scheduler *scheduler.TTLExpirationScheduler ttl *time.Duration repositoryName reference.Named authChallenger authChallenger } var _ distribution.BlobStore = &proxyBlobStore{} // inflight tracks currently downloading blobs var inflight = make(map[digest.Digest]struct{}) // mu protects inflight var mu sync.Mutex func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) { w.Header().Set("Content-Length", strconv.FormatInt(length, 10)) w.Header().Set("Content-Type", mediaType) w.Header().Set("Docker-Content-Digest", digest.String()) w.Header().Set("Etag", digest.String()) } func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) { desc, err := pbs.remoteStore.Stat(ctx, dgst) if err != nil { return distribution.Descriptor{}, err } if w, ok := writer.(http.ResponseWriter); ok { setResponseHeaders(w, desc.Size, desc.MediaType, dgst) } remoteReader, err := pbs.remoteStore.Open(ctx, dgst) if err != nil { return distribution.Descriptor{}, err } defer remoteReader.Close() _, err = io.CopyN(writer, remoteReader, desc.Size) if err != nil { return distribution.Descriptor{}, err } proxyMetrics.BlobPull(uint64(desc.Size)) proxyMetrics.BlobPush(uint64(desc.Size), false) return desc, nil } func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) { localDesc, err := pbs.localStore.Stat(ctx, dgst) if err != nil { // Stat can report a zero sized file here if it's checked between creation // and population. Return nil error, and continue return false, nil } proxyMetrics.BlobPush(uint64(localDesc.Size), true) return true, pbs.localStore.ServeBlob(ctx, w, r, dgst) } func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { served, err := pbs.serveLocal(ctx, w, r, dgst) if err != nil { dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error()) return err } if served { return nil } if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil { return err } mu.Lock() _, ok := inflight[dgst] if ok { // If the blob has been serving in other requests. // Will return the blob from the remote store directly. // TODO Maybe we could reuse the these blobs are serving remotely and caching locally. mu.Unlock() _, err := pbs.copyContent(ctx, dgst, w) return err } inflight[dgst] = struct{}{} mu.Unlock() defer func() { mu.Lock() delete(inflight, dgst) mu.Unlock() }() bw, err := pbs.localStore.Create(ctx) if err != nil { return err } // Serving client and storing locally over same fetching request. // This can prevent a redundant blob fetching. multiWriter := io.MultiWriter(w, bw) desc, err := pbs.copyContent(ctx, dgst, multiWriter) if err != nil { return err } _, err = bw.Commit(ctx, desc) if err != nil { return err } blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) if err != nil { dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) return err } if pbs.scheduler != nil && pbs.ttl != nil { if err := pbs.scheduler.AddBlob(blobRef, *pbs.ttl); err != nil { dcontext.GetLogger(ctx).Errorf("Error adding blob: %s", err) return err } } return nil } func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { desc, err := pbs.localStore.Stat(ctx, dgst) if err == nil { return desc, err } if err != distribution.ErrBlobUnknown { return distribution.Descriptor{}, err } if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil { return distribution.Descriptor{}, err } return pbs.remoteStore.Stat(ctx, dgst) } func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { blob, err := pbs.localStore.Get(ctx, dgst) if err == nil { return blob, nil } if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil { return []byte{}, err } blob, err = pbs.remoteStore.Get(ctx, dgst) if err != nil { return []byte{}, err } _, err = pbs.localStore.Put(ctx, "", blob) if err != nil { return []byte{}, err } return blob, nil } // Unsupported functions func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { return distribution.Descriptor{}, distribution.ErrUnsupported } func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { return nil, distribution.ErrUnsupported } func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { return nil, distribution.ErrUnsupported } func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) { return distribution.Descriptor{}, distribution.ErrUnsupported } func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) { return nil, distribution.ErrUnsupported } func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { return distribution.ErrUnsupported }