forked from TrueCloudLab/distribution
Merge pull request #333 from RichardScothern/purgeuploads
registry/storage: automatically purge old upload files
This commit is contained in:
commit
cce1641f9b
6 changed files with 506 additions and 0 deletions
|
@ -3,6 +3,7 @@ package handlers
|
|||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -79,6 +80,9 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
|
|||
// a health check.
|
||||
panic(err)
|
||||
}
|
||||
|
||||
startUploadPurger(app.driver, ctxu.GetLogger(app))
|
||||
|
||||
app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -549,3 +553,27 @@ func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []co
|
|||
}
|
||||
return driver, nil
|
||||
}
|
||||
|
||||
// startUploadPurger schedules a goroutine which will periodically
|
||||
// check upload directories for old files and delete them
|
||||
func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger) {
|
||||
rand.Seed(time.Now().Unix())
|
||||
jitter := time.Duration(rand.Int()%60) * time.Minute
|
||||
|
||||
// Start with reasonable defaults
|
||||
// TODO:(richardscothern) make configurable
|
||||
purgeAge := time.Duration(7 * 24 * time.Hour)
|
||||
timeBetweenPurges := time.Duration(1 * 24 * time.Hour)
|
||||
|
||||
go func() {
|
||||
log.Infof("Starting upload purge in %s", jitter)
|
||||
time.Sleep(jitter)
|
||||
|
||||
for {
|
||||
storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAge), true)
|
||||
log.Infof("Starting upload purge in %s", timeBetweenPurges)
|
||||
time.Sleep(timeBetweenPurges)
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
|
|
@ -257,6 +257,8 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) {
|
|||
offset = "" // Limit to the prefix for listing offsets.
|
||||
}
|
||||
return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "hashstates", v.alg, offset)...), nil
|
||||
case repositoriesRootPathSpec:
|
||||
return path.Join(repoPrefix...), nil
|
||||
default:
|
||||
// TODO(sday): This is an internal error. Ensure it doesn't escape (panic?).
|
||||
return "", fmt.Errorf("unknown path spec: %#v", v)
|
||||
|
@ -446,6 +448,12 @@ type uploadHashStatePathSpec struct {
|
|||
|
||||
func (uploadHashStatePathSpec) pathSpec() {}
|
||||
|
||||
// repositoriesRootPathSpec returns the root of repositories
|
||||
type repositoriesRootPathSpec struct {
|
||||
}
|
||||
|
||||
func (repositoriesRootPathSpec) pathSpec() {}
|
||||
|
||||
// digestPathComponents provides a consistent path breakdown for a given
|
||||
// digest. For a generic digest, it will be as follows:
|
||||
//
|
||||
|
|
136
registry/storage/purgeuploads.go
Normal file
136
registry/storage/purgeuploads.go
Normal file
|
@ -0,0 +1,136 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go-uuid/uuid"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
storageDriver "github.com/docker/distribution/registry/storage/driver"
|
||||
)
|
||||
|
||||
// uploadData stored the location of temporary files created during a layer upload
|
||||
// along with the date the upload was started
|
||||
type uploadData struct {
|
||||
containingDir string
|
||||
startedAt time.Time
|
||||
}
|
||||
|
||||
func newUploadData() uploadData {
|
||||
return uploadData{
|
||||
containingDir: "",
|
||||
// default to far in future to protect against missing startedat
|
||||
startedAt: time.Now().Add(time.Duration(10000 * time.Hour)),
|
||||
}
|
||||
}
|
||||
|
||||
// PurgeUploads deletes files from the upload directory
|
||||
// created before olderThan. The list of files deleted and errors
|
||||
// encountered are returned
|
||||
func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) {
|
||||
log.Infof("PurgeUploads starting: olderThan=%s, actuallyDelete=%t", olderThan, actuallyDelete)
|
||||
uploadData, errors := getOutstandingUploads(driver)
|
||||
var deleted []string
|
||||
for _, uploadData := range uploadData {
|
||||
if uploadData.startedAt.Before(olderThan) {
|
||||
var err error
|
||||
log.Infof("Upload files in %s have older date (%s) than purge date (%s). Removing upload directory.",
|
||||
uploadData.containingDir, uploadData.startedAt, olderThan)
|
||||
if actuallyDelete {
|
||||
err = driver.Delete(uploadData.containingDir)
|
||||
}
|
||||
if err == nil {
|
||||
deleted = append(deleted, uploadData.containingDir)
|
||||
} else {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Purge uploads finished. Num deleted=%d, num errors=%d", len(deleted), len(errors))
|
||||
return deleted, errors
|
||||
}
|
||||
|
||||
// getOutstandingUploads walks the upload directory, collecting files
|
||||
// which could be eligible for deletion. The only reliable way to
|
||||
// classify the age of a file is with the date stored in the startedAt
|
||||
// file, so gather files by UUID with a date from startedAt.
|
||||
func getOutstandingUploads(driver storageDriver.StorageDriver) (map[string]uploadData, []error) {
|
||||
var errors []error
|
||||
uploads := make(map[string]uploadData, 0)
|
||||
|
||||
inUploadDir := false
|
||||
root, err := defaultPathMapper.path(repositoriesRootPathSpec{})
|
||||
if err != nil {
|
||||
return uploads, append(errors, err)
|
||||
}
|
||||
err = Walk(driver, root, func(fileInfo storageDriver.FileInfo) error {
|
||||
filePath := fileInfo.Path()
|
||||
_, file := path.Split(filePath)
|
||||
if file[0] == '_' {
|
||||
// Reserved directory
|
||||
inUploadDir = (file == "_uploads")
|
||||
|
||||
if fileInfo.IsDir() && !inUploadDir {
|
||||
return ErrSkipDir
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
uuid, isContainingDir := uUIDFromPath(filePath)
|
||||
if uuid == "" {
|
||||
// Cannot reliably delete
|
||||
return nil
|
||||
}
|
||||
ud, ok := uploads[uuid]
|
||||
if !ok {
|
||||
ud = newUploadData()
|
||||
}
|
||||
if isContainingDir {
|
||||
ud.containingDir = filePath
|
||||
}
|
||||
if file == "startedat" {
|
||||
if t, err := readStartedAtFile(driver, filePath); err == nil {
|
||||
ud.startedAt = t
|
||||
} else {
|
||||
errors = pushError(errors, filePath, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
uploads[uuid] = ud
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
errors = pushError(errors, root, err)
|
||||
}
|
||||
return uploads, errors
|
||||
}
|
||||
|
||||
// uUIDFromPath extracts the upload UUID from a given path
|
||||
// If the UUID is the last path component, this is the containing
|
||||
// directory for all upload files
|
||||
func uUIDFromPath(path string) (string, bool) {
|
||||
components := strings.Split(path, "/")
|
||||
for i := len(components) - 1; i >= 0; i-- {
|
||||
if uuid := uuid.Parse(components[i]); uuid != nil {
|
||||
return uuid.String(), i == len(components)-1
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// readStartedAtFile reads the date from an upload's startedAtFile
|
||||
func readStartedAtFile(driver storageDriver.StorageDriver, path string) (time.Time, error) {
|
||||
startedAtBytes, err := driver.GetContent(path)
|
||||
if err != nil {
|
||||
return time.Now(), err
|
||||
}
|
||||
startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
|
||||
if err != nil {
|
||||
return time.Now(), err
|
||||
}
|
||||
return startedAt, nil
|
||||
}
|
165
registry/storage/purgeuploads_test.go
Normal file
165
registry/storage/purgeuploads_test.go
Normal file
|
@ -0,0 +1,165 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"path"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go-uuid/uuid"
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
)
|
||||
|
||||
var pm = defaultPathMapper
|
||||
|
||||
func testUploadFS(t *testing.T, numUploads int, repoName string, startedAt time.Time) driver.StorageDriver {
|
||||
d := inmemory.New()
|
||||
for i := 0; i < numUploads; i++ {
|
||||
addUploads(t, d, uuid.New(), repoName, startedAt)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, startedAt time.Time) {
|
||||
dataPath, err := pm.path(uploadDataPathSpec{name: repo, uuid: uploadID})
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to resolve path")
|
||||
}
|
||||
if err := d.PutContent(dataPath, []byte("")); err != nil {
|
||||
t.Fatalf("Unable to write data file")
|
||||
}
|
||||
|
||||
startedAtPath, err := pm.path(uploadStartedAtPathSpec{name: repo, uuid: uploadID})
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to resolve path")
|
||||
}
|
||||
|
||||
if d.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
|
||||
t.Fatalf("Unable to write startedAt file")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPurgeGather(t *testing.T) {
|
||||
uploadCount := 5
|
||||
fs := testUploadFS(t, uploadCount, "test-repo", time.Now())
|
||||
uploadData, errs := getOutstandingUploads(fs)
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("Unexepected errors: %q", errs)
|
||||
}
|
||||
if len(uploadData) != uploadCount {
|
||||
t.Errorf("Unexpected upload file count: %d != %d", uploadCount, len(uploadData))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPurgeNone(t *testing.T) {
|
||||
fs := testUploadFS(t, 10, "test-repo", time.Now())
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour)
|
||||
deleted, errs := PurgeUploads(fs, oneHourAgo, true)
|
||||
if len(errs) != 0 {
|
||||
t.Error("Unexpected errors", errs)
|
||||
}
|
||||
if len(deleted) != 0 {
|
||||
t.Errorf("Unexpectedly deleted files for time: %s", oneHourAgo)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPurgeAll(t *testing.T) {
|
||||
uploadCount := 10
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour)
|
||||
fs := testUploadFS(t, uploadCount, "test-repo", oneHourAgo)
|
||||
|
||||
// Ensure > 1 repos are purged
|
||||
addUploads(t, fs, uuid.New(), "test-repo2", oneHourAgo)
|
||||
uploadCount++
|
||||
|
||||
deleted, errs := PurgeUploads(fs, time.Now(), true)
|
||||
if len(errs) != 0 {
|
||||
t.Error("Unexpected errors:", errs)
|
||||
}
|
||||
fileCount := uploadCount
|
||||
if len(deleted) != fileCount {
|
||||
t.Errorf("Unexpectedly deleted file count %d != %d",
|
||||
len(deleted), fileCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPurgeSome(t *testing.T) {
|
||||
oldUploadCount := 5
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour)
|
||||
fs := testUploadFS(t, oldUploadCount, "library/test-repo", oneHourAgo)
|
||||
|
||||
newUploadCount := 4
|
||||
|
||||
for i := 0; i < newUploadCount; i++ {
|
||||
addUploads(t, fs, uuid.New(), "test-repo", time.Now().Add(1*time.Hour))
|
||||
}
|
||||
|
||||
deleted, errs := PurgeUploads(fs, time.Now(), true)
|
||||
if len(errs) != 0 {
|
||||
t.Error("Unexpected errors:", errs)
|
||||
}
|
||||
if len(deleted) != oldUploadCount {
|
||||
t.Errorf("Unexpectedly deleted file count %d != %d",
|
||||
len(deleted), oldUploadCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPurgeOnlyUploads(t *testing.T) {
|
||||
oldUploadCount := 5
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour)
|
||||
fs := testUploadFS(t, oldUploadCount, "test-repo", oneHourAgo)
|
||||
|
||||
// Create a directory tree outside _uploads and ensure
|
||||
// these files aren't deleted.
|
||||
dataPath, err := pm.path(uploadDataPathSpec{name: "test-repo", uuid: uuid.New()})
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
nonUploadPath := strings.Replace(dataPath, "_upload", "_important", -1)
|
||||
if strings.Index(nonUploadPath, "_upload") != -1 {
|
||||
t.Fatalf("Non-upload path not created correctly")
|
||||
}
|
||||
|
||||
nonUploadFile := path.Join(nonUploadPath, "file")
|
||||
if err = fs.PutContent(nonUploadFile, []byte("")); err != nil {
|
||||
t.Fatalf("Unable to write data file")
|
||||
}
|
||||
|
||||
deleted, errs := PurgeUploads(fs, time.Now(), true)
|
||||
if len(errs) != 0 {
|
||||
t.Error("Unexpected errors", errs)
|
||||
}
|
||||
for _, file := range deleted {
|
||||
if strings.Index(file, "_upload") == -1 {
|
||||
t.Errorf("Non-upload file deleted")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPurgeMissingStartedAt(t *testing.T) {
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour)
|
||||
fs := testUploadFS(t, 1, "test-repo", oneHourAgo)
|
||||
err := Walk(fs, "/", func(fileInfo driver.FileInfo) error {
|
||||
filePath := fileInfo.Path()
|
||||
_, file := path.Split(filePath)
|
||||
|
||||
if file == "startedat" {
|
||||
if err := fs.Delete(filePath); err != nil {
|
||||
t.Fatalf("Unable to delete startedat file: %s", filePath)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error during Walk: %s ", err.Error())
|
||||
}
|
||||
deleted, errs := PurgeUploads(fs, time.Now(), true)
|
||||
if len(errs) > 0 {
|
||||
t.Errorf("Unexpected errors")
|
||||
}
|
||||
if len(deleted) > 0 {
|
||||
t.Errorf("Files unexpectedly deleted: %s", deleted)
|
||||
}
|
||||
}
|
50
registry/storage/walk.go
Normal file
50
registry/storage/walk.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
storageDriver "github.com/docker/distribution/registry/storage/driver"
|
||||
)
|
||||
|
||||
// SkipDir is used as a return value from onFileFunc to indicate that
|
||||
// the directory named in the call is to be skipped. It is not returned
|
||||
// as an error by any function.
|
||||
var ErrSkipDir = errors.New("skip this directory")
|
||||
|
||||
// WalkFn is called once per file by Walk
|
||||
// If the returned error is ErrSkipDir and fileInfo refers
|
||||
// to a directory, the directory will not be entered and Walk
|
||||
// will continue the traversal. Otherwise Walk will return
|
||||
type WalkFn func(fileInfo storageDriver.FileInfo) error
|
||||
|
||||
// Walk traverses a filesystem defined within driver, starting
|
||||
// from the given path, calling f on each file
|
||||
func Walk(driver storageDriver.StorageDriver, from string, f WalkFn) error {
|
||||
children, err := driver.List(from)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, child := range children {
|
||||
fileInfo, err := driver.Stat(child)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = f(fileInfo)
|
||||
skipDir := (err == ErrSkipDir)
|
||||
if err != nil && !skipDir {
|
||||
return err
|
||||
}
|
||||
|
||||
if fileInfo.IsDir() && !skipDir {
|
||||
Walk(driver, child, f)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// pushError formats an error type given a path and an error
|
||||
// and pushes it to a slice of errors
|
||||
func pushError(errors []error, path string, err error) []error {
|
||||
return append(errors, fmt.Errorf("%s: %s", path, err))
|
||||
}
|
119
registry/storage/walk_test.go
Normal file
119
registry/storage/walk_test.go
Normal file
|
@ -0,0 +1,119 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
)
|
||||
|
||||
func testFS(t *testing.T) (driver.StorageDriver, map[string]string) {
|
||||
d := inmemory.New()
|
||||
c := []byte("")
|
||||
if err := d.PutContent("/a/b/c/d", c); err != nil {
|
||||
t.Fatalf("Unable to put to inmemory fs")
|
||||
}
|
||||
if err := d.PutContent("/a/b/c/e", c); err != nil {
|
||||
t.Fatalf("Unable to put to inmemory fs")
|
||||
}
|
||||
|
||||
expected := map[string]string{
|
||||
"/a": "dir",
|
||||
"/a/b": "dir",
|
||||
"/a/b/c": "dir",
|
||||
"/a/b/c/d": "file",
|
||||
"/a/b/c/e": "file",
|
||||
}
|
||||
|
||||
return d, expected
|
||||
}
|
||||
|
||||
func TestWalkErrors(t *testing.T) {
|
||||
d, expected := testFS(t)
|
||||
fileCount := len(expected)
|
||||
err := Walk(d, "", func(fileInfo driver.FileInfo) error {
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
t.Error("Expected invalid root err")
|
||||
}
|
||||
|
||||
err = Walk(d, "/", func(fileInfo driver.FileInfo) error {
|
||||
// error on the 2nd file
|
||||
if fileInfo.Path() == "/a/b" {
|
||||
return fmt.Errorf("Early termination")
|
||||
}
|
||||
delete(expected, fileInfo.Path())
|
||||
return nil
|
||||
})
|
||||
if len(expected) != fileCount-1 {
|
||||
t.Error("Walk failed to terminate with error")
|
||||
}
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
|
||||
err = Walk(d, "/nonexistant", func(fileInfo driver.FileInfo) error {
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
t.Errorf("Expected missing file err")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestWalk(t *testing.T) {
|
||||
d, expected := testFS(t)
|
||||
err := Walk(d, "/", func(fileInfo driver.FileInfo) error {
|
||||
filePath := fileInfo.Path()
|
||||
filetype, ok := expected[filePath]
|
||||
if !ok {
|
||||
t.Fatalf("Unexpected file in walk: %q", filePath)
|
||||
}
|
||||
|
||||
if fileInfo.IsDir() {
|
||||
if filetype != "dir" {
|
||||
t.Errorf("Unexpected file type: %q", filePath)
|
||||
}
|
||||
} else {
|
||||
if filetype != "file" {
|
||||
t.Errorf("Unexpected file type: %q", filePath)
|
||||
}
|
||||
}
|
||||
delete(expected, filePath)
|
||||
return nil
|
||||
})
|
||||
if len(expected) > 0 {
|
||||
t.Errorf("Missed files in walk: %q", expected)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestWalkSkipDir(t *testing.T) {
|
||||
d, expected := testFS(t)
|
||||
err := Walk(d, "/", func(fileInfo driver.FileInfo) error {
|
||||
filePath := fileInfo.Path()
|
||||
if filePath == "/a/b" {
|
||||
// skip processing /a/b/c and /a/b/c/d
|
||||
return ErrSkipDir
|
||||
}
|
||||
delete(expected, filePath)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
if _, ok := expected["/a/b/c"]; !ok {
|
||||
t.Errorf("/a/b/c not skipped")
|
||||
}
|
||||
if _, ok := expected["/a/b/c/d"]; !ok {
|
||||
t.Errorf("/a/b/c/d not skipped")
|
||||
}
|
||||
if _, ok := expected["/a/b/c/e"]; !ok {
|
||||
t.Errorf("/a/b/c/e not skipped")
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in a new issue