Handle OCI image index and V2 manifest list during garbage collection

Signed-off-by: Anthony Ramahay <thewolt@gmail.com>
This commit is contained in:
Anthony Ramahay 2024-02-24 18:08:17 +01:00
parent bc6e81e1b9
commit 601b37d98b
3 changed files with 434 additions and 13 deletions

View file

@ -68,12 +68,14 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return fmt.Errorf("failed to retrieve tags for digest %v: %v", dgst, err)
}
if len(tags) == 0 {
emit("manifest eligible for deletion: %s", dgst)
// fetch all tags from repository
// all of these tags could contain manifest in history
// which means that we need check (and delete) those references when deleting manifest
allTags, err := repository.Tags(ctx).All(ctx)
if err != nil {
if _, ok := err.(distribution.ErrManifestUnknownRevision); !ok {
return nil
}
return fmt.Errorf("failed to retrieve tags %v", err)
}
manifestArr = append(manifestArr, ManifestDel{Name: repoName, Digest: dgst, Tags: allTags})
@ -84,18 +86,14 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
emit("%s: marking manifest %s ", repoName, dgst)
markSet[dgst] = struct{}{}
manifest, err := manifestService.Get(ctx, dgst)
if err != nil {
return fmt.Errorf("failed to retrieve manifest for digest %v: %v", dgst, err)
return markManifestReferences(dgst, manifestService, ctx, func(d digest.Digest) bool {
_, marked := markSet[d]
if !marked {
markSet[d] = struct{}{}
emit("%s: marking blob %s", repoName, d)
}
descriptors := manifest.References()
for _, descriptor := range descriptors {
markSet[descriptor.Digest] = struct{}{}
emit("%s: marking blob %s", repoName, descriptor.Digest)
}
return nil
return marked
})
})
// In certain situations such as unfinished uploads, deleting all
@ -113,6 +111,8 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return fmt.Errorf("failed to mark: %v", err)
}
manifestArr = unmarkReferencedManifest(manifestArr, markSet)
// sweep
vacuum := NewVacuum(ctx, storageDriver)
if !opts.DryRun {
@ -149,3 +149,40 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return err
}
// unmarkReferencedManifest filters out manifest present in markSet
func unmarkReferencedManifest(manifestArr []ManifestDel, markSet map[digest.Digest]struct{}) []ManifestDel {
filtered := make([]ManifestDel, 0)
for _, obj := range manifestArr {
if _, ok := markSet[obj.Digest]; !ok {
emit("manifest eligible for deletion: %s", obj)
filtered = append(filtered, obj)
}
}
return filtered
}
// markManifestReferences marks the manifest references
func markManifestReferences(dgst digest.Digest, manifestService distribution.ManifestService, ctx context.Context, ingester func(digest.Digest) bool) error {
manifest, err := manifestService.Get(ctx, dgst)
if err != nil {
return fmt.Errorf("failed to retrieve manifest for digest %v: %v", dgst, err)
}
descriptors := manifest.References()
for _, descriptor := range descriptors {
// do not visit references if already marked
if ingester(descriptor.Digest) {
continue
}
if ok, _ := manifestService.Exists(ctx, descriptor.Digest); ok {
err := markManifestReferences(descriptor.Digest, manifestService, ctx, ingester)
if err != nil {
return err
}
}
}
return nil
}

View file

@ -7,6 +7,7 @@ import (
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
"github.com/distribution/distribution/v3/testutil"
@ -129,6 +130,29 @@ func uploadRandomSchema2Image(t *testing.T, repository distribution.Repository)
}
}
func uploadRandomOCIImage(t *testing.T, repository distribution.Repository) image {
randomLayers, err := testutil.CreateRandomLayers(2)
if err != nil {
t.Fatalf("%v", err)
}
digests := []digest.Digest{}
for digest := range randomLayers {
digests = append(digests, digest)
}
manifest, err := testutil.MakeOCIManifest(repository, digests)
if err != nil {
t.Fatalf("%v", err)
}
manifestDigest := uploadImage(t, repository, image{manifest: manifest, layers: randomLayers})
return image{
manifest: manifest,
manifestDigest: manifestDigest,
layers: randomLayers,
}
}
func TestNoDeletionNoEffect(t *testing.T) {
ctx := dcontext.Background()
inmemoryDriver := inmemory.New()
@ -270,6 +294,128 @@ func TestDeleteManifestIfTagNotFound(t *testing.T) {
}
}
func TestDeleteManifestIndexWithDanglingReferences(t *testing.T) {
ctx := dcontext.Background()
inmemoryDriver := inmemory.New()
registry := createRegistry(t, inmemoryDriver)
repo := makeRepository(t, registry, "deletemanifests")
manifestService, _ := repo.Manifests(ctx)
image1 := uploadRandomOCIImage(t, repo)
image2 := uploadRandomOCIImage(t, repo)
ii, _ := ocischema.FromDescriptors([]distribution.Descriptor{
{Digest: image1.manifestDigest}, {Digest: image2.manifestDigest},
}, map[string]string{})
id, err := manifestService.Put(ctx, ii)
if err != nil {
t.Fatalf("manifest upload failed: %v", err)
}
err = repo.Tags(ctx).Tag(ctx, "test", distribution.Descriptor{Digest: id})
if err != nil {
t.Fatalf("Failed to delete tag: %v", err)
}
// delete image2 => ii has a dangling reference
err = manifestService.Delete(ctx, image2.manifestDigest)
if err != nil {
t.Fatalf("Failed to delete image: %v", err)
}
before1 := allBlobs(t, registry)
before2 := allManifests(t, manifestService)
// run GC (should not remove anything because of tag)
err = MarkAndSweep(dcontext.Background(), inmemoryDriver, registry, GCOpts{
DryRun: false,
RemoveUntagged: true,
})
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
after1 := allBlobs(t, registry)
after2 := allManifests(t, manifestService)
if len(before1) == len(after1) {
t.Fatalf("Garbage collection did not affect blobs storage: %d == %d", len(before1), len(after1))
}
if len(before2) != len(after2) {
t.Fatalf("Garbage collection affected manifest storage: %d != %d", len(before2), len(after2))
}
}
func TestDeleteManifestIndexIfTagNotFound(t *testing.T) {
ctx := dcontext.Background()
inmemoryDriver := inmemory.New()
registry := createRegistry(t, inmemoryDriver)
repo := makeRepository(t, registry, "deletemanifests")
manifestService, _ := repo.Manifests(ctx)
image1 := uploadRandomOCIImage(t, repo)
image2 := uploadRandomOCIImage(t, repo)
ii, _ := ocischema.FromDescriptors([]distribution.Descriptor{
{Digest: image1.manifestDigest}, {Digest: image2.manifestDigest},
}, map[string]string{})
d4, err := manifestService.Put(ctx, ii)
if err != nil {
t.Fatalf("manifest upload failed: %v", err)
}
err = repo.Tags(ctx).Tag(ctx, "test", distribution.Descriptor{Digest: d4})
if err != nil {
t.Fatalf("Failed to delete tag: %v", err)
}
before1 := allBlobs(t, registry)
before2 := allManifests(t, manifestService)
// run GC (should not remove anything because of tag)
err = MarkAndSweep(dcontext.Background(), inmemoryDriver, registry, GCOpts{
DryRun: false,
RemoveUntagged: true,
})
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
beforeUntag1 := allBlobs(t, registry)
beforeUntag2 := allManifests(t, manifestService)
if len(before1) != len(beforeUntag1) {
t.Fatalf("Garbage collection affected blobs storage: %d != %d", len(before1), len(beforeUntag1))
}
if len(before2) != len(beforeUntag2) {
t.Fatalf("Garbage collection affected manifest storage: %d != %d", len(before2), len(beforeUntag2))
}
err = repo.Tags(ctx).Untag(ctx, "test")
if err != nil {
t.Fatalf("Failed to delete tag: %v", err)
}
// Run GC (removes everything because no manifests with tags exist)
err = MarkAndSweep(dcontext.Background(), inmemoryDriver, registry, GCOpts{
DryRun: false,
RemoveUntagged: true,
})
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
after1 := allBlobs(t, registry)
after2 := allManifests(t, manifestService)
if len(beforeUntag1) == len(after1) {
t.Fatalf("Garbage collection did not affect blobs storage: %d == %d", len(beforeUntag1), len(after1))
}
if len(beforeUntag2) == len(after2) {
t.Fatalf("Garbage collection did not affect manifest storage: %d == %d", len(beforeUntag2), len(after2))
}
}
func TestGCWithMissingManifests(t *testing.T) {
ctx := dcontext.Background()
d := inmemory.New()
@ -479,3 +625,225 @@ func TestOrphanBlobDeleted(t *testing.T) {
}
}
}
func TestTaggedManifestlistWithUntaggedManifest(t *testing.T) {
ctx := dcontext.Background()
inmemoryDriver := inmemory.New()
registry := createRegistry(t, inmemoryDriver)
repo := makeRepository(t, registry, "foo/taggedlist/untaggedmanifest")
manifestService, err := repo.Manifests(ctx)
if err != nil {
t.Fatalf("%v", err)
}
image1 := uploadRandomSchema2Image(t, repo)
image2 := uploadRandomSchema2Image(t, repo)
// construct a manifestlist to reference manifests that is not tagged.
blobstatter := registry.BlobStatter()
manifestList, err := testutil.MakeManifestList(blobstatter, []digest.Digest{
image1.manifestDigest, image2.manifestDigest,
})
if err != nil {
t.Fatalf("Failed to make manifest list: %v", err)
}
dgst, err := manifestService.Put(ctx, manifestList)
if err != nil {
t.Fatalf("Failed to add manifest list: %v", err)
}
err = repo.Tags(ctx).Tag(ctx, "test", distribution.Descriptor{Digest: dgst})
if err != nil {
t.Fatalf("Failed to delete tag: %v", err)
}
before := allBlobs(t, registry)
// Run GC
err = MarkAndSweep(dcontext.Background(), inmemoryDriver, registry, GCOpts{
DryRun: false,
RemoveUntagged: true,
})
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
after := allBlobs(t, registry)
if len(before) != len(after) {
t.Fatalf("Garbage collection affected storage: %d != %d", len(before), len(after))
}
if _, ok := after[image1.manifestDigest]; !ok {
t.Fatalf("First manifest is missing")
}
if _, ok := after[image2.manifestDigest]; !ok {
t.Fatalf("Second manifest is missing")
}
if _, ok := after[dgst]; !ok {
t.Fatalf("Manifest list is missing")
}
}
func TestUnTaggedManifestlistWithUntaggedManifest(t *testing.T) {
ctx := dcontext.Background()
inmemoryDriver := inmemory.New()
registry := createRegistry(t, inmemoryDriver)
repo := makeRepository(t, registry, "foo/untaggedlist/untaggedmanifest")
manifestService, err := repo.Manifests(ctx)
if err != nil {
t.Fatalf("%v", err)
}
image1 := uploadRandomSchema2Image(t, repo)
image2 := uploadRandomSchema2Image(t, repo)
// construct a manifestlist to reference manifests that is not tagged.
blobstatter := registry.BlobStatter()
manifestList, err := testutil.MakeManifestList(blobstatter, []digest.Digest{
image1.manifestDigest, image2.manifestDigest,
})
if err != nil {
t.Fatalf("Failed to make manifest list: %v", err)
}
_, err = manifestService.Put(ctx, manifestList)
if err != nil {
t.Fatalf("Failed to add manifest list: %v", err)
}
// Run GC
err = MarkAndSweep(dcontext.Background(), inmemoryDriver, registry, GCOpts{
DryRun: false,
RemoveUntagged: true,
})
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
after := allBlobs(t, registry)
if len(after) != 0 {
t.Fatalf("Garbage collection affected storage: %d != %d", len(after), 0)
}
}
func TestUnTaggedManifestlistWithTaggedManifest(t *testing.T) {
ctx := dcontext.Background()
inmemoryDriver := inmemory.New()
registry := createRegistry(t, inmemoryDriver)
repo := makeRepository(t, registry, "foo/untaggedlist/taggedmanifest")
manifestService, err := repo.Manifests(ctx)
if err != nil {
t.Fatalf("%v", err)
}
image1 := uploadRandomSchema2Image(t, repo)
image2 := uploadRandomSchema2Image(t, repo)
err = repo.Tags(ctx).Tag(ctx, "image1", distribution.Descriptor{Digest: image1.manifestDigest})
if err != nil {
t.Fatalf("Failed to delete tag: %v", err)
}
err = repo.Tags(ctx).Tag(ctx, "image2", distribution.Descriptor{Digest: image2.manifestDigest})
if err != nil {
t.Fatalf("Failed to delete tag: %v", err)
}
// construct a manifestlist to reference manifests that is tagged.
blobstatter := registry.BlobStatter()
manifestList, err := testutil.MakeManifestList(blobstatter, []digest.Digest{
image1.manifestDigest, image2.manifestDigest,
})
if err != nil {
t.Fatalf("Failed to make manifest list: %v", err)
}
dgst, err := manifestService.Put(ctx, manifestList)
if err != nil {
t.Fatalf("Failed to add manifest list: %v", err)
}
// Run GC
err = MarkAndSweep(dcontext.Background(), inmemoryDriver, registry, GCOpts{
DryRun: false,
RemoveUntagged: true,
})
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
after := allBlobs(t, registry)
afterManifests := allManifests(t, manifestService)
if _, ok := after[dgst]; ok {
t.Fatalf("Untagged manifestlist still exists")
}
if _, ok := afterManifests[image1.manifestDigest]; !ok {
t.Fatalf("First manifest is missing")
}
if _, ok := afterManifests[image2.manifestDigest]; !ok {
t.Fatalf("Second manifest is missing")
}
}
func TestTaggedManifestlistWithDeletedReference(t *testing.T) {
ctx := dcontext.Background()
inmemoryDriver := inmemory.New()
registry := createRegistry(t, inmemoryDriver)
repo := makeRepository(t, registry, "foo/untaggedlist/deleteref")
manifestService, err := repo.Manifests(ctx)
if err != nil {
t.Fatalf("%v", err)
}
image1 := uploadRandomSchema2Image(t, repo)
image2 := uploadRandomSchema2Image(t, repo)
// construct a manifestlist to reference manifests that is deleted.
blobstatter := registry.BlobStatter()
manifestList, err := testutil.MakeManifestList(blobstatter, []digest.Digest{
image1.manifestDigest, image2.manifestDigest,
})
if err != nil {
t.Fatalf("Failed to make manifest list: %v", err)
}
_, err = manifestService.Put(ctx, manifestList)
if err != nil {
t.Fatalf("Failed to add manifest list: %v", err)
}
err = manifestService.Delete(ctx, image1.manifestDigest)
if err != nil {
t.Fatalf("Failed to delete image: %v", err)
}
err = manifestService.Delete(ctx, image2.manifestDigest)
if err != nil {
t.Fatalf("Failed to delete image: %v", err)
}
// Run GC
err = MarkAndSweep(dcontext.Background(), inmemoryDriver, registry, GCOpts{
DryRun: false,
RemoveUntagged: true,
})
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
after := allBlobs(t, registry)
if len(after) != 0 {
t.Fatalf("Garbage collection affected storage: %d != %d", len(after), 0)
}
}

View file

@ -1,11 +1,13 @@
package testutil
import (
"context"
"fmt"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/manifestlist"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/opencontainers/go-digest"
)
@ -49,6 +51,20 @@ func MakeSchema2Manifest(repository distribution.Repository, digests []digest.Di
return nil, fmt.Errorf("unexpected error storing content in blobstore: %v", err)
}
builder := schema2.NewManifestBuilder(d, configJSON)
return makeManifest(ctx, builder, digests)
}
func MakeOCIManifest(repository distribution.Repository, digests []digest.Digest) (distribution.Manifest, error) {
ctx := dcontext.Background()
blobStore := repository.Blobs(ctx)
var configJSON []byte
builder := ocischema.NewManifestBuilder(blobStore, configJSON, make(map[string]string))
return makeManifest(ctx, builder, digests)
}
func makeManifest(ctx context.Context, builder distribution.ManifestBuilder, digests []digest.Digest) (distribution.Manifest, error) {
for _, digest := range digests {
if err := builder.AppendReference(distribution.Descriptor{Digest: digest}); err != nil {
return nil, fmt.Errorf("unexpected error building manifest: %v", err)