Garbage collection speed optimizations
- improved manifest mark computational complexity from quadratic to linear - optimized manifest removal by passing precomputed set of tags instead of all Signed-off-by: Bartosz Borkowski <bartebor@wp.pl>
This commit is contained in:
parent
749f6afb45
commit
764c69ca59
1 changed files with 115 additions and 14 deletions
|
@ -3,6 +3,8 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/reference"
|
"github.com/docker/distribution/reference"
|
||||||
|
@ -33,6 +35,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
|
return fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
|
||||||
}
|
}
|
||||||
|
emit("GC mark phase %v", time.Now().String())
|
||||||
|
|
||||||
// mark
|
// mark
|
||||||
markSet := make(map[digest.Digest]struct{})
|
markSet := make(map[digest.Digest]struct{})
|
||||||
|
@ -60,23 +63,60 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
|
||||||
return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator")
|
return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
|
// 1. fetch all tag names
|
||||||
if opts.RemoveUntagged {
|
|
||||||
// fetch all tags where this manifest is the latest one
|
|
||||||
tags, err := repository.Tags(ctx).Lookup(ctx, distribution.Descriptor{Digest: dgst})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to retrieve tags for digest %v: %v", dgst, err)
|
|
||||||
}
|
|
||||||
if len(tags) == 0 {
|
|
||||||
emit("manifest eligible for deletion: %s", dgst)
|
|
||||||
// fetch all tags from repository
|
|
||||||
// all of these tags could contain manifest in history
|
|
||||||
// which means that we need check (and delete) those references when deleting manifest
|
|
||||||
allTags, err := repository.Tags(ctx).All(ctx)
|
allTags, err := repository.Tags(ctx).All(ctx)
|
||||||
if err != nil {
|
switch err.(type) {
|
||||||
|
case distribution.ErrRepositoryUnknown:
|
||||||
|
break
|
||||||
|
case nil:
|
||||||
|
break
|
||||||
|
default:
|
||||||
return fmt.Errorf("failed to retrieve tags %v", err)
|
return fmt.Errorf("failed to retrieve tags %v", err)
|
||||||
}
|
}
|
||||||
manifestArr = append(manifestArr, ManifestDel{Name: repoName, Digest: dgst, Tags: allTags})
|
|
||||||
|
digestUsed := make(map[digest.Digest]int)
|
||||||
|
tagDigests := make(map[string][]digest.Digest)
|
||||||
|
|
||||||
|
// 2. read each tag's _current_ digest and mark its usage; store all index links for later reference
|
||||||
|
for _, tag := range allTags {
|
||||||
|
description, err := repository.Tags(ctx).Get(ctx, tag)
|
||||||
|
switch err.(type) {
|
||||||
|
case distribution.ErrTagUnknown:
|
||||||
|
// corrupted storage; current link is missing
|
||||||
|
break
|
||||||
|
case nil:
|
||||||
|
digestUsed[description.Digest] = 1
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("failed to retrieve tag %v: %v", tag, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// tag links (historical and current)
|
||||||
|
digests, err := getDigests(ctx, storageDriver, repoName, tag)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to retrieve tag links %v: %v", tag, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if digests != nil {
|
||||||
|
tagDigests[tag] = digests
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. produce digest usage map by transposing tagsDigests
|
||||||
|
digestTags := make(map[digest.Digest][]string)
|
||||||
|
for tag, digests := range tagDigests {
|
||||||
|
for _, digest := range digests {
|
||||||
|
digestTags[digest] = append(digestTags[digest], tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
|
||||||
|
if opts.RemoveUntagged {
|
||||||
|
// check if this digest is used by any tag
|
||||||
|
if _, exists := digestUsed[dgst]; !exists {
|
||||||
|
emit("manifest eligible for deletion: %s", dgst)
|
||||||
|
// add only tags linking to given digest
|
||||||
|
manifestArr = append(manifestArr, ManifestDel{Name: repoName, Digest: dgst, Tags: digestTags[dgst]})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -119,6 +159,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
|
||||||
// sweep
|
// sweep
|
||||||
vacuum := NewVacuum(ctx, storageDriver)
|
vacuum := NewVacuum(ctx, storageDriver)
|
||||||
if !opts.DryRun {
|
if !opts.DryRun {
|
||||||
|
emit("GC manifest removal phase %v", time.Now().String())
|
||||||
for _, obj := range manifestArr {
|
for _, obj := range manifestArr {
|
||||||
err = vacuum.RemoveManifest(obj.Name, obj.Digest, obj.Tags)
|
err = vacuum.RemoveManifest(obj.Name, obj.Digest, obj.Tags)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -126,6 +167,8 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
emit("GC blob scan phase %v", time.Now().String())
|
||||||
blobService := registry.Blobs()
|
blobService := registry.Blobs()
|
||||||
deleteSet := make(map[digest.Digest]struct{})
|
deleteSet := make(map[digest.Digest]struct{})
|
||||||
err = blobService.Enumerate(ctx, func(dgst digest.Digest) error {
|
err = blobService.Enumerate(ctx, func(dgst digest.Digest) error {
|
||||||
|
@ -139,6 +182,7 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
|
||||||
return fmt.Errorf("error enumerating blobs: %v", err)
|
return fmt.Errorf("error enumerating blobs: %v", err)
|
||||||
}
|
}
|
||||||
emit("\n%d blobs marked, %d blobs and %d manifests eligible for deletion", len(markSet), len(deleteSet), len(manifestArr))
|
emit("\n%d blobs marked, %d blobs and %d manifests eligible for deletion", len(markSet), len(deleteSet), len(manifestArr))
|
||||||
|
emit("GC blob removal phase %v", time.Now().String())
|
||||||
for dgst := range deleteSet {
|
for dgst := range deleteSet {
|
||||||
emit("blob eligible for deletion: %s", dgst)
|
emit("blob eligible for deletion: %s", dgst)
|
||||||
if opts.DryRun {
|
if opts.DryRun {
|
||||||
|
@ -149,6 +193,63 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
|
||||||
return fmt.Errorf("failed to delete blob %s: %v", dgst, err)
|
return fmt.Errorf("failed to delete blob %s: %v", dgst, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
emit("GC done %v", time.Now().String())
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finds all digests given tag links to
|
||||||
|
func getDigests(ctx context.Context, storageDriver driver.StorageDriver, repoName string, tag string) ([]digest.Digest, error) {
|
||||||
|
indexPath, err := pathFor(manifestTagIndexPathSpec{
|
||||||
|
name: repoName,
|
||||||
|
tag: tag,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var descriptors []digest.Digest
|
||||||
|
|
||||||
|
err = storageDriver.Walk(ctx, indexPath, func(fileInfo driver.FileInfo) error {
|
||||||
|
if fileInfo.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
filePath := fileInfo.Path()
|
||||||
|
|
||||||
|
dir, fileName := path.Split(filePath)
|
||||||
|
if fileName != "link" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
digest, err := digestFromLinkDir(dir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
descriptors = append(descriptors, digest)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if _, ok := err.(driver.PathNotFoundError); ok {
|
||||||
|
return descriptors, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("failed to read tags %v digests: %v", tag, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return descriptors, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reconstructs a digest from a link directory
|
||||||
|
func digestFromLinkDir(dir string) (digest.Digest, error) {
|
||||||
|
dir = path.Dir(dir)
|
||||||
|
dir, hex := path.Split(dir)
|
||||||
|
dir = path.Dir(dir)
|
||||||
|
dir, algo := path.Split(dir)
|
||||||
|
|
||||||
|
dgst := digest.NewDigestFromHex(algo, hex)
|
||||||
|
return dgst, dgst.Validate()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue