Merge pull request #1394 from RichardScothern/invalidate-bdc

Invalidate the blob store descriptor cache
This commit is contained in:
Richard Scothern 2016-01-28 10:04:09 -08:00
commit d793822290
8 changed files with 159 additions and 69 deletions

View file

@ -18,9 +18,10 @@ import (
const blobTTL = time.Duration(24 * 7 * time.Hour) const blobTTL = time.Duration(24 * 7 * time.Hour)
type proxyBlobStore struct { type proxyBlobStore struct {
localStore distribution.BlobStore localStore distribution.BlobStore
remoteStore distribution.BlobService remoteStore distribution.BlobService
scheduler *scheduler.TTLExpirationScheduler scheduler *scheduler.TTLExpirationScheduler
repositoryName reference.Named
} }
var _ distribution.BlobStore = &proxyBlobStore{} var _ distribution.BlobStore = &proxyBlobStore{}
@ -134,7 +135,14 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
if err := pbs.storeLocal(ctx, dgst); err != nil { if err := pbs.storeLocal(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error()) context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
} }
pbs.scheduler.AddBlob(dgst, repositoryTTL)
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
context.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return
}
pbs.scheduler.AddBlob(blobRef, repositoryTTL)
}(dgst) }(dgst)
_, err = pbs.copyContent(ctx, dgst, w) _, err = pbs.copyContent(ctx, dgst, w)

View file

@ -164,9 +164,10 @@ func makeTestEnv(t *testing.T, name string) *testEnv {
s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json") s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
proxyBlobStore := proxyBlobStore{ proxyBlobStore := proxyBlobStore{
remoteStore: truthBlobs, repositoryName: nameRef,
localStore: localBlobs, remoteStore: truthBlobs,
scheduler: s, localStore: localBlobs,
scheduler: s,
} }
te := &testEnv{ te := &testEnv{

View file

@ -62,11 +62,17 @@ func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, optio
return nil, err return nil, err
} }
// Schedule the repo for removal // Schedule the manifest blob for removal
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL) repoBlob, err := reference.WithDigest(pms.repositoryName, dgst)
if err != nil {
context.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return nil, err
}
pms.scheduler.AddManifest(repoBlob, repositoryTTL)
// Ensure the manifest blob is cleaned up // Ensure the manifest blob is cleaned up
pms.scheduler.AddBlob(dgst, repositoryTTL) //pms.scheduler.AddBlob(blobRef, repositoryTTL)
} }
return manifest, err return manifest, err

View file

@ -119,6 +119,7 @@ func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestE
localManifests: localManifests, localManifests: localManifests,
remoteManifests: truthManifests, remoteManifests: truthManifests,
scheduler: s, scheduler: s,
repositoryName: nameRef,
}, },
} }
} }

View file

@ -4,6 +4,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"fmt"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/docker/distribution/configuration" "github.com/docker/distribution/configuration"
"github.com/docker/distribution/context" "github.com/docker/distribution/context"
@ -35,13 +36,56 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name
} }
v := storage.NewVacuum(ctx, driver) v := storage.NewVacuum(ctx, driver)
s := scheduler.New(ctx, driver, "/scheduler-state.json") s := scheduler.New(ctx, driver, "/scheduler-state.json")
s.OnBlobExpire(func(digest string) error { s.OnBlobExpire(func(ref reference.Reference) error {
return v.RemoveBlob(digest) var r reference.Canonical
var ok bool
if r, ok = ref.(reference.Canonical); !ok {
return fmt.Errorf("unexpected reference type : %T", ref)
}
repo, err := registry.Repository(ctx, r)
if err != nil {
return err
}
blobs := repo.Blobs(ctx)
// Clear the repository reference and descriptor caches
err = blobs.Delete(ctx, r.Digest())
if err != nil {
return err
}
err = v.RemoveBlob(r.Digest().String())
if err != nil {
return err
}
return nil
}) })
s.OnManifestExpire(func(repoName string) error {
return v.RemoveRepository(repoName) s.OnManifestExpire(func(ref reference.Reference) error {
var r reference.Canonical
var ok bool
if r, ok = ref.(reference.Canonical); !ok {
return fmt.Errorf("unexpected reference type : %T", ref)
}
repo, err := registry.Repository(ctx, r)
if err != nil {
return err
}
manifests, err := repo.Manifests(ctx)
if err != nil {
return err
}
err = manifests.Delete(ctx, r.Digest())
if err != nil {
return err
}
return nil
}) })
err = s.Start() err = s.Start()
@ -97,11 +141,12 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named
return &proxiedRepository{ return &proxiedRepository{
blobStore: &proxyBlobStore{ blobStore: &proxyBlobStore{
localStore: localRepo.Blobs(ctx), localStore: localRepo.Blobs(ctx),
remoteStore: remoteRepo.Blobs(ctx), remoteStore: remoteRepo.Blobs(ctx),
scheduler: pr.scheduler, scheduler: pr.scheduler,
repositoryName: name,
}, },
manifests: proxyManifestStore{ manifests: &proxyManifestStore{
repositoryName: name, repositoryName: name,
localManifests: localManifests, // Options? localManifests: localManifests, // Options?
remoteManifests: remoteManifests, remoteManifests: remoteManifests,
@ -109,7 +154,7 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named
scheduler: pr.scheduler, scheduler: pr.scheduler,
}, },
name: name, name: name,
tags: proxyTagService{ tags: &proxyTagService{
localTags: localRepo.Tags(ctx), localTags: localRepo.Tags(ctx),
remoteTags: remoteRepo.Tags(ctx), remoteTags: remoteRepo.Tags(ctx),
}, },

View file

@ -7,13 +7,12 @@ import (
"time" "time"
"github.com/docker/distribution/context" "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference" "github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver"
) )
// onTTLExpiryFunc is called when a repository's TTL expires // onTTLExpiryFunc is called when a repository's TTL expires
type expiryFunc func(string) error type expiryFunc func(reference.Reference) error
const ( const (
entryTypeBlob = iota entryTypeBlob = iota
@ -82,19 +81,20 @@ func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
} }
// AddBlob schedules a blob cleanup after ttl expires // AddBlob schedules a blob cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddBlob(dgst digest.Digest, ttl time.Duration) error { func (ttles *TTLExpirationScheduler) AddBlob(blobRef reference.Canonical, ttl time.Duration) error {
ttles.Lock() ttles.Lock()
defer ttles.Unlock() defer ttles.Unlock()
if ttles.stopped { if ttles.stopped {
return fmt.Errorf("scheduler not started") return fmt.Errorf("scheduler not started")
} }
ttles.add(dgst.String(), ttl, entryTypeBlob)
ttles.add(blobRef, ttl, entryTypeBlob)
return nil return nil
} }
// AddManifest schedules a manifest cleanup after ttl expires // AddManifest schedules a manifest cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl time.Duration) error { func (ttles *TTLExpirationScheduler) AddManifest(manifestRef reference.Canonical, ttl time.Duration) error {
ttles.Lock() ttles.Lock()
defer ttles.Unlock() defer ttles.Unlock()
@ -102,7 +102,7 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl t
return fmt.Errorf("scheduler not started") return fmt.Errorf("scheduler not started")
} }
ttles.add(repoName.Name(), ttl, entryTypeManifest) ttles.add(manifestRef, ttl, entryTypeManifest)
return nil return nil
} }
@ -156,17 +156,17 @@ func (ttles *TTLExpirationScheduler) Start() error {
return nil return nil
} }
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) { func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) {
entry := &schedulerEntry{ entry := &schedulerEntry{
Key: key, Key: r.String(),
Expiry: time.Now().Add(ttl), Expiry: time.Now().Add(ttl),
EntryType: eType, EntryType: eType,
} }
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now())) context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil { if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil {
oldEntry.timer.Stop() oldEntry.timer.Stop()
} }
ttles.entries[key] = entry ttles.entries[entry.Key] = entry
entry.timer = ttles.startTimer(entry, ttl) entry.timer = ttles.startTimer(entry, ttl)
ttles.indexDirty = true ttles.indexDirty = true
} }
@ -184,13 +184,18 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.
case entryTypeManifest: case entryTypeManifest:
f = ttles.onManifestExpire f = ttles.onManifestExpire
default: default:
f = func(repoName string) error { f = func(reference.Reference) error {
return fmt.Errorf("Unexpected scheduler entry type") return fmt.Errorf("scheduler entry type")
} }
} }
if err := f(entry.Key); err != nil { ref, err := reference.Parse(entry.Key)
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err) if err == nil {
if err := f(ref); err != nil {
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
}
} else {
context.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err)
} }
delete(ttles.entries, entry.Key) delete(ttles.entries, entry.Key)
@ -249,6 +254,5 @@ func (ttles *TTLExpirationScheduler) readState() error {
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }

View file

@ -6,28 +6,49 @@ import (
"time" "time"
"github.com/docker/distribution/context" "github.com/docker/distribution/context"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/registry/storage/driver/inmemory"
) )
func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) {
ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
ref2, err := reference.Parse("testrepo@sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
ref3, err := reference.Parse("testrepo@sha256:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
return ref1, ref2, ref3
}
func TestSchedule(t *testing.T) { func TestSchedule(t *testing.T) {
ref1, ref2, ref3 := testRefs(t)
timeUnit := time.Millisecond timeUnit := time.Millisecond
remainingRepos := map[string]bool{ remainingRepos := map[string]bool{
"testBlob1": true, ref1.String(): true,
"testBlob2": true, ref2.String(): true,
"ch00": true, ref3.String(): true,
} }
s := New(context.Background(), inmemory.New(), "/ttl") s := New(context.Background(), inmemory.New(), "/ttl")
deleteFunc := func(repoName string) error { deleteFunc := func(repoName reference.Reference) error {
if len(remainingRepos) == 0 { if len(remainingRepos) == 0 {
t.Fatalf("Incorrect expiry count") t.Fatalf("Incorrect expiry count")
} }
_, ok := remainingRepos[repoName] _, ok := remainingRepos[repoName.String()]
if !ok { if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName) t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
} }
t.Log("removing", repoName) t.Log("removing", repoName)
delete(remainingRepos, repoName) delete(remainingRepos, repoName.String())
return nil return nil
} }
@ -37,11 +58,11 @@ func TestSchedule(t *testing.T) {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err) t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
} }
s.add("testBlob1", 3*timeUnit, entryTypeBlob) s.add(ref1, 3*timeUnit, entryTypeBlob)
s.add("testBlob2", 1*timeUnit, entryTypeBlob) s.add(ref2, 1*timeUnit, entryTypeBlob)
func() { func() {
s.add("ch00", 1*timeUnit, entryTypeBlob) s.add(ref3, 1*timeUnit, entryTypeBlob)
}() }()
@ -53,33 +74,34 @@ func TestSchedule(t *testing.T) {
} }
func TestRestoreOld(t *testing.T) { func TestRestoreOld(t *testing.T) {
ref1, ref2, _ := testRefs(t)
remainingRepos := map[string]bool{ remainingRepos := map[string]bool{
"testBlob1": true, ref1.String(): true,
"oldRepo": true, ref2.String(): true,
} }
deleteFunc := func(repoName string) error { deleteFunc := func(r reference.Reference) error {
if repoName == "oldRepo" && len(remainingRepos) == 3 { if r.String() == ref1.String() && len(remainingRepos) == 2 {
t.Errorf("oldRepo should be removed first") t.Errorf("ref1 should be removed first")
} }
_, ok := remainingRepos[repoName] _, ok := remainingRepos[r.String()]
if !ok { if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName) t.Fatalf("Trying to remove nonexistant repo: %s", r)
} }
delete(remainingRepos, repoName) delete(remainingRepos, r.String())
return nil return nil
} }
timeUnit := time.Millisecond timeUnit := time.Millisecond
serialized, err := json.Marshal(&map[string]schedulerEntry{ serialized, err := json.Marshal(&map[string]schedulerEntry{
"testBlob1": { ref1.String(): {
Expiry: time.Now().Add(1 * timeUnit), Expiry: time.Now().Add(1 * timeUnit),
Key: "testBlob1", Key: ref1.String(),
EntryType: 0, EntryType: 0,
}, },
"oldRepo": { ref2.String(): {
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
Key: "oldRepo", Key: ref2.String(),
EntryType: 0, EntryType: 0,
}, },
}) })
@ -108,13 +130,16 @@ func TestRestoreOld(t *testing.T) {
} }
func TestStopRestore(t *testing.T) { func TestStopRestore(t *testing.T) {
ref1, ref2, _ := testRefs(t)
timeUnit := time.Millisecond timeUnit := time.Millisecond
remainingRepos := map[string]bool{ remainingRepos := map[string]bool{
"testBlob1": true, ref1.String(): true,
"testBlob2": true, ref2.String(): true,
} }
deleteFunc := func(repoName string) error {
delete(remainingRepos, repoName) deleteFunc := func(r reference.Reference) error {
delete(remainingRepos, r.String())
return nil return nil
} }
@ -127,8 +152,8 @@ func TestStopRestore(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
s.add("testBlob1", 300*timeUnit, entryTypeBlob) s.add(ref1, 300*timeUnit, entryTypeBlob)
s.add("testBlob2", 100*timeUnit, entryTypeBlob) s.add(ref2, 100*timeUnit, entryTypeBlob)
// Start and stop before all operations complete // Start and stop before all operations complete
// state will be written to fs // state will be written to fs

View file

@ -17,6 +17,7 @@ func CheckBlobDescriptorCache(t *testing.T, provider cache.BlobDescriptorCachePr
checkBlobDescriptorCacheEmptyRepository(t, ctx, provider) checkBlobDescriptorCacheEmptyRepository(t, ctx, provider)
checkBlobDescriptorCacheSetAndRead(t, ctx, provider) checkBlobDescriptorCacheSetAndRead(t, ctx, provider)
checkBlobDescriptorCacheClear(t, ctx, provider)
} }
func checkBlobDescriptorCacheEmptyRepository(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) { func checkBlobDescriptorCacheEmptyRepository(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) {
@ -141,10 +142,10 @@ func checkBlobDescriptorCacheSetAndRead(t *testing.T, ctx context.Context, provi
} }
} }
func checkBlobDescriptorClear(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) { func checkBlobDescriptorCacheClear(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) {
localDigest := digest.Digest("sha384:abc") localDigest := digest.Digest("sha384:def111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
expected := distribution.Descriptor{ expected := distribution.Descriptor{
Digest: "sha256:abc", Digest: "sha256:def1111111111111111111111111111111111111111111111111111111111111",
Size: 10, Size: 10,
MediaType: "application/octet-stream"} MediaType: "application/octet-stream"}
@ -168,12 +169,11 @@ func checkBlobDescriptorClear(t *testing.T, ctx context.Context, provider cache.
err = cache.Clear(ctx, localDigest) err = cache.Clear(ctx, localDigest)
if err != nil { if err != nil {
t.Fatalf("unexpected error deleting descriptor") t.Error(err)
} }
nonExistantDigest := digest.Digest("sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") desc, err = cache.Stat(ctx, localDigest)
err = cache.Clear(ctx, nonExistantDigest)
if err == nil { if err == nil {
t.Fatalf("expected error deleting unknown descriptor") t.Fatalf("expected error statting deleted blob: %v", err)
} }
} }