From abfc67593937ce3aef8a79d7aecf1a2edb4d0a67 Mon Sep 17 00:00:00 2001 From: baojiangnan Date: Thu, 20 Jan 2022 11:52:34 +0800 Subject: [PATCH] avoid redundant blob fetching Signed-off-by: baojiangnan --- registry/proxy/proxyblobstore.go | 68 ++++++++++++--------------- registry/proxy/proxyblobstore_test.go | 24 +++++++++- 2 files changed, 52 insertions(+), 40 deletions(-) diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index 43f58741c..b832ea273 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -76,23 +76,26 @@ func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter return true, pbs.localStore.ServeBlob(ctx, w, r, dgst) } -func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error { - defer func() { - mu.Lock() - delete(inflight, dgst) - mu.Unlock() - }() - - var desc distribution.Descriptor - var err error - var bw distribution.BlobWriter - - bw, err = pbs.localStore.Create(ctx) +func (pbs *proxyBlobStore) serveRemote(ctx context.Context, w http.ResponseWriter, dgst digest.Digest) error { + bw, err := pbs.localStore.Create(ctx) if err != nil { return err } - desc, err = pbs.copyContent(ctx, dgst, bw) + desc, err := pbs.remoteStore.Stat(ctx, dgst) + if err != nil { + return err + } + + remoteReader, err := pbs.remoteStore.Open(ctx, dgst) + if err != nil { + return err + } + defer remoteReader.Close() + + // use TeeReader to avoid redundant blob fetching. + teeReader := io.TeeReader(remoteReader, bw) + _, err = io.CopyN(w, teeReader, desc.Size) if err != nil { return err } @@ -102,7 +105,14 @@ func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) e return err } - return nil + blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) + if err != nil { + dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) + return err + } + + pbs.scheduler.AddBlob(blobRef, repositoryTTL) + return err } func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { @@ -130,31 +140,13 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, inflight[dgst] = struct{}{} mu.Unlock() - // storeLocalCtx will be independent with ctx, because ctx it used to fetch remote image. - // There would be a situation, that is pulling remote bytes ends before pbs.storeLocal( 'Copy', 'Commit' ...) - // Then the registry fails to cache the layer, even though the layer had been served to client. - storeLocalCtx, cancel := context.WithCancel(context.Background()) - go func(dgst digest.Digest) { - defer cancel() - if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil { - dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error()) - } + defer func() { + mu.Lock() + delete(inflight, dgst) + mu.Unlock() + }() - blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) - if err != nil { - dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err) - return - } - - pbs.scheduler.AddBlob(blobRef, repositoryTTL) - }(dgst) - - _, err = pbs.copyContent(ctx, dgst, w) - if err != nil { - cancel() - return err - } - return nil + return pbs.serveRemote(ctx, w, dgst) } func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index 2ba26738f..255ea1464 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -329,6 +329,12 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { remoteStats := te.RemoteStats() var wg sync.WaitGroup + var descHitMap = map[digest.Digest]bool{} + var hitLock sync.Mutex + + for _, remoteBlob := range te.inRemote { + descHitMap[remoteBlob.Digest] = true + } for i := 0; i < numClients; i++ { // Serveblob - pulls through blobs @@ -355,6 +361,15 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { t.Errorf("Mismatching blob fetch from proxy") return } + + desc, err := te.store.localStore.Stat(te.ctx, remoteBlob.Digest) + if err != nil { + continue + } + + hitLock.Lock() + delete(descHitMap, desc.Digest) + hitLock.Unlock() } }() } @@ -364,11 +379,16 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { t.FailNow() } + if len(descHitMap) > 0 { + t.Errorf("Expected hit cache at least once, but it turns out that no caches was hit") + t.FailNow() + } + remoteBlobCount := len(te.inRemote) sbsMu.Lock() - if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique { + if (*localStats)["stat"] != remoteBlobCount*numClients*2 && (*localStats)["create"] != te.numUnique { sbsMu.Unlock() - t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount) + t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount, "Got: stat:", (*localStats)["stat"], "create:", (*localStats)["create"]) } sbsMu.Unlock()