package proxy

import (
	"context"
	"io"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/distribution/distribution/v3"
	dcontext "github.com/distribution/distribution/v3/context"
	"github.com/distribution/distribution/v3/reference"
	"github.com/distribution/distribution/v3/registry/proxy/scheduler"
	"github.com/opencontainers/go-digest"
)

// proxyBlobStore is a distribution.BlobStore that answers reads from a local
// store when it can and otherwise falls back to a remote blob service,
// copying what it fetches into the local store.
type proxyBlobStore struct {
	// localStore is checked first on reads and receives blobs fetched remotely.
	localStore distribution.BlobStore
	// remoteStore is the upstream consulted when localStore cannot answer.
	remoteStore distribution.BlobService
	// scheduler, when non-nil together with ttl, is told about every blob
	// that ServeBlob successfully caches.
	scheduler *scheduler.TTLExpirationScheduler
	// ttl is the duration handed to scheduler.AddBlob for cached blobs.
	ttl *time.Duration
	// repositoryName is combined with a digest to build the reference that
	// is registered with the scheduler.
	repositoryName reference.Named
	// authChallenger is invoked before every request to remoteStore.
	authChallenger authChallenger
}

var _ distribution.BlobStore = &proxyBlobStore{}

// inflight tracks the digests of blobs currently being downloaded into the
// local store.
var inflight = make(map[digest.Digest]struct{})

// mu protects inflight.
var mu sync.Mutex

// setResponseHeaders writes the blob headers (length, media type, digest and
// ETag) onto w.
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, dgst digest.Digest) {
	w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
	w.Header().Set("Content-Type", mediaType)
	w.Header().Set("Docker-Content-Digest", dgst.String())
	w.Header().Set("Etag", dgst.String())
}

// copyContent streams the blob identified by dgst from the remote store into
// writer and returns the descriptor reported by the remote Stat.
//
// When writer is an http.ResponseWriter the blob headers are set on it before
// any content is written. Exactly desc.Size bytes are copied; a shorter remote
// stream is reported as an error.
func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
	desc, err := pbs.remoteStore.Stat(ctx, dgst)
	if err != nil {
		return distribution.Descriptor{}, err
	}

	remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
	if err != nil {
		return distribution.Descriptor{}, err
	}
	defer remoteReader.Close()

	// Set the headers only once the remote blob has been opened, so a failed
	// Open does not leave blob headers (notably Content-Length) behind on a
	// response that will carry an error instead.
	if w, ok := writer.(http.ResponseWriter); ok {
		setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
	}

	if _, err := io.CopyN(writer, remoteReader, desc.Size); err != nil {
		return distribution.Descriptor{}, err
	}

	proxyMetrics.BlobPush(uint64(desc.Size))

	return desc, nil
}

// serveLocal tries to serve dgst from the local store. It returns false with
// a nil error when the local Stat fails, which tells the caller to fall back
// to the remote store; otherwise it returns true together with the result of
// the local ServeBlob call.
func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
	localDesc, err := pbs.localStore.Stat(ctx, dgst)
	if err != nil {
		// Stat can report a zero sized file here if it's checked between
		// creation and population. Return a nil error and let the caller
		// continue with the remote store.
		return false, nil
	}

	proxyMetrics.BlobPush(uint64(localDesc.Size))
	return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
}

// storeLocal downloads dgst from the remote store into a new local upload and
// commits it. The digest is removed from inflight when the function returns,
// whether or not the download succeeded.
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
	defer func() {
		mu.Lock()
		delete(inflight, dgst)
		mu.Unlock()
	}()

	bw, err := pbs.localStore.Create(ctx)
	if err != nil {
		return err
	}

	// Abandon the upload if anything below fails so a partial download is not
	// left behind. This is best effort: the original error is what the caller
	// gets, a cleanup failure is only logged.
	committed := false
	defer func() {
		if committed {
			return
		}
		if cerr := bw.Cancel(ctx); cerr != nil {
			dcontext.GetLogger(ctx).Errorf("Error cancelling blob upload: %s", cerr.Error())
		}
	}()

	desc, err := pbs.copyContent(ctx, dgst, bw)
	if err != nil {
		return err
	}

	if _, err := bw.Commit(ctx, desc); err != nil {
		return err
	}
	committed = true

	return nil
}

// ServeBlob writes the blob identified by dgst to w.
//
// The local store is tried first. On a miss the blob is streamed from the
// remote store to the client, and, unless a download of the same digest is
// already in flight, a background goroutine fetches the blob again into the
// local store and registers it with the TTL scheduler.
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
	served, err := pbs.serveLocal(ctx, w, r, dgst)
	if err != nil {
		dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
		return err
	}

	if served {
		return nil
	}

	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
		return err
	}

	mu.Lock()
	_, ok := inflight[dgst]
	if ok {
		// The digest is already marked as being cached; just proxy the
		// content to this client without starting a second download.
		mu.Unlock()
		_, err := pbs.copyContent(ctx, dgst, w)
		return err
	}
	inflight[dgst] = struct{}{}
	mu.Unlock()

	// storeLocalCtx is deliberately independent of ctx. ctx belongs to the
	// client request and may end as soon as the client has received its
	// bytes, which can happen before storeLocal has finished copying and
	// committing; tying the two together would make the registry fail to
	// cache a layer it had already served.
	storeLocalCtx, cancel := context.WithCancel(context.Background())
	go func(dgst digest.Digest) {
		defer cancel()
		if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil {
			dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error())
			// Nothing was cached, so there is nothing to schedule for expiry.
			return
		}

		blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
		if err != nil {
			dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err)
			return
		}

		if pbs.scheduler != nil && pbs.ttl != nil {
			pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
		}
	}(dgst)

	_, err = pbs.copyContent(ctx, dgst, w)
	if err != nil {
		// Serving the client failed; stop the background download as well.
		cancel()
		return err
	}
	return nil
}

// Stat returns the descriptor for dgst from the local store, falling back to
// the remote store only when the local store reports
// distribution.ErrBlobUnknown. Any other local error is returned as is.
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	desc, err := pbs.localStore.Stat(ctx, dgst)
	if err == nil {
		return desc, nil
	}

	if err != distribution.ErrBlobUnknown {
		return distribution.Descriptor{}, err
	}

	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
		return distribution.Descriptor{}, err
	}

	return pbs.remoteStore.Stat(ctx, dgst)
}

// Get returns the content of dgst. Any local error triggers a fetch from the
// remote store; the fetched bytes are written to the local store before being
// returned, and a failure to write them is reported as an error.
func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
	blob, err := pbs.localStore.Get(ctx, dgst)
	if err == nil {
		return blob, nil
	}

	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
		return []byte{}, err
	}

	blob, err = pbs.remoteStore.Get(ctx, dgst)
	if err != nil {
		return []byte{}, err
	}

	if _, err := pbs.localStore.Put(ctx, "", blob); err != nil {
		return []byte{}, err
	}
	return blob, nil
}

// Unsupported functions

// Put is not supported by the proxy blob store and always returns
// distribution.ErrUnsupported.
func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
	return distribution.Descriptor{}, distribution.ErrUnsupported
}

// Create is not supported by the proxy blob store and always returns
// distribution.ErrUnsupported.
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
	return nil, distribution.ErrUnsupported
}

// Resume is not supported by the proxy blob store and always returns
// distribution.ErrUnsupported.
func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
	return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) { return distribution.Descriptor{}, distribution.ErrUnsupported } func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) { return nil, distribution.ErrUnsupported } func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { return distribution.ErrUnsupported }