diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index b480a1112..976dc8d7c 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -22,15 +22,10 @@ type proxyBlobStore struct { scheduler *scheduler.TTLExpirationScheduler } -var _ distribution.BlobStore = proxyBlobStore{} - -type inflightBlob struct { - refCount int - bw distribution.BlobWriter -} +var _ distribution.BlobStore = &proxyBlobStore{} // inflight tracks currently downloading blobs -var inflight = make(map[digest.Digest]*inflightBlob) +var inflight = make(map[digest.Digest]struct{}) // mu protects inflight var mu sync.Mutex @@ -42,140 +37,113 @@ func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, d w.Header().Set("Etag", digest.String()) } -func (pbs proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { - desc, err := pbs.localStore.Stat(ctx, dgst) - if err != nil && err != distribution.ErrBlobUnknown { - return err - } - - if err == nil { - proxyMetrics.BlobPush(uint64(desc.Size)) - return pbs.localStore.ServeBlob(ctx, w, r, dgst) - } - - desc, err = pbs.remoteStore.Stat(ctx, dgst) +func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) { + desc, err := pbs.remoteStore.Stat(ctx, dgst) if err != nil { - return err + return distribution.Descriptor{}, err + } + + if w, ok := writer.(http.ResponseWriter); ok { + setResponseHeaders(w, desc.Size, desc.MediaType, dgst) } remoteReader, err := pbs.remoteStore.Open(ctx, dgst) if err != nil { - return err + return distribution.Descriptor{}, err } - bw, isNew, cleanup, err := getOrCreateBlobWriter(ctx, pbs.localStore, desc) + _, err = io.CopyN(writer, remoteReader, desc.Size) + if err != nil { + return distribution.Descriptor{}, err + } + + proxyMetrics.BlobPush(uint64(desc.Size)) + + return desc, nil +} + +func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) { + localDesc, err := pbs.localStore.Stat(ctx, dgst) + if err != nil { + // Stat can report a zero sized file here if it's checked between creation + // and population. Return nil error, and continue + return false, nil + } + + if err == nil { + proxyMetrics.BlobPush(uint64(localDesc.Size)) + return true, pbs.localStore.ServeBlob(ctx, w, r, dgst) + } + + return false, nil + +} + +func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error { + defer func() { + mu.Lock() + delete(inflight, dgst) + mu.Unlock() + }() + + var desc distribution.Descriptor + var err error + var bw distribution.BlobWriter + + bw, err = pbs.localStore.Create(ctx) if err != nil { return err } - defer cleanup() - if isNew { - go func() { - err := streamToStorage(ctx, remoteReader, desc, bw) - if err != nil { - context.GetLogger(ctx).Error(err) - } + desc, err = pbs.copyContent(ctx, dgst, bw) + if err != nil { + return err + } - proxyMetrics.BlobPull(uint64(desc.Size)) - }() - err := streamToClient(ctx, w, desc, bw) - if err != nil { - return err - } + _, err = bw.Commit(ctx, desc) + if err != nil { + return err + } - proxyMetrics.BlobPush(uint64(desc.Size)) - pbs.scheduler.AddBlob(dgst.String(), blobTTL) + return nil +} + +func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + served, err := pbs.serveLocal(ctx, w, r, dgst) + if err != nil { + context.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error()) + return err + } + + if served { return nil } - err = streamToClient(ctx, w, desc, bw) - if err != nil { - return err - } - proxyMetrics.BlobPush(uint64(desc.Size)) - return nil -} - -type cleanupFunc func() - -// getOrCreateBlobWriter will track which blobs are currently being downloaded and enable client requesting -// the same blob concurrently to read from the existing stream. -func getOrCreateBlobWriter(ctx context.Context, blobs distribution.BlobService, desc distribution.Descriptor) (distribution.BlobWriter, bool, cleanupFunc, error) { mu.Lock() - defer mu.Unlock() - dgst := desc.Digest - - cleanup := func() { - mu.Lock() - defer mu.Unlock() - inflight[dgst].refCount-- - - if inflight[dgst].refCount == 0 { - defer delete(inflight, dgst) - _, err := inflight[dgst].bw.Commit(ctx, desc) - if err != nil { - // There is a narrow race here where Commit can be called while this blob's TTL is expiring - // and its being removed from storage. In that case, the client stream will continue - // uninterruped and the blob will be pulled through on the next request, so just log it - context.GetLogger(ctx).Errorf("Error committing blob: %q", err) - } - - } - } - - var bw distribution.BlobWriter _, ok := inflight[dgst] if ok { - bw = inflight[dgst].bw - inflight[dgst].refCount++ - return bw, false, cleanup, nil + mu.Unlock() + _, err := pbs.copyContent(ctx, dgst, w) + return err } + inflight[dgst] = struct{}{} + mu.Unlock() - var err error - bw, err = blobs.Create(ctx) - if err != nil { - return nil, false, nil, err - } + go func(dgst digest.Digest) { + if err := pbs.storeLocal(ctx, dgst); err != nil { + context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error()) + } + pbs.scheduler.AddBlob(dgst.String(), repositoryTTL) + }(dgst) - inflight[dgst] = &inflightBlob{refCount: 1, bw: bw} - return bw, true, cleanup, nil -} - -func streamToStorage(ctx context.Context, remoteReader distribution.ReadSeekCloser, desc distribution.Descriptor, bw distribution.BlobWriter) error { - _, err := io.CopyN(bw, remoteReader, desc.Size) + _, err = pbs.copyContent(ctx, dgst, w) if err != nil { return err } - return nil } -func streamToClient(ctx context.Context, w http.ResponseWriter, desc distribution.Descriptor, bw distribution.BlobWriter) error { - setResponseHeaders(w, desc.Size, desc.MediaType, desc.Digest) - - reader, err := bw.Reader() - if err != nil { - return err - } - defer reader.Close() - teeReader := io.TeeReader(reader, w) - buf := make([]byte, 32768, 32786) - var soFar int64 - for { - rd, err := teeReader.Read(buf) - if err == nil || err == io.EOF { - soFar += int64(rd) - if soFar < desc.Size { - // buffer underflow, keep trying - continue - } - return nil - } - return err - } -} - -func (pbs proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { +func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { desc, err := pbs.localStore.Stat(ctx, dgst) if err == nil { return desc, err @@ -189,26 +157,26 @@ func (pbs proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distrib } // Unsupported functions -func (pbs proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { +func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { return distribution.Descriptor{}, distribution.ErrUnsupported } -func (pbs proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (pbs *proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { return nil, distribution.ErrUnsupported } -func (pbs proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { +func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { return nil, distribution.ErrUnsupported } -func (pbs proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { +func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { return nil, distribution.ErrUnsupported } -func (pbs proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { +func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { return nil, distribution.ErrUnsupported } -func (pbs proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { +func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { return distribution.ErrUnsupported } diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index f8845ed34..a88fd8b37 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -1,10 +1,13 @@ package proxy import ( - "fmt" + "io/ioutil" + "math/rand" "net/http" "net/http/httptest" + "sync" "testing" + "time" "github.com/docker/distribution" "github.com/docker/distribution/context" @@ -12,75 +15,119 @@ import ( "github.com/docker/distribution/registry/proxy/scheduler" "github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver/filesystem" "github.com/docker/distribution/registry/storage/driver/inmemory" ) +var sbsMu sync.Mutex + type statsBlobStore struct { stats map[string]int blobs distribution.BlobStore } func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { + sbsMu.Lock() sbs.stats["put"]++ + sbsMu.Unlock() + return sbs.blobs.Put(ctx, mediaType, p) } func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + sbsMu.Lock() sbs.stats["get"]++ + sbsMu.Unlock() + return sbs.blobs.Get(ctx, dgst) } func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { + sbsMu.Lock() sbs.stats["create"]++ + sbsMu.Unlock() + return sbs.blobs.Create(ctx) } func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { + sbsMu.Lock() sbs.stats["resume"]++ + sbsMu.Unlock() + return sbs.blobs.Resume(ctx, id) } func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + sbsMu.Lock() sbs.stats["open"]++ + sbsMu.Unlock() + return sbs.blobs.Open(ctx, dgst) } func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + sbsMu.Lock() sbs.stats["serveblob"]++ + sbsMu.Unlock() + return sbs.blobs.ServeBlob(ctx, w, r, dgst) } func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + + sbsMu.Lock() sbs.stats["stat"]++ + sbsMu.Unlock() + return sbs.blobs.Stat(ctx, dgst) } func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { + sbsMu.Lock() sbs.stats["delete"]++ + sbsMu.Unlock() + return sbs.blobs.Delete(ctx, dgst) } type testEnv struct { - inRemote []distribution.Descriptor - store proxyBlobStore - ctx context.Context + numUnique int + inRemote []distribution.Descriptor + store proxyBlobStore + ctx context.Context } -func (te testEnv) LocalStats() *map[string]int { +func (te *testEnv) LocalStats() *map[string]int { + sbsMu.Lock() ls := te.store.localStore.(statsBlobStore).stats + sbsMu.Unlock() return &ls } -func (te testEnv) RemoteStats() *map[string]int { +func (te *testEnv) RemoteStats() *map[string]int { + sbsMu.Lock() rs := te.store.remoteStore.(statsBlobStore).stats + sbsMu.Unlock() return &rs } // Populate remote store and record the digests -func makeTestEnv(t *testing.T, name string) testEnv { +func makeTestEnv(t *testing.T, name string) *testEnv { ctx := context.Background() - localRegistry, err := storage.NewRegistry(ctx, inmemory.New(), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) + truthDir, err := ioutil.TempDir("", "truth") + if err != nil { + t.Fatalf("unable to create tempdir: %s", err) + } + + cacheDir, err := ioutil.TempDir("", "cache") + if err != nil { + t.Fatalf("unable to create tempdir: %s", err) + } + + // todo: create a tempfile area here + localRegistry, err := storage.NewRegistry(ctx, filesystem.New(truthDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -89,7 +136,7 @@ func makeTestEnv(t *testing.T, name string) testEnv { t.Fatalf("unexpected error getting repo: %v", err) } - truthRegistry, err := storage.NewRegistry(ctx, inmemory.New(), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) + truthRegistry, err := storage.NewRegistry(ctx, filesystem.New(cacheDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -116,33 +163,59 @@ func makeTestEnv(t *testing.T, name string) testEnv { scheduler: s, } - te := testEnv{ + te := &testEnv{ store: proxyBlobStore, ctx: ctx, } return te } -func populate(t *testing.T, te *testEnv, blobCount int) { - var inRemote []distribution.Descriptor - for i := 0; i < blobCount; i++ { - bytes := []byte(fmt.Sprintf("blob%d", i)) +func makeBlob(size int) []byte { + blob := make([]byte, size, size) + for i := 0; i < size; i++ { + blob[i] = byte('A' + rand.Int()%48) + } + return blob +} - desc, err := te.store.remoteStore.Put(te.ctx, "", bytes) - if err != nil { - t.Errorf("Put in store") +func init() { + rand.Seed(42) +} + +func perm(m []distribution.Descriptor) []distribution.Descriptor { + for i := 0; i < len(m); i++ { + j := rand.Intn(i + 1) + tmp := m[i] + m[i] = m[j] + m[j] = tmp + } + return m +} + +func populate(t *testing.T, te *testEnv, blobCount, size, numUnique int) { + var inRemote []distribution.Descriptor + + for i := 0; i < numUnique; i++ { + bytes := makeBlob(size) + for j := 0; j < blobCount/numUnique; j++ { + desc, err := te.store.remoteStore.Put(te.ctx, "", bytes) + if err != nil { + t.Fatalf("Put in store") + } + + inRemote = append(inRemote, desc) } - inRemote = append(inRemote, desc) } te.inRemote = inRemote - + te.numUnique = numUnique } func TestProxyStoreStat(t *testing.T) { te := makeTestEnv(t, "foo/bar") + remoteBlobCount := 1 - populate(t, &te, remoteBlobCount) + populate(t, te, remoteBlobCount, 10, 1) localStats := te.LocalStats() remoteStats := te.RemoteStats() @@ -164,43 +237,91 @@ func TestProxyStoreStat(t *testing.T) { } } -func TestProxyStoreServe(t *testing.T) { +func TestProxyStoreServeHighConcurrency(t *testing.T) { te := makeTestEnv(t, "foo/bar") - remoteBlobCount := 1 - populate(t, &te, remoteBlobCount) + blobSize := 200 + blobCount := 10 + numUnique := 1 + populate(t, te, blobCount, blobSize, numUnique) + numClients := 16 + testProxyStoreServe(t, te, numClients) +} + +func TestProxyStoreServeMany(t *testing.T) { + te := makeTestEnv(t, "foo/bar") + blobSize := 200 + blobCount := 10 + numUnique := 4 + populate(t, te, blobCount, blobSize, numUnique) + + numClients := 4 + testProxyStoreServe(t, te, numClients) +} + +// todo(richardscothern): blobCount must be smaller than num clients +func TestProxyStoreServeBig(t *testing.T) { + te := makeTestEnv(t, "foo/bar") + + blobSize := 2 << 20 + blobCount := 4 + numUnique := 2 + populate(t, te, blobCount, blobSize, numUnique) + + numClients := 4 + testProxyStoreServe(t, te, numClients) +} + +// testProxyStoreServe will create clients to consume all blobs +// populated in the truth store +func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { localStats := te.LocalStats() remoteStats := te.RemoteStats() - // Serveblob - pulls through blobs - for _, dr := range te.inRemote { - w := httptest.NewRecorder() - r, err := http.NewRequest("GET", "", nil) - if err != nil { - t.Fatal(err) - } + var wg sync.WaitGroup - err = te.store.ServeBlob(te.ctx, w, r, dr.Digest) - if err != nil { - t.Fatalf(err.Error()) - } + for i := 0; i < numClients; i++ { + // Serveblob - pulls through blobs + wg.Add(1) + go func() { + defer wg.Done() + for _, remoteBlob := range te.inRemote { + w := httptest.NewRecorder() + r, err := http.NewRequest("GET", "", nil) + if err != nil { + t.Fatal(err) + } - dl, err := digest.FromBytes(w.Body.Bytes()) - if err != nil { - t.Fatalf("Error making digest from blob") - } - if dl != dr.Digest { - t.Errorf("Mismatching blob fetch from proxy") - } + err = te.store.ServeBlob(te.ctx, w, r, remoteBlob.Digest) + if err != nil { + t.Fatalf(err.Error()) + } + + bodyBytes := w.Body.Bytes() + localDigest, err := digest.FromBytes(bodyBytes) + if err != nil { + t.Fatalf("Error making digest from blob") + } + if localDigest != remoteBlob.Digest { + t.Fatalf("Mismatching blob fetch from proxy") + } + } + }() } - if (*localStats)["stat"] != remoteBlobCount && (*localStats)["create"] != remoteBlobCount { - t.Fatalf("unexpected local stats") - } - if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount { - t.Fatalf("unexpected local stats") + wg.Wait() + + remoteBlobCount := len(te.inRemote) + if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique { + t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount) } + // Wait for any async storage goroutines to finish + time.Sleep(3 * time.Second) + + remoteStatCount := (*remoteStats)["stat"] + remoteOpenCount := (*remoteStats)["open"] + // Serveblob - blobs come from local for _, dr := range te.inRemote { w := httptest.NewRecorder() @@ -223,15 +344,11 @@ func TestProxyStoreServe(t *testing.T) { } } - // Stat to find local, but no new blobs were created - if (*localStats)["stat"] != remoteBlobCount*2 && (*localStats)["create"] != remoteBlobCount*2 { - t.Fatalf("unexpected local stats") - } + localStats = te.LocalStats() + remoteStats = te.RemoteStats() - // Remote unchanged - if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount { - fmt.Printf("\tlocal=%#v, \n\tremote=%#v\n", localStats, remoteStats) - t.Fatalf("unexpected local stats") + // Ensure remote unchanged + if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount { + t.Fatalf("unexpected remote stats: %#v", remoteStats) } - } diff --git a/registry/proxy/proxyregistry.go b/registry/proxy/proxyregistry.go index e9dec2f70..8a5f5ef6d 100644 --- a/registry/proxy/proxyregistry.go +++ b/registry/proxy/proxyregistry.go @@ -94,7 +94,7 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distri } return &proxiedRepository{ - blobStore: proxyBlobStore{ + blobStore: &proxyBlobStore{ localStore: localRepo.Blobs(ctx), remoteStore: remoteRepo.Blobs(ctx), scheduler: pr.scheduler,