distribution/registry/proxy/proxyblobstore.go

229 lines
6 KiB
Go
Raw Normal View History

package proxy
import (
"context"
"io"
"net/http"
"strconv"
"sync"
"time"
"github.com/opencontainers/go-digest"
"github.com/distribution/distribution/v3"
dcontext "github.com/distribution/distribution/v3/context"
"github.com/distribution/distribution/v3/reference"
"github.com/distribution/distribution/v3/registry/proxy/scheduler"
)
type proxyBlobStore struct {
localStore distribution.BlobStore
remoteStore distribution.BlobService
scheduler *scheduler.TTLExpirationScheduler
ttl *time.Duration
repositoryName reference.Named
authChallenger authChallenger
}
var _ distribution.BlobStore = &proxyBlobStore{}
// inflight tracks currently downloading blobs
var inflight = make(map[digest.Digest]struct{})
// mu protects inflight
var mu sync.Mutex
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
w.Header().Set("Content-Type", mediaType)
w.Header().Set("Docker-Content-Digest", digest.String())
w.Header().Set("Etag", digest.String())
}
func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
desc, err := pbs.remoteStore.Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
if w, ok := writer.(http.ResponseWriter); ok {
setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
}
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
defer remoteReader.Close()
_, err = io.CopyN(writer, remoteReader, desc.Size)
if err != nil {
return distribution.Descriptor{}, err
}
proxyMetrics.BlobPush(uint64(desc.Size))
return desc, nil
}
func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
localDesc, err := pbs.localStore.Stat(ctx, dgst)
if err != nil {
// Stat can report a zero sized file here if it's checked between creation
// and population. Return nil error, and continue
return false, nil
}
proxyMetrics.BlobPush(uint64(localDesc.Size))
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
}
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
defer func() {
mu.Lock()
delete(inflight, dgst)
mu.Unlock()
}()
var desc distribution.Descriptor
var err error
var bw distribution.BlobWriter
bw, err = pbs.localStore.Create(ctx)
if err != nil {
return err
}
desc, err = pbs.copyContent(ctx, dgst, bw)
if err != nil {
return err
}
_, err = bw.Commit(ctx, desc)
if err != nil {
return err
}
return nil
}
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
served, err := pbs.serveLocal(ctx, w, r, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
return err
}
if served {
return nil
}
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
return err
}
mu.Lock()
_, ok := inflight[dgst]
if ok {
mu.Unlock()
_, err := pbs.copyContent(ctx, dgst, w)
return err
}
inflight[dgst] = struct{}{}
mu.Unlock()
// storeLocalCtx will be independent with ctx, because ctx it used to fetch remote image.
// There would be a situation, that is pulling remote bytes ends before pbs.storeLocal( 'Copy', 'Commit' ...)
// Then the registry fails to cache the layer, even though the layer had been served to client.
storeLocalCtx, cancel := context.WithCancel(context.Background())
go func(dgst digest.Digest) {
defer cancel()
if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil {
dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error())
}
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err)
return
}
if pbs.scheduler != nil && pbs.ttl != nil {
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
}
}(dgst)
_, err = pbs.copyContent(ctx, dgst, w)
if err != nil {
cancel()
return err
}
return nil
}
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := pbs.localStore.Stat(ctx, dgst)
if err == nil {
return desc, err
}
if err != distribution.ErrBlobUnknown {
return distribution.Descriptor{}, err
}
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
return distribution.Descriptor{}, err
}
return pbs.remoteStore.Stat(ctx, dgst)
}
func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
blob, err := pbs.localStore.Get(ctx, dgst)
if err == nil {
return blob, nil
}
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
return []byte{}, err
}
blob, err = pbs.remoteStore.Get(ctx, dgst)
if err != nil {
return []byte{}, err
}
_, err = pbs.localStore.Put(ctx, "", blob)
if err != nil {
return []byte{}, err
}
return blob, nil
}
// Unsupported functions
func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}