avoid redundant blob fetching
Signed-off-by: baojiangnan <baojn1998@163.com>
This commit is contained in:
parent
02e2231e60
commit
abfc675939
2 changed files with 52 additions and 40 deletions
|
@ -76,23 +76,26 @@ func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter
|
|||
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
|
||||
defer func() {
|
||||
mu.Lock()
|
||||
delete(inflight, dgst)
|
||||
mu.Unlock()
|
||||
}()
|
||||
|
||||
var desc distribution.Descriptor
|
||||
var err error
|
||||
var bw distribution.BlobWriter
|
||||
|
||||
bw, err = pbs.localStore.Create(ctx)
|
||||
func (pbs *proxyBlobStore) serveRemote(ctx context.Context, w http.ResponseWriter, dgst digest.Digest) error {
|
||||
bw, err := pbs.localStore.Create(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
desc, err = pbs.copyContent(ctx, dgst, bw)
|
||||
desc, err := pbs.remoteStore.Stat(ctx, dgst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer remoteReader.Close()
|
||||
|
||||
// use TeeReader to avoid redundant blob fetching.
|
||||
teeReader := io.TeeReader(remoteReader, bw)
|
||||
_, err = io.CopyN(w, teeReader, desc.Size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -102,7 +105,14 @@ func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) e
|
|||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
|
||||
if err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
pbs.scheduler.AddBlob(blobRef, repositoryTTL)
|
||||
return err
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
||||
|
@ -130,31 +140,13 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
|
|||
inflight[dgst] = struct{}{}
|
||||
mu.Unlock()
|
||||
|
||||
// storeLocalCtx will be independent with ctx, because ctx it used to fetch remote image.
|
||||
// There would be a situation, that is pulling remote bytes ends before pbs.storeLocal( 'Copy', 'Commit' ...)
|
||||
// Then the registry fails to cache the layer, even though the layer had been served to client.
|
||||
storeLocalCtx, cancel := context.WithCancel(context.Background())
|
||||
go func(dgst digest.Digest) {
|
||||
defer cancel()
|
||||
if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil {
|
||||
dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error())
|
||||
}
|
||||
defer func() {
|
||||
mu.Lock()
|
||||
delete(inflight, dgst)
|
||||
mu.Unlock()
|
||||
}()
|
||||
|
||||
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
|
||||
if err != nil {
|
||||
dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
pbs.scheduler.AddBlob(blobRef, repositoryTTL)
|
||||
}(dgst)
|
||||
|
||||
_, err = pbs.copyContent(ctx, dgst, w)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return pbs.serveRemote(ctx, w, dgst)
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
|
|
|
@ -329,6 +329,12 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
|||
remoteStats := te.RemoteStats()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var descHitMap = map[digest.Digest]bool{}
|
||||
var hitLock sync.Mutex
|
||||
|
||||
for _, remoteBlob := range te.inRemote {
|
||||
descHitMap[remoteBlob.Digest] = true
|
||||
}
|
||||
|
||||
for i := 0; i < numClients; i++ {
|
||||
// Serveblob - pulls through blobs
|
||||
|
@ -355,6 +361,15 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
|||
t.Errorf("Mismatching blob fetch from proxy")
|
||||
return
|
||||
}
|
||||
|
||||
desc, err := te.store.localStore.Stat(te.ctx, remoteBlob.Digest)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
hitLock.Lock()
|
||||
delete(descHitMap, desc.Digest)
|
||||
hitLock.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -364,11 +379,16 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
|||
t.FailNow()
|
||||
}
|
||||
|
||||
if len(descHitMap) > 0 {
|
||||
t.Errorf("Expected hit cache at least once, but it turns out that no caches was hit")
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
remoteBlobCount := len(te.inRemote)
|
||||
sbsMu.Lock()
|
||||
if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
|
||||
if (*localStats)["stat"] != remoteBlobCount*numClients*2 && (*localStats)["create"] != te.numUnique {
|
||||
sbsMu.Unlock()
|
||||
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
|
||||
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount, "Got: stat:", (*localStats)["stat"], "create:", (*localStats)["create"])
|
||||
}
|
||||
sbsMu.Unlock()
|
||||
|
||||
|
|
Loading…
Reference in a new issue