From 76226c61a910f635c583907858381000c8ba1a40 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Wed, 5 Oct 2016 16:39:38 -0700 Subject: [PATCH] Fix S3 Delete method's notion of subpaths Deleting "/a" was deleting "/a/b" but also "/ab". Signed-off-by: Noah Treuhaft --- registry/storage/driver/s3-aws/s3.go | 8 ++- .../storage/driver/testsuites/testsuites.go | 49 ++++++++++++++++++- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index b3c9d37c3..a4fde2355 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -784,10 +784,12 @@ func min(a, b int) int { // We must be careful since S3 does not guarantee read after delete consistency func (d *driver) Delete(ctx context.Context, path string) error { s3Objects := make([]*s3.ObjectIdentifier, 0, listMax) + s3Path := d.s3Path(path) listObjectsInput := &s3.ListObjectsInput{ Bucket: aws.String(d.Bucket), - Prefix: aws.String(d.s3Path(path)), + Prefix: aws.String(s3Path), } +ListLoop: for { // list all the objects resp, err := d.S3.ListObjects(listObjectsInput) @@ -800,6 +802,10 @@ func (d *driver) Delete(ctx context.Context, path string) error { } for _, key := range resp.Contents { + // Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab"). + if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' { + break ListLoop + } s3Objects = append(s3Objects, &s3.ObjectIdentifier{ Key: key.Key, }) diff --git a/registry/storage/driver/testsuites/testsuites.go b/registry/storage/driver/testsuites/testsuites.go index 87f9c6ace..d8afe0c85 100644 --- a/registry/storage/driver/testsuites/testsuites.go +++ b/registry/storage/driver/testsuites/testsuites.go @@ -15,9 +15,10 @@ import ( "testing" "time" + "gopkg.in/check.v1" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" - "gopkg.in/check.v1" ) // Test hooks up gocheck into the "go test" runner. @@ -717,6 +718,52 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) { c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true) } +// TestDeleteOnlyDeletesSubpaths checks that deleting path A does not +// delete path B when A is a prefix of B but B is not a subpath of A (so that +// deleting "/a" does not delete "/ab"). This matters for services like S3 that +// do not implement directories. +func (suite *DriverSuite) TestDeleteOnlyDeletesSubpaths(c *check.C) { + dirname := randomPath(32) + filename := randomPath(32) + contents := randomContents(32) + + defer suite.deletePath(c, firstPart(dirname)) + + err := suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename), contents) + c.Assert(err, check.IsNil) + + err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename+"suffix"), contents) + c.Assert(err, check.IsNil) + + err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, dirname, filename), contents) + c.Assert(err, check.IsNil) + + err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, dirname+"suffix", filename), contents) + c.Assert(err, check.IsNil) + + err = suite.StorageDriver.Delete(suite.ctx, path.Join(dirname, filename)) + c.Assert(err, check.IsNil) + + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename)) + c.Assert(err, check.NotNil) + c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) + c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true) + + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename+"suffix")) + c.Assert(err, check.IsNil) + + err = suite.StorageDriver.Delete(suite.ctx, path.Join(dirname, dirname)) + c.Assert(err, check.IsNil) + + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, dirname, filename)) + c.Assert(err, check.NotNil) + c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) + c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true) + + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, dirname+"suffix", filename)) + c.Assert(err, check.IsNil) +} + // TestStatCall runs verifies the implementation of the storagedriver's Stat call. func (suite *DriverSuite) TestStatCall(c *check.C) { content := randomContents(4096)