Compare commits

...

18 commits

Author SHA1 Message Date
Richard Scothern
03efb43768 Merge pull request #1713 from aibaars/gcs-fix
GCS: FileWriter.Size: return offset + buffer size for Writers that are not closed
2016-05-10 11:32:14 +01:00
Arthur Baars
431e46a7f9 GCS: FileWriter.Size: return offset + buffer size for Writers that are not closed
Signed-off-by: Arthur Baars <arthur@semmle.com>
2016-05-09 15:40:49 +01:00
Richard Scothern
1e0f3b7b64 Merge pull request #1703 from aibaars/gcs-fix
GCS: FileWriter.Size: include number of buffered bytes if the FileWriter is not closed
2016-05-05 12:09:13 -07:00
Arthur Baars
93e3aa9b21 GCS: FileWriter.Size: include number of buffered bytes if the FileWriter is not closed
Signed-off-by: Arthur Baars <arthur@semmle.com>
2016-05-05 11:07:45 +01:00
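A rough sketch of the Size semantics these two GCS commits describe, assuming a writer that stages bytes in a buffer before flushing them to the backend at an offset (the field names mirror the driver diff shown further down; everything else here is illustrative):

package main

import "fmt"

// bufferedWriter mimics the GCS FileWriter behaviour described above:
// bytes sit in a buffer before being flushed to the backend at offset.
type bufferedWriter struct {
	offset   int64 // bytes already flushed to the backend
	buffSize int   // bytes still sitting in the write buffer
	size     int64 // final size, valid only once closed
	closed   bool
}

// Size reports the logical number of bytes written so far. Before Close,
// that is the flushed offset plus whatever is still buffered; afterwards,
// the recorded final size.
func (w *bufferedWriter) Size() int64 {
	if !w.closed {
		return w.offset + int64(w.buffSize)
	}
	return w.size
}

func main() {
	w := &bufferedWriter{offset: 4096, buffSize: 512}
	fmt.Println(w.Size()) // 4608: buffered bytes count before Close
}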
Aaron Lehmann
93d76247f2 Preserve author information in schema1 manifests
When we push a schema1 manifest, we encode history information from the
image JSON into v1Compatibility strings for the respective layers. The
"author" field was not being set in these v1Compatibility strings, so if
a parent layer had an author set, the authorship was lost after pushing
through a schema1 manifest and repulling, and the image ID changed after
the pull. This change preserves the authorship information for parent
layers so that the image ID does not change.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2016-05-04 10:35:49 -07:00
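The shape of the fix can be sketched like so: the v1Compatibility payload gains an "author" field copied from the corresponding image history entry. The struct below is trimmed to the relevant fields and is only an illustration; the real change is in the config builder diff further down.

package main

import (
	"encoding/json"
	"fmt"
)

// v1Compatibility is trimmed to the fields relevant here; the real
// builder (shown in the diff below) carries several more.
type v1Compatibility struct {
	ID      string `json:"id"`
	Parent  string `json:"parent,omitempty"`
	Author  string `json:"author,omitempty"` // newly preserved
	Comment string `json:"comment,omitempty"`
}

type historyEntry struct {
	Author  string
	Comment string
}

func main() {
	h := historyEntry{Author: "Alyssa P. Hacker <alyspdev@example.com>"}
	v1 := v1Compatibility{ID: "0850bf...", Author: h.Author, Comment: h.Comment}
	out, _ := json.Marshal(v1)
	fmt.Println(string(out)) // author now survives the schema1 round trip
}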
Richard Scothern
ba672e8b69 When a blob upload is committed, prevent writing out hashstate in the
subsequent close.

When a blob upload is cancelled, close the blobwriter before removing
upload state to ensure old hashstates don't persist.

Signed-off-by: Richard Scothern <richard.scothern@docker.com>
2016-05-04 10:35:37 -07:00
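Reduced to a sketch with stand-in types (the registry's actual blobWriter diff appears below), the interplay is: Commit closes the writer and then flips a committed flag, and Close refuses to persist hashstate once that flag is set:

package main

import (
	"errors"
	"fmt"
)

type writer struct {
	committed bool
}

// Close persists resumable hash state -- unless Commit already ran, in
// which case writing state out again would leave stale files behind.
func (w *writer) Close() error {
	if w.committed {
		return errors.New("close after commit")
	}
	// ... store hash state ...
	return nil
}

// Commit closes the writer first (error ignored, as in the diff below),
// then marks it committed so later closes become no-ops with an error.
func (w *writer) Commit() error {
	w.Close()
	// ... validate and move the blob into place ...
	w.committed = true
	return nil
}

func main() {
	w := &writer{}
	w.Commit()
	fmt.Println(w.Close()) // close after commit
}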
Richard Scothern
96230decb9 Add a test with a missing _manifests directory
Signed-off-by: Richard Scothern <richard.scothern@docker.com>
2016-05-04 10:35:26 -07:00
Richard Scothern
c0d3813f86 Move garbage collect code into storage package
Signed-off-by: Richard Scothern <richard.scothern@docker.com>
2016-05-04 10:35:18 -07:00
Tony Holdstock-Brown
011b7e493b Ensure GC continues marking if _manifests is nonexistent
Signed-off-by: Tony Holdstock-Brown <tony@docker.com>
2016-05-04 10:35:04 -07:00
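A minimal sketch of the tolerance this adds, with a local stand-in for the storage driver's PathNotFoundError:

package main

import "fmt"

// pathNotFoundError stands in for the storage driver's PathNotFoundError.
type pathNotFoundError struct{ path string }

func (e pathNotFoundError) Error() string { return "path not found: " + e.path }

// markRepository keeps the mark phase going when a repository's
// _manifests directory is missing, instead of aborting the whole sweep.
func markRepository(enumerate func() error) error {
	err := enumerate()
	if _, ok := err.(pathNotFoundError); ok {
		return nil // nothing to mark in this repo; continue with the rest
	}
	return err
}

func main() {
	err := markRepository(func() error {
		return pathNotFoundError{path: "testrepo/_manifests"}
	})
	fmt.Println(err) // <nil>: GC continues marking other repositories
}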
Serge Dubrouski
0a1fcf9712 Fix wording for dry-run flag in usage message for garbage collector.
Signed-off-by: Serge Dubrouski <sergeyfd@gmail.com>
2016-05-04 10:31:44 -07:00
Anis Elleuch
ed02e88075 Sorting completed parts by part number for better compliance with the S3 spec
Signed-off-by: Anis Elleuch <vadmeste@gmail.com>
2016-05-04 10:30:37 -07:00
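The fix amounts to giving the completed-part slice a sort.Interface keyed on part number, since S3's CompleteMultipartUpload expects parts in ascending order. A runnable sketch with a stand-in part type:

package main

import (
	"fmt"
	"sort"
)

// completedPart stands in for s3.CompletedPart.
type completedPart struct{ PartNumber int64 }

type completedParts []completedPart

func (a completedParts) Len() int           { return len(a) }
func (a completedParts) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }

func main() {
	parts := completedParts{{3}, {1}, {2}}
	// S3 expects parts in ascending part-number order; sort before sending.
	sort.Sort(parts)
	fmt.Println(parts) // [{1} {2} {3}]
}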
Serge Dubrouski
fd5a404996 Add blobWriter.Close() call into blobWriter.Commit()
Signed-off-by: Serge Dubrouski <sergeyfd@gmail.com>
2016-05-04 10:30:29 -07:00
jhaohai
3f538cac90 add cn-north-1 to valid region check
Signed-off-by: jhaohai <jhaohai@foxmail.com>
2016-05-04 10:30:18 -07:00
Stefan Majewsky
3330cc567e wait for DLO segments to show up when Close()ing the writer
Not just when Commit()ing the result. This fixes some errors I observed
when the layer (i.e. the DLO) is Stat()ed immediately after closing and
the stat reports the wrong file size because the container listing is
not yet up-to-date.

Signed-off-by: Stefan Majewsky <stefan.majewsky@sap.com>
2016-05-04 10:30:08 -07:00
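The waiting logic is essentially a bounded poll of the (eventually consistent) container listing until the object's reported size catches up. A sketch with illustrative wait constants, not the driver's actual values:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForSize polls stat until the backend reports the expected size or a
// deadline passes -- the same shape as waiting for DLO segments to appear
// in an eventually consistent container listing.
func waitForSize(stat func() (int64, error), want int64, timeout time.Duration) error {
	wait := 10 * time.Millisecond // illustrative initial backoff
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if size, err := stat(); err == nil && size == want {
			return nil
		}
		time.Sleep(wait)
		wait *= 2
	}
	return errors.New("timed out waiting for segments to show up")
}

func main() {
	calls := 0
	stat := func() (int64, error) {
		calls++
		if calls < 3 {
			return 0, nil // listing not yet up-to-date
		}
		return 1024, nil
	}
	fmt.Println(waitForSize(stat, 1024, time.Second)) // <nil>
}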
Aaron Lehmann
775d0968cc Use correct media type for config blob in schema2 manifest
The schema2 manifest builder fills in this part of the manifest based on
the descriptor it gets back from BlobIngester's Put method. It passes
the correct media type to Put, but Put ends up replacing this value with
application/octet-stream in its return value.

This commit works around the issue in the manifest builder. Arguably Put
should not be changing the media type in its return value, but this
commit is a targeted fix to keep it very low-risk for possible inclusion
in Docker 1.11.

Fixes #1621 (but maybe we should open a separate issue for the media
type behavior in the distribution client, and the unnecessary stat).

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2016-05-04 10:28:54 -07:00
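Stripped to its essence, the workaround re-stamps the descriptor returned by Put with the media type the builder asked for. Stand-in types below; the constant's value is schema2's config media type:

package main

import "fmt"

type descriptor struct {
	MediaType string
	Size      int64
}

const mediaTypeConfig = "application/vnd.docker.container.image.v1+json"

// put mimics BlobIngester.Put, which stores the blob correctly but
// returns application/octet-stream in the descriptor regardless of the
// media type it was given.
func put(mediaType string, p []byte) descriptor {
	return descriptor{MediaType: "application/octet-stream", Size: int64(len(p))}
}

func main() {
	config := put(mediaTypeConfig, []byte(`{"architecture":"amd64"}`))
	// Targeted workaround: override the media type so the manifest's
	// config descriptor carries the correct value.
	config.MediaType = mediaTypeConfig
	fmt.Println(config.MediaType)
}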
Arien Holthuizen
64a9727f11 Only check validity of S3 region if not using custom endpoint
Signed-off-by: Arien Holthuizen <aholthuizen@schubergphilis.com>
2016-05-04 10:28:31 -07:00
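The guard itself is simple: validate the region name against the built-in AWS region list only when no custom regionendpoint is configured, since S3-compatible services use arbitrary region names. A sketch with the parameter plumbing omitted:

package main

import "fmt"

var validRegions = map[string]struct{}{
	"us-east-1": {}, "eu-west-1": {}, "cn-north-1": {},
}

// checkRegion skips validation when a custom endpoint is in play, since
// S3-compatible services use region names AWS has never heard of.
func checkRegion(region, regionEndpoint string) error {
	if regionEndpoint != "" {
		return nil
	}
	if _, ok := validRegions[region]; !ok {
		return fmt.Errorf("invalid region provided: %v", region)
	}
	return nil
}

func main() {
	fmt.Println(checkRegion("objectstore-local", "https://s3.example.com")) // <nil>
	fmt.Println(checkRegion("objectstore-local", ""))                       // invalid region
}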
Tony Holdstock-Brown
dafb59f4ab Ensure we log io.Copy errors and bytes copied/total in uploads
Signed-off-by: Tony Holdstock-Brown <tony@docker.com>
2016-05-04 10:28:12 -07:00
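In sketch form, the handler goes from a bare error log to an entry that carries the error, the bytes copied so far, and the request's declared Content-Length (stdlib logging here purely for illustration; the registry uses its own context logger, as the helpers diff below shows):

package main

import (
	"errors"
	"log"
)

// logCopyFailure records how far an interrupted upload got alongside the
// declared request size, so short writes are diagnosable from the logs.
func logCopyFailure(action string, err error, copied, contentLength int64) {
	log.Printf("client disconnected during %s: error=%v copied=%d contentLength=%d",
		action, err, copied, contentLength)
}

func main() {
	logCopyFailure("blob PATCH", errors.New("connection reset"), 4096, 1<<20)
}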
Richard Scothern
3f7fa41272 Update AUTHORS
2016-04-13 10:27:36 -07:00
14 changed files with 220 additions and 100 deletions

View file

@@ -1,4 +1,5 @@
Aaron Lehmann <aaron.lehmann@docker.com>
Aaron Schlesinger <aschlesinger@deis.com>
Aaron Vinson <avinson.public@gmail.com>
Adam Enger <adamenger@gmail.com>
Adrian Mouat <adrian.mouat@gmail.com>
@@ -27,6 +28,7 @@ burnettk <burnettk@gmail.com>
Carson A <ca@carsonoid.net>
Chris Dillon <squarism@gmail.com>
Daisuke Fujita <dtanshi45@gmail.com>
Daniel Huhn <daniel@danielhuhn.de>
Darren Shepherd <darren@rancher.com>
Dave Trombley <dave.trombley@gmail.com>
Dave Tucker <dt@docker.com>
@@ -59,6 +61,7 @@ Jeff Nickoloff <jeff@allingeek.com>
Jessie Frazelle <jessie@docker.com>
Jianqing Wang <tsing@jianqing.org>
John Starks <jostarks@microsoft.com>
Jon Johnson <jonjohnson@google.com>
Jon Poler <jonathan.poler@apcera.com>
Jonathan Boulle <jonathanboulle@gmail.com>
Jordan Liggitt <jliggitt@redhat.com>
@@ -103,6 +106,7 @@ Shawn Falkner-Horine <dreadpirateshawn@gmail.com>
Shreyas Karnik <karnik.shreyas@gmail.com>
Simon Thulbourn <simon+github@thulbourn.com>
Spencer Rinehart <anubis@overthemonkey.com>
Stefan Majewsky <stefan.majewsky@sap.com>
Stefan Weil <sw@weilnetz.de>
Stephen J Day <stephen.day@docker.com>
Sungho Moon <sungho.moon@navercorp.com>
@@ -114,6 +118,7 @@ Thomas Sjögren <konstruktoid@users.noreply.github.com>
Tianon Gravi <admwiggin@gmail.com>
Tibor Vass <teabee89@gmail.com>
Tonis Tiigi <tonistiigi@gmail.com>
Tony Holdstock-Brown <tony@docker.com>
Trevor Pounds <trevor.pounds@gmail.com>
Troels Thomsen <troels@thomsen.io>
Vincent Batts <vbatts@redhat.com>

View file

@@ -110,7 +110,8 @@ func (mb *configManifestBuilder) Build(ctx context.Context) (m distribution.Mani
ContainerConfig struct {
Cmd []string
} `json:"container_config,omitempty"`
ThrowAway bool `json:"throwaway,omitempty"`
Author string `json:"author,omitempty"`
ThrowAway bool `json:"throwaway,omitempty"`
}
fsLayerList := make([]FSLayer, len(img.History))
@@ -145,6 +146,7 @@ func (mb *configManifestBuilder) Build(ctx context.Context) (m distribution.Mani
Parent: parent,
Comment: h.Comment,
Created: h.Created,
Author: h.Author,
}
v1Compatibility.ContainerConfig.Cmd = []string{img.History[i].CreatedBy}
if h.EmptyLayer {

View file

@@ -163,6 +163,7 @@ func TestConfigBuilder(t *testing.T) {
"empty_layer": true
},
{
"author": "Alyssa P. Hacker \u003calyspdev@example.com\u003e",
"created": "2015-11-04T23:06:32.083868454Z",
"created_by": "/bin/sh -c dd if=/dev/zero of=/file bs=1024 count=1024"
},
@@ -252,8 +253,8 @@ func TestConfigBuilder(t *testing.T) {
}
expectedV1Compatibility := []string{
`{"architecture":"amd64","config":{"AttachStderr":false,"AttachStdin":false,"AttachStdout":false,"Cmd":["/bin/sh","-c","echo hi"],"Domainname":"","Entrypoint":null,"Env":["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin","derived=true","asdf=true"],"Hostname":"23304fc829f9","Image":"sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246","Labels":{},"OnBuild":[],"OpenStdin":false,"StdinOnce":false,"Tty":false,"User":"","Volumes":null,"WorkingDir":""},"container":"e91032eb0403a61bfe085ff5a5a48e3659e5a6deae9f4d678daa2ae399d5a001","container_config":{"AttachStderr":false,"AttachStdin":false,"AttachStdout":false,"Cmd":["/bin/sh","-c","#(nop) CMD [\"/bin/sh\" \"-c\" \"echo hi\"]"],"Domainname":"","Entrypoint":null,"Env":["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin","derived=true","asdf=true"],"Hostname":"23304fc829f9","Image":"sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246","Labels":{},"OnBuild":[],"OpenStdin":false,"StdinOnce":false,"Tty":false,"User":"","Volumes":null,"WorkingDir":""},"created":"2015-11-04T23:06:32.365666163Z","docker_version":"1.9.0-dev","id":"0850bfdeb7b060b1004a09099846c2f023a3f2ecbf33f56b4774384b00ce0323","os":"linux","parent":"74cf9c92699240efdba1903c2748ef57105d5bedc588084c4e88f3bb1c3ef0b0","throwaway":true}`,
`{"id":"74cf9c92699240efdba1903c2748ef57105d5bedc588084c4e88f3bb1c3ef0b0","parent":"178be37afc7c49e951abd75525dbe0871b62ad49402f037164ee6314f754599d","created":"2015-11-04T23:06:32.083868454Z","container_config":{"Cmd":["/bin/sh -c dd if=/dev/zero of=/file bs=1024 count=1024"]}}`,
`{"architecture":"amd64","config":{"AttachStderr":false,"AttachStdin":false,"AttachStdout":false,"Cmd":["/bin/sh","-c","echo hi"],"Domainname":"","Entrypoint":null,"Env":["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin","derived=true","asdf=true"],"Hostname":"23304fc829f9","Image":"sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246","Labels":{},"OnBuild":[],"OpenStdin":false,"StdinOnce":false,"Tty":false,"User":"","Volumes":null,"WorkingDir":""},"container":"e91032eb0403a61bfe085ff5a5a48e3659e5a6deae9f4d678daa2ae399d5a001","container_config":{"AttachStderr":false,"AttachStdin":false,"AttachStdout":false,"Cmd":["/bin/sh","-c","#(nop) CMD [\"/bin/sh\" \"-c\" \"echo hi\"]"],"Domainname":"","Entrypoint":null,"Env":["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin","derived=true","asdf=true"],"Hostname":"23304fc829f9","Image":"sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246","Labels":{},"OnBuild":[],"OpenStdin":false,"StdinOnce":false,"Tty":false,"User":"","Volumes":null,"WorkingDir":""},"created":"2015-11-04T23:06:32.365666163Z","docker_version":"1.9.0-dev","id":"69e5c1bfadad697fdb6db59f6326648fa119e0c031a0eda33b8cfadcab54ba7f","os":"linux","parent":"74cf9c92699240efdba1903c2748ef57105d5bedc588084c4e88f3bb1c3ef0b0","throwaway":true}`,
`{"id":"74cf9c92699240efdba1903c2748ef57105d5bedc588084c4e88f3bb1c3ef0b0","parent":"178be37afc7c49e951abd75525dbe0871b62ad49402f037164ee6314f754599d","created":"2015-11-04T23:06:32.083868454Z","container_config":{"Cmd":["/bin/sh -c dd if=/dev/zero of=/file bs=1024 count=1024"]},"author":"Alyssa P. Hacker \u003calyspdev@example.com\u003e"}`,
`{"id":"178be37afc7c49e951abd75525dbe0871b62ad49402f037164ee6314f754599d","parent":"b449305a55a283538c4574856a8b701f2a3d5ec08ef8aec47f385f20339a4866","created":"2015-11-04T23:06:31.192097572Z","container_config":{"Cmd":["/bin/sh -c #(nop) ENV asdf=true"]},"throwaway":true}`,
`{"id":"b449305a55a283538c4574856a8b701f2a3d5ec08ef8aec47f385f20339a4866","parent":"9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e","created":"2015-11-04T23:06:30.934316144Z","container_config":{"Cmd":["/bin/sh -c #(nop) ENV derived=true"]},"throwaway":true}`,
`{"id":"9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e","parent":"3690474eb5b4b26fdfbd89c6e159e8cc376ca76ef48032a30fa6aafd56337880","created":"2015-10-31T22:22:55.613815829Z","container_config":{"Cmd":["/bin/sh -c #(nop) CMD [\"sh\"]"]}}`,

View file

@@ -55,6 +55,9 @@ func (mb *builder) Build(ctx context.Context) (distribution.Manifest, error) {
// Add config to the blob store
m.Config, err = mb.bs.Put(ctx, MediaTypeConfig, mb.configJSON)
// Override MediaType, since Put always replaces the specified media
// type with application/octet-stream in the descriptor it returns.
m.Config.MediaType = MediaTypeConfig
if err != nil {
return nil, err
}

View file

@@ -32,7 +32,7 @@ func (bs *mockBlobService) Put(ctx context.Context, mediaType string, p []byte)
d := distribution.Descriptor{
Digest: digest.FromBytes(p),
Size: int64(len(p)),
MediaType: mediaType,
MediaType: "application/octet-stream",
}
bs.descriptors[d.Digest] = d
return d, nil

View file

@@ -46,7 +46,11 @@ func copyFullPayload(responseWriter http.ResponseWriter, r *http.Request, destWr
// instead of showing 0 for the HTTP status.
responseWriter.WriteHeader(499)
ctxu.GetLogger(context).Error("client disconnected during " + action)
ctxu.GetLoggerWithFields(context, map[interface{}]interface{}{
"error": err,
"copied": copied,
"contentLength": r.ContentLength,
}, "error", "copied", "contentLength").Error("client disconnected during " + action)
return errors.New("client disconnected")
default:
}

View file

@@ -1,7 +1,14 @@
package registry
import (
"fmt"
"os"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/docker/distribution/version"
"github.com/docker/libtrust"
"github.com/spf13/cobra"
)
@@ -10,6 +17,7 @@ var showVersion bool
func init() {
RootCmd.AddCommand(ServeCmd)
RootCmd.AddCommand(GCCmd)
GCCmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "do everything except remove the blobs")
RootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "show the version and exit")
}
@@ -26,3 +34,51 @@ var RootCmd = &cobra.Command{
cmd.Usage()
},
}
var dryRun bool
// GCCmd is the cobra command that corresponds to the garbage-collect subcommand
var GCCmd = &cobra.Command{
Use: "garbage-collect <config>",
Short: "`garbage-collect` deletes layers not referenced by any manifests",
Long: "`garbage-collect` deletes layers not referenced by any manifests",
Run: func(cmd *cobra.Command, args []string) {
config, err := resolveConfiguration(args)
if err != nil {
fmt.Fprintf(os.Stderr, "configuration error: %v\n", err)
cmd.Usage()
os.Exit(1)
}
driver, err := factory.Create(config.Storage.Type(), config.Storage.Parameters())
if err != nil {
fmt.Fprintf(os.Stderr, "failed to construct %s driver: %v", config.Storage.Type(), err)
os.Exit(1)
}
ctx := context.Background()
ctx, err = configureLogging(ctx, config)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to configure logging with config: %s", err)
os.Exit(1)
}
k, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
registry, err := storage.NewRegistry(ctx, driver, storage.DisableSchema1Signatures, storage.Schema1SigningKey(k))
if err != nil {
fmt.Fprintf(os.Stderr, "failed to construct registry: %v", err)
os.Exit(1)
}
err = storage.MarkAndSweep(ctx, driver, registry, dryRun)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to garbage collect: %v", err)
os.Exit(1)
}
},
}

View file

@@ -16,6 +16,7 @@ import (
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
"path"
)
// TestWriteSeek tests that the current file size can be
@@ -83,6 +84,15 @@ func TestSimpleBlobUpload(t *testing.T) {
t.Fatalf("unexpected error during upload cancellation: %v", err)
}
// get the enclosing directory
uploadPath := path.Dir(blobUpload.(*blobWriter).path)
// ensure state was cleaned up
_, err = driver.List(ctx, uploadPath)
if err == nil {
t.Fatal("files in upload path after cleanup")
}
// Do a resume, get unknown upload
blobUpload, err = bs.Resume(ctx, blobUpload.ID())
if err != distribution.ErrBlobUploadUnknown {
@@ -128,6 +138,13 @@ func TestSimpleBlobUpload(t *testing.T) {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// ensure state was cleaned up
uploadPath = path.Dir(blobUpload.(*blobWriter).path)
_, err = driver.List(ctx, uploadPath)
if err == nil {
t.Fatal("files in upload path after commit")
}
// After finishing an upload, it should no longer exist.
if _, err := bs.Resume(ctx, blobUpload.ID()); err != distribution.ErrBlobUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err)

View file

@@ -18,8 +18,8 @@ var (
errResumableDigestNotAvailable = errors.New("resumable digest not available")
)
// layerWriter is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
// blobWriter is used to control the various aspects of resumable
// blob upload.
type blobWriter struct {
ctx context.Context
blobStore *linkedBlobStore
@@ -34,6 +34,7 @@ type blobWriter struct {
path string
resumableDigestEnabled bool
committed bool
}
var _ distribution.BlobWriter = &blobWriter{}
@@ -56,6 +57,8 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor)
return distribution.Descriptor{}, err
}
bw.Close()
canonical, err := bw.validateBlob(ctx, desc)
if err != nil {
return distribution.Descriptor{}, err
@@ -78,6 +81,7 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor)
return distribution.Descriptor{}, err
}
bw.committed = true
return canonical, nil
}
@@ -89,11 +93,14 @@ func (bw *blobWriter) Cancel(ctx context.Context) error {
return err
}
if err := bw.Close(); err != nil {
context.GetLogger(ctx).Errorf("error closing blobwriter: %s", err)
}
if err := bw.removeResources(ctx); err != nil {
return err
}
bw.Close()
return nil
}
@@ -130,6 +137,10 @@ func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
}
func (bw *blobWriter) Close() error {
if bw.committed {
return errors.New("blobwriter close after commit")
}
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
return err
}

View file

@@ -493,6 +493,9 @@ func (w *writer) Write(p []byte) (int, error) {
// Size returns the number of bytes written to this FileWriter.
func (w *writer) Size() int64 {
if !w.closed {
return w.offset + int64(w.buffSize)
}
return w.size
}

View file

@@ -18,6 +18,7 @@ import (
"io/ioutil"
"net/http"
"reflect"
"sort"
"strconv"
"strings"
"time"
@@ -80,6 +81,7 @@ func init() {
"ap-northeast-1",
"ap-northeast-2",
"sa-east-1",
"cn-north-1",
} {
validRegions[region] = struct{}{}
}
@@ -136,14 +138,21 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
secretKey = ""
}
regionEndpoint := parameters["regionendpoint"]
if regionEndpoint == nil {
regionEndpoint = ""
}
regionName, ok := parameters["region"]
if regionName == nil || fmt.Sprint(regionName) == "" {
return nil, fmt.Errorf("No region parameter provided")
}
region := fmt.Sprint(regionName)
_, ok = validRegions[region]
if !ok {
return nil, fmt.Errorf("Invalid region provided: %v", region)
// Don't check the region value if a custom endpoint is provided.
if regionEndpoint == "" {
if _, ok = validRegions[region]; !ok {
return nil, fmt.Errorf("Invalid region provided: %v", region)
}
}
bucket := parameters["bucket"]
@@ -151,11 +160,6 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
return nil, fmt.Errorf("No bucket parameter provided")
}
regionEndpoint := parameters["regionendpoint"]
if regionEndpoint == nil {
regionEndpoint = ""
}
encryptBool := false
encrypt := parameters["encrypt"]
switch encrypt := encrypt.(type) {
@@ -716,6 +720,12 @@ func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver
}
}
type completedParts []*s3.CompletedPart
func (a completedParts) Len() int { return len(a) }
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
func (w *writer) Write(p []byte) (int, error) {
if w.closed {
return 0, fmt.Errorf("already closed")
@@ -728,19 +738,22 @@ func (w *writer) Write(p []byte) (int, error) {
// If the last written part is smaller than minChunkSize, we need to make a
// new multipart upload :sadface:
if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize {
var completedParts []*s3.CompletedPart
var completedUploadedParts completedParts
for _, part := range w.parts {
completedParts = append(completedParts, &s3.CompletedPart{
completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
sort.Sort(completedUploadedParts)
_, err := w.driver.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedParts,
Parts: completedUploadedParts,
},
})
if err != nil {
@@ -880,19 +893,23 @@ func (w *writer) Commit() error {
return err
}
w.committed = true
var completedParts []*s3.CompletedPart
var completedUploadedParts completedParts
for _, part := range w.parts {
completedParts = append(completedParts, &s3.CompletedPart{
completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
sort.Sort(completedUploadedParts)
_, err = w.driver.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedParts,
Parts: completedUploadedParts,
},
})
if err != nil {

View file

@@ -698,6 +698,9 @@ func (w *writer) Close() error {
if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
return err
}
if err := w.waitForSegmentsToShowUp(); err != nil {
return err
}
}
w.closed = true
@@ -732,10 +735,14 @@ func (w *writer) Commit() error {
}
w.committed = true
return w.waitForSegmentsToShowUp()
}
func (w *writer) waitForSegmentsToShowUp() error {
var err error
waitingTime := readAfterWriteWait
endTime := time.Now().Add(readAfterWriteTimeout)
for {
var info swift.Object
if info, _, err = w.driver.Conn.Object(w.driver.Container, w.driver.swiftPath(w.path)); err == nil {

View file

@@ -1,8 +1,7 @@
package registry
package storage
import (
"fmt"
"os"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
@@ -10,21 +9,15 @@ import (
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/docker/libtrust"
"github.com/spf13/cobra"
)
func emit(format string, a ...interface{}) {
if dryRun {
fmt.Printf(format+"\n", a...)
}
fmt.Printf(format+"\n", a...)
}
func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, registry distribution.Namespace) error {
// MarkAndSweep performs a mark and sweep of registry data
func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, registry distribution.Namespace, dryRun bool) error {
repositoryEnumerator, ok := registry.(distribution.RepositoryEnumerator)
if !ok {
return fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
@@ -33,7 +26,9 @@ func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
// mark
markSet := make(map[digest.Digest]struct{})
err := repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
emit(repoName)
if dryRun {
emit(repoName)
}
var err error
named, err := reference.ParseNamed(repoName)
@@ -57,7 +52,9 @@ func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
// Mark the manifest's blob
emit("%s: marking manifest %s ", repoName, dgst)
if dryRun {
emit("%s: marking manifest %s ", repoName, dgst)
}
markSet[dgst] = struct{}{}
manifest, err := manifestService.Get(ctx, dgst)
@@ -68,7 +65,9 @@ func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
descriptors := manifest.References()
for _, descriptor := range descriptors {
markSet[descriptor.Digest] = struct{}{}
emit("%s: marking blob %s", repoName, descriptor.Digest)
if dryRun {
emit("%s: marking blob %s", repoName, descriptor.Digest)
}
}
switch manifest.(type) {
@@ -82,13 +81,17 @@ func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return fmt.Errorf("failed to get signatures for signed manifest: %v", err)
}
for _, signatureDigest := range signatures {
emit("%s: marking signature %s", repoName, signatureDigest)
if dryRun {
emit("%s: marking signature %s", repoName, signatureDigest)
}
markSet[signatureDigest] = struct{}{}
}
break
case *schema2.DeserializedManifest:
config := manifest.(*schema2.DeserializedManifest).Config
emit("%s: marking configuration %s", repoName, config.Digest)
if dryRun {
emit("%s: marking configuration %s", repoName, config.Digest)
}
markSet[config.Digest] = struct{}{}
break
}
@@ -96,6 +99,17 @@ func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return nil
})
if err != nil {
// In certain situations such as unfinished uploads, deleting all
// tags in S3 or removing the _manifests folder manually, this
// error may be of type PathNotFound.
//
// In these cases we can continue marking other manifests safely.
if _, ok := err.(driver.PathNotFoundError); ok {
return nil
}
}
return err
})
@@ -116,13 +130,14 @@ func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
if err != nil {
return fmt.Errorf("error enumerating blobs: %v", err)
}
emit("\n%d blobs marked, %d blobs eligible for deletion", len(markSet), len(deleteSet))
if dryRun {
emit("\n%d blobs marked, %d blobs eligible for deletion", len(markSet), len(deleteSet))
}
// Construct vacuum
vacuum := storage.NewVacuum(ctx, storageDriver)
vacuum := NewVacuum(ctx, storageDriver)
for dgst := range deleteSet {
emit("blob eligible for deletion: %s", dgst)
if dryRun {
emit("blob eligible for deletion: %s", dgst)
continue
}
err = vacuum.RemoveBlob(string(dgst))
@@ -133,55 +148,3 @@ func markAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis
return err
}
func init() {
GCCmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "do everything expect remove the blobs")
}
var dryRun bool
// GCCmd is the cobra command that corresponds to the garbage-collect subcommand
var GCCmd = &cobra.Command{
Use: "garbage-collect <config>",
Short: "`garbage-collect` deletes layers not referenced by any manifests",
Long: "`garbage-collect` deletes layers not referenced by any manifests",
Run: func(cmd *cobra.Command, args []string) {
config, err := resolveConfiguration(args)
if err != nil {
fmt.Fprintf(os.Stderr, "configuration error: %v\n", err)
cmd.Usage()
os.Exit(1)
}
driver, err := factory.Create(config.Storage.Type(), config.Storage.Parameters())
if err != nil {
fmt.Fprintf(os.Stderr, "failed to construct %s driver: %v", config.Storage.Type(), err)
os.Exit(1)
}
ctx := context.Background()
ctx, err = configureLogging(ctx, config)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to configure logging with config: %s", err)
os.Exit(1)
}
k, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
registry, err := storage.NewRegistry(ctx, driver, storage.DisableSchema1Signatures, storage.Schema1SigningKey(k))
if err != nil {
fmt.Fprintf(os.Stderr, "failed to construct registry: %v", err)
os.Exit(1)
}
err = markAndSweep(ctx, driver, registry)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to garbage collect: %v", err)
os.Exit(1)
}
},
}

View file

@@ -1,14 +1,14 @@
package registry
package storage
import (
"io"
"path"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
@@ -22,7 +22,7 @@ type image struct {
func createRegistry(t *testing.T, driver driver.StorageDriver) distribution.Namespace {
ctx := context.Background()
registry, err := storage.NewRegistry(ctx, driver, storage.EnableDelete)
registry, err := NewRegistry(ctx, driver, EnableDelete)
if err != nil {
t.Fatalf("Failed to construct namespace")
}
@@ -161,7 +161,7 @@ func TestNoDeletionNoEffect(t *testing.T) {
}
// Run GC
err = markAndSweep(context.Background(), inmemoryDriver, registry)
err = MarkAndSweep(context.Background(), inmemoryDriver, registry, false)
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
@@ -177,6 +177,37 @@ }
}
}
func TestGCWithMissingManifests(t *testing.T) {
ctx := context.Background()
d := inmemory.New()
registry := createRegistry(t, d)
repo := makeRepository(t, registry, "testrepo")
uploadRandomSchema1Image(t, repo)
// Simulate a missing _manifests directory
revPath, err := pathFor(manifestRevisionsPathSpec{"testrepo"})
if err != nil {
t.Fatal(err)
}
_manifestsPath := path.Dir(revPath)
err = d.Delete(ctx, _manifestsPath)
if err != nil {
t.Fatal(err)
}
err = MarkAndSweep(context.Background(), d, registry, false)
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
blobs := allBlobs(t, registry)
if len(blobs) > 0 {
t.Errorf("unexpected blobs after gc")
}
}
func TestDeletionHasEffect(t *testing.T) {
ctx := context.Background()
inmemoryDriver := inmemory.New()
@@ -193,7 +224,7 @@ func TestDeletionHasEffect(t *testing.T) {
manifests.Delete(ctx, image3.manifestDigest)
// Run GC
err = markAndSweep(context.Background(), inmemoryDriver, registry)
err = MarkAndSweep(context.Background(), inmemoryDriver, registry, false)
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}
@@ -327,7 +358,7 @@ func TestOrphanBlobDeleted(t *testing.T) {
uploadRandomSchema2Image(t, repo)
// Run GC
err = markAndSweep(context.Background(), inmemoryDriver, registry)
err = MarkAndSweep(context.Background(), inmemoryDriver, registry, false)
if err != nil {
t.Fatalf("Failed mark and sweep: %v", err)
}