forked from TrueCloudLab/distribution
Merge pull request #3569 from justadogistaken/optimize/avoid-redundant-blob-fetching
optimize: avoid redundant blob fetching
This commit is contained in:
commit
42ce5d4d51
2 changed files with 55 additions and 54 deletions
|
@ -79,35 +79,6 @@ func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter
|
||||||
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
|
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
|
|
||||||
defer func() {
|
|
||||||
mu.Lock()
|
|
||||||
delete(inflight, dgst)
|
|
||||||
mu.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
var desc distribution.Descriptor
|
|
||||||
var err error
|
|
||||||
var bw distribution.BlobWriter
|
|
||||||
|
|
||||||
bw, err = pbs.localStore.Create(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
desc, err = pbs.copyContent(ctx, dgst, bw)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = bw.Commit(ctx, desc)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
||||||
served, err := pbs.serveLocal(ctx, w, r, dgst)
|
served, err := pbs.serveLocal(ctx, w, r, dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -126,6 +97,9 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
_, ok := inflight[dgst]
|
_, ok := inflight[dgst]
|
||||||
if ok {
|
if ok {
|
||||||
|
// If the blob has been serving in other requests.
|
||||||
|
// Will return the blob from the remote store directly.
|
||||||
|
// TODO Maybe we could reuse the these blobs are serving remotely and caching locally.
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
_, err := pbs.copyContent(ctx, dgst, w)
|
_, err := pbs.copyContent(ctx, dgst, w)
|
||||||
return err
|
return err
|
||||||
|
@ -133,33 +107,40 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
|
||||||
inflight[dgst] = struct{}{}
|
inflight[dgst] = struct{}{}
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
|
|
||||||
// storeLocalCtx will be independent with ctx, because ctx it used to fetch remote image.
|
defer func() {
|
||||||
// There would be a situation, that is pulling remote bytes ends before pbs.storeLocal( 'Copy', 'Commit' ...)
|
mu.Lock()
|
||||||
// Then the registry fails to cache the layer, even though the layer had been served to client.
|
delete(inflight, dgst)
|
||||||
storeLocalCtx, cancel := context.WithCancel(context.Background())
|
mu.Unlock()
|
||||||
go func(dgst digest.Digest) {
|
}()
|
||||||
defer cancel()
|
|
||||||
if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil {
|
|
||||||
dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
|
bw, err := pbs.localStore.Create(ctx)
|
||||||
if err != nil {
|
|
||||||
dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if pbs.scheduler != nil && pbs.ttl != nil {
|
|
||||||
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
|
|
||||||
}
|
|
||||||
|
|
||||||
}(dgst)
|
|
||||||
|
|
||||||
_, err = pbs.copyContent(ctx, dgst, w)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Serving client and storing locally over same fetching request.
|
||||||
|
// This can prevent a redundant blob fetching.
|
||||||
|
multiWriter := io.MultiWriter(w, bw)
|
||||||
|
desc, err := pbs.copyContent(ctx, dgst, multiWriter)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = bw.Commit(ctx, desc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
|
||||||
|
if err != nil {
|
||||||
|
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if pbs.scheduler != nil && pbs.ttl != nil {
|
||||||
|
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -336,6 +336,12 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
||||||
remoteStats := te.RemoteStats()
|
remoteStats := te.RemoteStats()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
var descHitMap = map[digest.Digest]bool{}
|
||||||
|
var hitLock sync.Mutex
|
||||||
|
|
||||||
|
for _, remoteBlob := range te.inRemote {
|
||||||
|
descHitMap[remoteBlob.Digest] = true
|
||||||
|
}
|
||||||
|
|
||||||
for i := 0; i < numClients; i++ {
|
for i := 0; i < numClients; i++ {
|
||||||
// Serveblob - pulls through blobs
|
// Serveblob - pulls through blobs
|
||||||
|
@ -362,6 +368,15 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
||||||
t.Errorf("Mismatching blob fetch from proxy")
|
t.Errorf("Mismatching blob fetch from proxy")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
desc, err := te.store.localStore.Stat(te.ctx, remoteBlob.Digest)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hitLock.Lock()
|
||||||
|
delete(descHitMap, desc.Digest)
|
||||||
|
hitLock.Unlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -371,11 +386,16 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(descHitMap) > 0 {
|
||||||
|
t.Errorf("Expected hit cache at least once, but it turns out that no caches was hit")
|
||||||
|
t.FailNow()
|
||||||
|
}
|
||||||
|
|
||||||
remoteBlobCount := len(te.inRemote)
|
remoteBlobCount := len(te.inRemote)
|
||||||
sbsMu.Lock()
|
sbsMu.Lock()
|
||||||
if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
|
if (*localStats)["stat"] != remoteBlobCount*numClients*2 && (*localStats)["create"] != te.numUnique {
|
||||||
sbsMu.Unlock()
|
sbsMu.Unlock()
|
||||||
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
|
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount, "Got: stat:", (*localStats)["stat"], "create:", (*localStats)["create"])
|
||||||
}
|
}
|
||||||
sbsMu.Unlock()
|
sbsMu.Unlock()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue