distribution/registry/proxy/proxyblobstore.go
Cory Snider d0f5aa670b Move context package internal
Our context package predates the establishment of current best practices
regarding context usage and it shows. It encourages bad practices such
as using contexts to propagate non-request-scoped values like the
application version and using string-typed keys for context values. Move
the package internal to remove it from the API surface of
distribution/v3@v3.0.0 so we are free to iterate on it without being
constrained by compatibility.

Signed-off-by: Cory Snider <csnider@mirantis.com>
2023-10-27 10:58:37 -04:00

209 lines
5.5 KiB
Go

package proxy
import (
"context"
"io"
"net/http"
"strconv"
"sync"
"time"
"github.com/opencontainers/go-digest"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/proxy/scheduler"
"github.com/distribution/reference"
)
type proxyBlobStore struct {
localStore distribution.BlobStore
remoteStore distribution.BlobService
scheduler *scheduler.TTLExpirationScheduler
ttl *time.Duration
repositoryName reference.Named
authChallenger authChallenger
}
var _ distribution.BlobStore = &proxyBlobStore{}
// inflight tracks currently downloading blobs
var inflight = make(map[digest.Digest]struct{})
// mu protects inflight
var mu sync.Mutex
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
w.Header().Set("Content-Type", mediaType)
w.Header().Set("Docker-Content-Digest", digest.String())
w.Header().Set("Etag", digest.String())
}
func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
desc, err := pbs.remoteStore.Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
if w, ok := writer.(http.ResponseWriter); ok {
setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
}
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
defer remoteReader.Close()
_, err = io.CopyN(writer, remoteReader, desc.Size)
if err != nil {
return distribution.Descriptor{}, err
}
proxyMetrics.BlobPush(uint64(desc.Size))
return desc, nil
}
func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
localDesc, err := pbs.localStore.Stat(ctx, dgst)
if err != nil {
// Stat can report a zero sized file here if it's checked between creation
// and population. Return nil error, and continue
return false, nil
}
proxyMetrics.BlobPush(uint64(localDesc.Size))
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
}
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
served, err := pbs.serveLocal(ctx, w, r, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
return err
}
if served {
return nil
}
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
return err
}
mu.Lock()
_, ok := inflight[dgst]
if ok {
// If the blob has been serving in other requests.
// Will return the blob from the remote store directly.
// TODO Maybe we could reuse the these blobs are serving remotely and caching locally.
mu.Unlock()
_, err := pbs.copyContent(ctx, dgst, w)
return err
}
inflight[dgst] = struct{}{}
mu.Unlock()
defer func() {
mu.Lock()
delete(inflight, dgst)
mu.Unlock()
}()
bw, err := pbs.localStore.Create(ctx)
if err != nil {
return err
}
// Serving client and storing locally over same fetching request.
// This can prevent a redundant blob fetching.
multiWriter := io.MultiWriter(w, bw)
desc, err := pbs.copyContent(ctx, dgst, multiWriter)
if err != nil {
return err
}
_, err = bw.Commit(ctx, desc)
if err != nil {
return err
}
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return err
}
if pbs.scheduler != nil && pbs.ttl != nil {
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
}
return nil
}
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := pbs.localStore.Stat(ctx, dgst)
if err == nil {
return desc, err
}
if err != distribution.ErrBlobUnknown {
return distribution.Descriptor{}, err
}
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
return distribution.Descriptor{}, err
}
return pbs.remoteStore.Stat(ctx, dgst)
}
func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
blob, err := pbs.localStore.Get(ctx, dgst)
if err == nil {
return blob, nil
}
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
return []byte{}, err
}
blob, err = pbs.remoteStore.Get(ctx, dgst)
if err != nil {
return []byte{}, err
}
_, err = pbs.localStore.Put(ctx, "", blob)
if err != nil {
return []byte{}, err
}
return blob, nil
}
// Unsupported functions
func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}