distribution/registry/proxy/proxyblobstore.go

215 lines
5.5 KiB
Go
Raw Normal View History

package proxy
import (
"io"
"net/http"
"strconv"
"sync"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/proxy/scheduler"
)
// blobTTL is the time-to-live handed to the expiration scheduler for every
// blob that is pulled through the proxy: seven days.
//
// TODO(richardscothern): derive this from the cache control header or the
// config file.
const blobTTL = 7 * 24 * time.Hour
// proxyBlobStore is a blob store that answers requests from localStore when
// the blob is present there and otherwise falls back to remoteStore.
type proxyBlobStore struct {
// localStore is consulted first by Stat and ServeBlob, and is where blobs
// pulled through from the remote are written.
localStore distribution.BlobStore
// remoteStore is queried when localStore reports distribution.ErrBlobUnknown.
remoteStore distribution.BlobService
// scheduler is handed the digest of each newly pulled blob together with
// blobTTL.
scheduler *scheduler.TTLExpirationScheduler
}

// Compile-time check that proxyBlobStore implements distribution.BlobStore.
var _ distribution.BlobStore = proxyBlobStore{}
// inflightBlob is the shared state for one blob that is currently being
// downloaded into local storage.
type inflightBlob struct {
// refCount is the number of requests currently using bw. It is incremented
// when a request joins the download and decremented by that request's
// cleanup function; the writer is committed when it reaches zero.
refCount int
// bw is the writer the blob is being stored through.
bw distribution.BlobWriter
}
var (
	// mu guards every access to inflight.
	mu sync.Mutex

	// inflight maps the digest of each blob that is currently being
	// downloaded to the state shared by all requests for that blob.
	inflight = map[digest.Digest]*inflightBlob{}
)
// setResponseHeaders populates the blob response headers on w: the content
// length (formatted in base 10), the content type, and the digest, which is
// sent both as Docker-Content-Digest and as the Etag.
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
	headers := w.Header()
	digestValue := digest.String()

	headers.Set("Content-Length", strconv.FormatInt(length, 10))
	headers.Set("Content-Type", mediaType)
	headers.Set("Docker-Content-Digest", digestValue)
	headers.Set("Etag", digestValue)
}
// ServeBlob writes the blob identified by dgst to w.
//
// If the blob is already in the local store it is served from there.
// Otherwise it is looked up and opened on the remote store and streamed to the
// client while a background goroutine writes it into the local store. When
// another request is already downloading the same digest, this request reads
// from that existing download instead of starting a second one.
//
// It returns any error from the local or remote Stat, from opening the remote
// blob, from obtaining the local writer, or from streaming to the client.
// Errors raised while copying the blob into local storage occur on the
// background goroutine and are only logged.
func (pbs proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
	desc, err := pbs.localStore.Stat(ctx, dgst)
	if err != nil && err != distribution.ErrBlobUnknown {
		return err
	}

	// Local hit: serve straight from local storage.
	if err == nil {
		proxyMetrics.BlobPush(uint64(desc.Size))
		return pbs.localStore.ServeBlob(ctx, w, r, dgst)
	}

	desc, err = pbs.remoteStore.Stat(ctx, dgst)
	if err != nil {
		return err
	}

	remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
	if err != nil {
		return err
	}

	bw, isNew, cleanup, err := getOrCreateBlobWriter(ctx, pbs.localStore, desc)
	if err != nil {
		// Nothing was read from the remote, so a Close error carries no
		// information worth surfacing over the original failure.
		remoteReader.Close()
		return err
	}
	defer cleanup()

	if isNew {
		// This request owns the download: pull the remote blob into local
		// storage in the background while the client is served below.
		go func() {
			// The goroutine is the only user of remoteReader, so it is
			// responsible for releasing it once the copy ends.
			defer remoteReader.Close()

			if err := streamToStorage(ctx, remoteReader, desc, bw); err != nil {
				context.GetLogger(ctx).Error(err)
				// Do not count a failed pull as pulled bytes.
				return
			}
			proxyMetrics.BlobPull(uint64(desc.Size))
		}()
	} else {
		// Another request is already downloading this blob; the reader opened
		// above is not needed and must not be leaked.
		remoteReader.Close()
	}

	if err := streamToClient(ctx, w, desc, bw); err != nil {
		return err
	}

	proxyMetrics.BlobPush(uint64(desc.Size))
	if isNew {
		// Only the request that started the download schedules its expiry.
		pbs.scheduler.AddBlob(dgst.String(), blobTTL)
	}
	return nil
}
// cleanupFunc releases a reference obtained from getOrCreateBlobWriter.
type cleanupFunc func()

// getOrCreateBlobWriter tracks which blobs are currently being downloaded so
// that clients requesting the same blob concurrently share a single writer.
//
// If a download for desc.Digest is already registered, its writer is returned
// with its reference count bumped and the boolean result is false. Otherwise a
// new writer is created through blobs, registered with a reference count of
// one, and the boolean result is true. If creating the writer fails, the error
// is returned and the cleanup function is nil.
//
// The returned cleanup function drops the caller's reference. When the last
// reference is dropped the writer is committed and the entry is removed from
// the in-flight table.
func getOrCreateBlobWriter(ctx context.Context, blobs distribution.BlobService, desc distribution.Descriptor) (distribution.BlobWriter, bool, cleanupFunc, error) {
	key := desc.Digest

	release := func() {
		mu.Lock()
		defer mu.Unlock()

		entry := inflight[key]
		entry.refCount--
		if entry.refCount != 0 {
			return
		}

		// Last reference: drop the entry once the commit attempt is over.
		defer delete(inflight, key)
		if _, commitErr := entry.bw.Commit(ctx, desc); commitErr != nil {
			// There is a narrow race here where Commit can be called while
			// this blob's TTL is expiring and it is being removed from
			// storage. In that case the client stream continues uninterrupted
			// and the blob is pulled through on the next request, so the
			// failure is only logged.
			context.GetLogger(ctx).Errorf("Error committing blob: %q", commitErr)
		}
	}

	mu.Lock()
	defer mu.Unlock()

	// Join an existing download if there is one.
	if entry, found := inflight[key]; found {
		entry.refCount++
		return entry.bw, false, release, nil
	}

	writer, err := blobs.Create(ctx)
	if err != nil {
		return nil, false, nil, err
	}

	inflight[key] = &inflightBlob{refCount: 1, bw: writer}
	return writer, true, release, nil
}
// streamToStorage copies desc.Size bytes from remoteReader into bw using
// io.CopyN and returns the error from that copy, or nil on success.
func streamToStorage(ctx context.Context, remoteReader distribution.ReadSeekCloser, desc distribution.Descriptor, bw distribution.BlobWriter) error {
	_, copyErr := io.CopyN(bw, remoteReader, desc.Size)
	return copyErr
}
// streamToClient sets the blob response headers on w and then copies the blob
// described by desc from the local writer bw to w.
//
// The data is read back through bw.Reader() while it may still be in the
// process of being written, so a read can come up short or report io.EOF
// before desc.Size bytes have been seen. In that case the read is retried
// until desc.Size bytes have been delivered.
//
// It returns nil once desc.Size bytes have been read, the error from
// bw.Reader(), any read or write error other than io.EOF, or ctx.Err() if ctx
// is cancelled while waiting for more data.
func streamToClient(ctx context.Context, w http.ResponseWriter, desc distribution.Descriptor, bw distribution.BlobWriter) error {
	setResponseHeaders(w, desc.Size, desc.MediaType, desc.Digest)

	reader, err := bw.Reader()
	if err != nil {
		return err
	}
	defer reader.Close()

	// Every byte read from local storage is mirrored into the response.
	teeReader := io.TeeReader(reader, w)
	buf := make([]byte, 32768)

	var soFar int64
	for {
		// Without this check the retry loop below would never terminate if
		// the storage side stops producing data.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		rd, err := teeReader.Read(buf)
		if err != nil && err != io.EOF {
			return err
		}

		soFar += int64(rd)
		if soFar >= desc.Size {
			return nil
		}
		// Buffer underflow: the writer has not caught up yet, keep trying.
	}
}
// Stat returns the descriptor for dgst. The local store is asked first; only
// when it reports distribution.ErrBlobUnknown is the remote store consulted.
// Any other local error is returned with an empty descriptor.
func (pbs proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	local, err := pbs.localStore.Stat(ctx, dgst)
	switch err {
	case nil:
		return local, nil
	case distribution.ErrBlobUnknown:
		return pbs.remoteStore.Stat(ctx, dgst)
	default:
		return distribution.Descriptor{}, err
	}
}
// Unsupported functions

// Put is not supported by proxyBlobStore. It always returns an empty
// descriptor and distribution.ErrUnsupported.
func (pbs proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
// Create is not supported by proxyBlobStore. It always returns a nil writer
// and distribution.ErrUnsupported.
func (pbs proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
// Resume is not supported by proxyBlobStore. It always returns a nil writer
// and distribution.ErrUnsupported.
func (pbs proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}
// Open is not supported by proxyBlobStore. It always returns a nil reader and
// distribution.ErrUnsupported.
func (pbs proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported
}
// Get is not supported by proxyBlobStore. It always returns nil content and
// distribution.ErrUnsupported.
func (pbs proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
return nil, distribution.ErrUnsupported
}
// Delete is not supported by proxyBlobStore. It always returns
// distribution.ErrUnsupported.
func (pbs proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}