diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index 278e5864d..1d7dfbc66 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -18,9 +18,10 @@ import ( const blobTTL = time.Duration(24 * 7 * time.Hour) type proxyBlobStore struct { - localStore distribution.BlobStore - remoteStore distribution.BlobService - scheduler *scheduler.TTLExpirationScheduler + localStore distribution.BlobStore + remoteStore distribution.BlobService + scheduler *scheduler.TTLExpirationScheduler + repositoryName reference.Named } var _ distribution.BlobStore = &proxyBlobStore{} @@ -134,7 +135,14 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, if err := pbs.storeLocal(ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error()) } - pbs.scheduler.AddBlob(dgst, repositoryTTL) + + blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) + if err != nil { + context.GetLogger(ctx).Errorf("Error creating reference: %s", err) + return + } + + pbs.scheduler.AddBlob(blobRef, repositoryTTL) }(dgst) _, err = pbs.copyContent(ctx, dgst, w) diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index 978f878ef..3054ef0b8 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -164,9 +164,10 @@ func makeTestEnv(t *testing.T, name string) *testEnv { s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json") proxyBlobStore := proxyBlobStore{ - remoteStore: truthBlobs, - localStore: localBlobs, - scheduler: s, + repositoryName: nameRef, + remoteStore: truthBlobs, + localStore: localBlobs, + scheduler: s, } te := &testEnv{ diff --git a/registry/proxy/proxymanifeststore.go b/registry/proxy/proxymanifeststore.go index e0a5ac280..0b5532d47 100644 --- a/registry/proxy/proxymanifeststore.go +++ b/registry/proxy/proxymanifeststore.go @@ -62,11 +62,17 @@ func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, optio return nil, err } - // Schedule the repo for removal - pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL) + // Schedule the manifest blob for removal + repoBlob, err := reference.WithDigest(pms.repositoryName, dgst) + if err != nil { + context.GetLogger(ctx).Errorf("Error creating reference: %s", err) + return nil, err + } + pms.scheduler.AddManifest(repoBlob, repositoryTTL) // Ensure the manifest blob is cleaned up - pms.scheduler.AddBlob(dgst, repositoryTTL) + //pms.scheduler.AddBlob(blobRef, repositoryTTL) + } return manifest, err diff --git a/registry/proxy/proxymanifeststore_test.go b/registry/proxy/proxymanifeststore_test.go index 5e717bf05..00f9daf93 100644 --- a/registry/proxy/proxymanifeststore_test.go +++ b/registry/proxy/proxymanifeststore_test.go @@ -119,6 +119,7 @@ func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestE localManifests: localManifests, remoteManifests: truthManifests, scheduler: s, + repositoryName: nameRef, }, } } diff --git a/registry/proxy/proxyregistry.go b/registry/proxy/proxyregistry.go index 1b3fcf326..43c1486ec 100644 --- a/registry/proxy/proxyregistry.go +++ b/registry/proxy/proxyregistry.go @@ -4,6 +4,7 @@ import ( "net/http" "net/url" + "fmt" "github.com/docker/distribution" "github.com/docker/distribution/configuration" "github.com/docker/distribution/context" @@ -35,13 +36,56 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name } v := storage.NewVacuum(ctx, driver) - s := scheduler.New(ctx, driver, "/scheduler-state.json") - s.OnBlobExpire(func(digest string) error { - return v.RemoveBlob(digest) + s.OnBlobExpire(func(ref reference.Reference) error { + var r reference.Canonical + var ok bool + if r, ok = ref.(reference.Canonical); !ok { + return fmt.Errorf("unexpected reference type : %T", ref) + } + + repo, err := registry.Repository(ctx, r) + if err != nil { + return err + } + + blobs := repo.Blobs(ctx) + + // Clear the repository reference and descriptor caches + err = blobs.Delete(ctx, r.Digest()) + if err != nil { + return err + } + + err = v.RemoveBlob(r.Digest().String()) + if err != nil { + return err + } + + return nil }) - s.OnManifestExpire(func(repoName string) error { - return v.RemoveRepository(repoName) + + s.OnManifestExpire(func(ref reference.Reference) error { + var r reference.Canonical + var ok bool + if r, ok = ref.(reference.Canonical); !ok { + return fmt.Errorf("unexpected reference type : %T", ref) + } + + repo, err := registry.Repository(ctx, r) + if err != nil { + return err + } + + manifests, err := repo.Manifests(ctx) + if err != nil { + return err + } + err = manifests.Delete(ctx, r.Digest()) + if err != nil { + return err + } + return nil }) err = s.Start() @@ -97,11 +141,12 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named return &proxiedRepository{ blobStore: &proxyBlobStore{ - localStore: localRepo.Blobs(ctx), - remoteStore: remoteRepo.Blobs(ctx), - scheduler: pr.scheduler, + localStore: localRepo.Blobs(ctx), + remoteStore: remoteRepo.Blobs(ctx), + scheduler: pr.scheduler, + repositoryName: name, }, - manifests: proxyManifestStore{ + manifests: &proxyManifestStore{ repositoryName: name, localManifests: localManifests, // Options? remoteManifests: remoteManifests, @@ -109,7 +154,7 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named scheduler: pr.scheduler, }, name: name, - tags: proxyTagService{ + tags: &proxyTagService{ localTags: localRepo.Tags(ctx), remoteTags: remoteRepo.Tags(ctx), }, diff --git a/registry/proxy/scheduler/scheduler.go b/registry/proxy/scheduler/scheduler.go index f53349073..0c8a85348 100644 --- a/registry/proxy/scheduler/scheduler.go +++ b/registry/proxy/scheduler/scheduler.go @@ -7,13 +7,12 @@ import ( "time" "github.com/docker/distribution/context" - "github.com/docker/distribution/digest" "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/driver" ) // onTTLExpiryFunc is called when a repository's TTL expires -type expiryFunc func(string) error +type expiryFunc func(reference.Reference) error const ( entryTypeBlob = iota @@ -82,19 +81,20 @@ func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) { } // AddBlob schedules a blob cleanup after ttl expires -func (ttles *TTLExpirationScheduler) AddBlob(dgst digest.Digest, ttl time.Duration) error { +func (ttles *TTLExpirationScheduler) AddBlob(blobRef reference.Canonical, ttl time.Duration) error { ttles.Lock() defer ttles.Unlock() if ttles.stopped { return fmt.Errorf("scheduler not started") } - ttles.add(dgst.String(), ttl, entryTypeBlob) + + ttles.add(blobRef, ttl, entryTypeBlob) return nil } // AddManifest schedules a manifest cleanup after ttl expires -func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl time.Duration) error { +func (ttles *TTLExpirationScheduler) AddManifest(manifestRef reference.Canonical, ttl time.Duration) error { ttles.Lock() defer ttles.Unlock() @@ -102,7 +102,7 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl t return fmt.Errorf("scheduler not started") } - ttles.add(repoName.Name(), ttl, entryTypeManifest) + ttles.add(manifestRef, ttl, entryTypeManifest) return nil } @@ -156,17 +156,17 @@ func (ttles *TTLExpirationScheduler) Start() error { return nil } -func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) { +func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) { entry := &schedulerEntry{ - Key: key, + Key: r.String(), Expiry: time.Now().Add(ttl), EntryType: eType, } context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now())) - if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil { + if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil { oldEntry.timer.Stop() } - ttles.entries[key] = entry + ttles.entries[entry.Key] = entry entry.timer = ttles.startTimer(entry, ttl) ttles.indexDirty = true } @@ -184,13 +184,18 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time. case entryTypeManifest: f = ttles.onManifestExpire default: - f = func(repoName string) error { - return fmt.Errorf("Unexpected scheduler entry type") + f = func(reference.Reference) error { + return fmt.Errorf("scheduler entry type") } } - if err := f(entry.Key); err != nil { - context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err) + ref, err := reference.Parse(entry.Key) + if err == nil { + if err := f(ref); err != nil { + context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err) + } + } else { + context.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err) } delete(ttles.entries, entry.Key) @@ -249,6 +254,5 @@ func (ttles *TTLExpirationScheduler) readState() error { if err != nil { return err } - return nil } diff --git a/registry/proxy/scheduler/scheduler_test.go b/registry/proxy/scheduler/scheduler_test.go index 00072ed2c..d4edd1b13 100644 --- a/registry/proxy/scheduler/scheduler_test.go +++ b/registry/proxy/scheduler/scheduler_test.go @@ -6,28 +6,49 @@ import ( "time" "github.com/docker/distribution/context" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/driver/inmemory" ) +func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) { + ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + if err != nil { + t.Fatalf("could not parse reference: %v", err) + } + + ref2, err := reference.Parse("testrepo@sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + if err != nil { + t.Fatalf("could not parse reference: %v", err) + } + + ref3, err := reference.Parse("testrepo@sha256:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc") + if err != nil { + t.Fatalf("could not parse reference: %v", err) + } + + return ref1, ref2, ref3 +} + func TestSchedule(t *testing.T) { + ref1, ref2, ref3 := testRefs(t) timeUnit := time.Millisecond remainingRepos := map[string]bool{ - "testBlob1": true, - "testBlob2": true, - "ch00": true, + ref1.String(): true, + ref2.String(): true, + ref3.String(): true, } s := New(context.Background(), inmemory.New(), "/ttl") - deleteFunc := func(repoName string) error { + deleteFunc := func(repoName reference.Reference) error { if len(remainingRepos) == 0 { t.Fatalf("Incorrect expiry count") } - _, ok := remainingRepos[repoName] + _, ok := remainingRepos[repoName.String()] if !ok { t.Fatalf("Trying to remove nonexistant repo: %s", repoName) } t.Log("removing", repoName) - delete(remainingRepos, repoName) + delete(remainingRepos, repoName.String()) return nil } @@ -37,11 +58,11 @@ func TestSchedule(t *testing.T) { t.Fatalf("Error starting ttlExpirationScheduler: %s", err) } - s.add("testBlob1", 3*timeUnit, entryTypeBlob) - s.add("testBlob2", 1*timeUnit, entryTypeBlob) + s.add(ref1, 3*timeUnit, entryTypeBlob) + s.add(ref2, 1*timeUnit, entryTypeBlob) func() { - s.add("ch00", 1*timeUnit, entryTypeBlob) + s.add(ref3, 1*timeUnit, entryTypeBlob) }() @@ -53,33 +74,34 @@ func TestSchedule(t *testing.T) { } func TestRestoreOld(t *testing.T) { + ref1, ref2, _ := testRefs(t) remainingRepos := map[string]bool{ - "testBlob1": true, - "oldRepo": true, + ref1.String(): true, + ref2.String(): true, } - deleteFunc := func(repoName string) error { - if repoName == "oldRepo" && len(remainingRepos) == 3 { - t.Errorf("oldRepo should be removed first") + deleteFunc := func(r reference.Reference) error { + if r.String() == ref1.String() && len(remainingRepos) == 2 { + t.Errorf("ref1 should be removed first") } - _, ok := remainingRepos[repoName] + _, ok := remainingRepos[r.String()] if !ok { - t.Fatalf("Trying to remove nonexistant repo: %s", repoName) + t.Fatalf("Trying to remove nonexistant repo: %s", r) } - delete(remainingRepos, repoName) + delete(remainingRepos, r.String()) return nil } timeUnit := time.Millisecond serialized, err := json.Marshal(&map[string]schedulerEntry{ - "testBlob1": { + ref1.String(): { Expiry: time.Now().Add(1 * timeUnit), - Key: "testBlob1", + Key: ref1.String(), EntryType: 0, }, - "oldRepo": { + ref2.String(): { Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first - Key: "oldRepo", + Key: ref2.String(), EntryType: 0, }, }) @@ -108,13 +130,16 @@ func TestRestoreOld(t *testing.T) { } func TestStopRestore(t *testing.T) { + ref1, ref2, _ := testRefs(t) + timeUnit := time.Millisecond remainingRepos := map[string]bool{ - "testBlob1": true, - "testBlob2": true, + ref1.String(): true, + ref2.String(): true, } - deleteFunc := func(repoName string) error { - delete(remainingRepos, repoName) + + deleteFunc := func(r reference.Reference) error { + delete(remainingRepos, r.String()) return nil } @@ -127,8 +152,8 @@ func TestStopRestore(t *testing.T) { if err != nil { t.Fatalf(err.Error()) } - s.add("testBlob1", 300*timeUnit, entryTypeBlob) - s.add("testBlob2", 100*timeUnit, entryTypeBlob) + s.add(ref1, 300*timeUnit, entryTypeBlob) + s.add(ref2, 100*timeUnit, entryTypeBlob) // Start and stop before all operations complete // state will be written to fs