From 66107df1afd3d1782ab9a73eccd7b72e4e1bcaaf Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 2 Dec 2014 19:01:00 -0800 Subject: [PATCH 01/15] Use int64 for ReadStream and WriteStream offsets This change brings the storagedriver API in line with the Go standard library's use of int64 for offsets. The main benefit is simplicity in interfacing with the io library reducing the number of type conversions in simple code. --- storagedriver/azure/azure.go | 12 +++++------ storagedriver/filesystem/driver.go | 10 +++++---- storagedriver/inmemory/driver.go | 6 +++--- storagedriver/ipc/client.go | 4 ++-- storagedriver/ipc/server.go | 6 +++--- storagedriver/s3/s3.go | 18 ++++++++-------- storagedriver/storagedriver.go | 6 +++--- storagedriver/testsuites/testsuites.go | 30 +++++++++++++------------- 8 files changed, 47 insertions(+), 45 deletions(-) diff --git a/storagedriver/azure/azure.go b/storagedriver/azure/azure.go index ba716841..489a6348 100644 --- a/storagedriver/azure/azure.go +++ b/storagedriver/azure/azure.go @@ -103,7 +103,7 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if !ok { @@ -115,7 +115,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { return nil, err } - if offset >= size { + if offset >= int64(size) { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -129,10 +129,10 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { var ( lastBlockNum int - resumableOffset uint64 + resumableOffset int64 blocks []azure.Block ) @@ -153,12 +153,12 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error()) } - var totalSize uint64 + var totalSize int64 for _, v := range parts.CommittedBlocks { blocks = append(blocks, azure.Block{ Id: v.Name, Status: azure.BlockStatusCommitted}) - totalSize += uint64(v.Size) + totalSize += int64(v.Size) } // NOTE: Azure driver currently supports only append mode (resumable diff --git a/storagedriver/filesystem/driver.go b/storagedriver/filesystem/driver.go index a4b2e688..3fbfcdf6 100644 --- a/storagedriver/filesystem/driver.go +++ b/storagedriver/filesystem/driver.go @@ -80,7 +80,7 @@ func (d *Driver) PutContent(subPath string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. 
-func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644) if err != nil { return nil, storagedriver.PathNotFoundError{Path: path} @@ -100,7 +100,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(subPath string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() resumableOffset, err := d.CurrentSize(subPath) @@ -108,7 +108,7 @@ func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.Read return err } - if offset > resumableOffset { + if offset > int64(resumableOffset) { return storagedriver.InvalidOffsetError{Path: subPath, Offset: offset} } @@ -131,13 +131,15 @@ func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.Read } defer file.Close() + // TODO(sday): Use Seek + Copy here. + buf := make([]byte, 32*1024) for { bytesRead, er := reader.Read(buf) if bytesRead > 0 { bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset)) if bytesWritten > 0 { - offset += uint64(bytesWritten) + offset += int64(bytesWritten) } if ew != nil { err = ew diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index 98b068e9..3231b017 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -61,7 +61,7 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() contents, err := d.GetContent(path) @@ -79,7 +79,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() d.mutex.RLock() defer d.mutex.RUnlock() @@ -89,7 +89,7 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo return err } - if offset > resumableOffset { + if offset > int64(resumableOffset) { return storagedriver.InvalidOffsetError{Path: path, Offset: offset} } diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index c77797eb..7e52a084 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -234,7 +234,7 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. 
-func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) { if err := driver.exited(); err != nil { return nil, err } @@ -261,7 +261,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { if err := driver.exited(); err != nil { return err } diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 7d1876ca..1c0084f9 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -100,7 +100,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "ReadStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() reader, err := driver.ReadStream(path, offset) var response ReadStreamResponse if err != nil { @@ -115,9 +115,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "WriteStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() // Depending on serialization method, Size may be convereted to any int/uint type - size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(uint64(0))).Uint() + size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int() reader, _ := request.Parameters["Reader"].(io.ReadCloser) err := driver.WriteStream(path, offset, size, reader) response := WriteStreamResponse{ diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index def03e3e..3d5cd511 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -17,7 +17,7 @@ const driverName = "s3" // minChunkSize defines the minimum multipart upload chunk size // S3 API requires multipart upload chunks to be at least 5MB -const minChunkSize = uint64(5 * 1024 * 1024) +const minChunkSize = 5 * 1024 * 1024 // listPartsMax is the largest amount of parts you can request from S3 const listPartsMax = 1000 @@ -120,9 +120,9 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. 
-func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { headers := make(http.Header) - headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-") + headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") resp, err := d.Bucket.GetResponseWithHeaders(path, headers) if err != nil { @@ -133,22 +133,22 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() - chunkSize := minChunkSize + chunkSize := int64(minChunkSize) for size/chunkSize >= listPartsMax { chunkSize *= 2 } partNumber := 1 - totalRead := uint64(0) + var totalRead int64 multi, parts, err := d.getAllParts(path) if err != nil { return err } - if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { + if (offset) > int64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { return storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -161,11 +161,11 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo buf := make([]byte, chunkSize) for { bytesRead, err := io.ReadFull(reader, buf) - totalRead += uint64(bytesRead) + totalRead += int64(bytesRead) if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { return err - } else if (uint64(bytesRead) < chunkSize) && totalRead != size { + } else if (int64(bytesRead) < chunkSize) && totalRead != size { break } else { part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead])) diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go index 1b6c5c00..d257e4b2 100644 --- a/storagedriver/storagedriver.go +++ b/storagedriver/storagedriver.go @@ -44,7 +44,7 @@ type StorageDriver interface { // ReadStream retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. - ReadStream(path string, offset uint64) (io.ReadCloser, error) + ReadStream(path string, offset int64) (io.ReadCloser, error) // WriteStream stores the contents of the provided io.ReadCloser at a // location designated by the given path. @@ -52,7 +52,7 @@ type StorageDriver interface { // "size" bytes. // May be used to resume writing a stream by providing a nonzero offset. // The offset must be no larger than the CurrentSize for this path. - WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error + WriteStream(path string, offset, size int64, readCloser io.ReadCloser) error // CurrentSize retrieves the curernt size in bytes of the object at the // given path. @@ -86,7 +86,7 @@ func (err PathNotFoundError) Error() string { // invalid offset. 
type InvalidOffsetError struct { Path string - Offset uint64 + Offset int64 } func (err InvalidOffsetError) Error() string { diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 61756667..c2604f4f 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -173,7 +173,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(10 * 1024 * 1024) + chunkSize := int64(10 * 1024 * 1024) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -186,19 +186,19 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { offset, err := suite.StorageDriver.CurrentSize(filename) c.Assert(err, check.IsNil) - if offset > chunkSize { + if int64(offset) > chunkSize { c.Fatalf("Offset too large, %d > %d", offset, chunkSize) } - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) + err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) c.Assert(err, check.IsNil) offset, err = suite.StorageDriver.CurrentSize(filename) c.Assert(err, check.IsNil) - if offset > 2*chunkSize { + if int64(offset) > 2*chunkSize { c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize) } - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) + err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) c.Assert(err, check.IsNil) received, err := suite.StorageDriver.GetContent(filename) @@ -212,7 +212,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(32) + chunkSize := int64(32) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -260,13 +260,13 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { // TestList checks the returned list of keys after populating a directory tree. 
func (suite *DriverSuite) TestList(c *check.C) { - rootDirectory := "/" + randomString(uint64(8+rand.Intn(8))) + rootDirectory := "/" + randomString(int64(8+rand.Intn(8))) defer suite.StorageDriver.Delete(rootDirectory) - parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + parentDirectory := rootDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles := make([]string, 50) for i := 0; i < len(childFiles); i++ { - childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFile := parentDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles[i] = childFile err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32))) c.Assert(err, check.IsNil) @@ -388,7 +388,7 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { doneChan := make(chan struct{}) - testStream := func(size int) { + testStream := func(size int64) { suite.testFileStreams(c, size) doneChan <- struct{}{} } @@ -406,7 +406,7 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { } -func (suite *DriverSuite) testFileStreams(c *check.C, size int) { +func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { tf, err := ioutil.TempFile("", "tf") c.Assert(err, check.IsNil) defer os.Remove(tf.Name()) @@ -414,7 +414,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tfName := path.Base(tf.Name()) defer suite.StorageDriver.Delete(tfName) - contents := []byte(randomString(uint64(size))) + contents := []byte(randomString(size)) _, err = tf.Write(contents) c.Assert(err, check.IsNil) @@ -422,7 +422,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tf.Sync() tf.Seek(0, os.SEEK_SET) - err = suite.StorageDriver.WriteStream(tfName, 0, uint64(size), tf) + err = suite.StorageDriver.WriteStream(tfName, 0, size, tf) c.Assert(err, check.IsNil) reader, err := suite.StorageDriver.ReadStream(tfName, 0) @@ -450,7 +450,7 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) { defer suite.StorageDriver.Delete(filename) - err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) + err := suite.StorageDriver.WriteStream(filename, 0, int64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) c.Assert(err, check.IsNil) reader, err := suite.StorageDriver.ReadStream(filename, 0) @@ -465,7 +465,7 @@ func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, c var pathChars = []byte("abcdefghijklmnopqrstuvwxyz") -func randomString(length uint64) string { +func randomString(length int64) string { b := make([]byte, length) for i := range b { b[i] = pathChars[rand.Intn(len(pathChars))] From b047c92e1cdb76635ff19d80b97c3d40e1901bdf Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 2 Dec 2014 20:43:31 -0800 Subject: [PATCH 02/15] Use sync.WaitGroup to control concurrent tests --- storagedriver/testsuites/testsuites.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index c2604f4f..92f7454d 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -7,6 +7,7 @@ import ( "os" "path" "sort" + "sync" "testing" "github.com/docker/docker-registry/storagedriver" @@ -386,13 +387,14 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { 
c.Skip("Need to fix out-of-process concurrency") } - doneChan := make(chan struct{}) + var wg sync.WaitGroup testStream := func(size int64) { + defer wg.Done() suite.testFileStreams(c, size) - doneChan <- struct{}{} } + wg.Add(6) go testStream(8 * 1024 * 1024) go testStream(4 * 1024 * 1024) go testStream(2 * 1024 * 1024) @@ -400,10 +402,7 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { go testStream(1024) go testStream(64) - for i := 0; i < 6; i++ { - <-doneChan - } - + wg.Wait() } func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { From ac660e72bfbc13cdf12b46252f161b5c6c3caac0 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 2 Dec 2014 21:00:42 -0800 Subject: [PATCH 03/15] Replace StorageLayer.CurrentSize interface call with Stat To support single-flight Size and ModTime queries against backend storage file, we are replacing the CurrentSize call with a Stat call. A FileInfo interface is provided for backends to provide a type, with a default implementation called FileInfoInternal, for use by driver implementations. More work needs to follow this change to update all the driver implementations. --- storagedriver/fileinfo.go | 79 ++++++++++++++++++++++++++++++++++ storagedriver/storagedriver.go | 7 ++- 2 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 storagedriver/fileinfo.go diff --git a/storagedriver/fileinfo.go b/storagedriver/fileinfo.go new file mode 100644 index 00000000..82e3d546 --- /dev/null +++ b/storagedriver/fileinfo.go @@ -0,0 +1,79 @@ +package storagedriver + +import "time" + +// FileInfo returns information about a given path. Inspired by os.FileInfo, +// it elides the base name method for a full path instead. +type FileInfo interface { + // Path provides the full path of the target of this file info. + Path() string + + // Size returns current length in bytes of the file. The return value can + // be used to write to the end of the file at path. The value is + // meaningless if IsDir returns true. + Size() int64 + + // ModTime returns the modification time for the file. For backends that + // don't have a modification time, the creation time should be returned. + ModTime() time.Time + + // IsDir returns true if the path is a directory. + IsDir() bool +} + +// NOTE(stevvooe): The next two types, FileInfoFields and FileInfoInternal +// should only be used by storagedriver implementations. They should moved to +// a "driver" package, similar to database/sql. + +// FileInfoFields provides the exported fields for implementing FileInfo +// interface in storagedriver implementations. It should be used with +// InternalFileInfo. +type FileInfoFields struct { + // Path provides the full path of the target of this file info. + Path string + + // Size is current length in bytes of the file. The value of this field + // can be used to write to the end of the file at path. The value is + // meaningless if IsDir is set to true. + Size int64 + + // ModTime returns the modification time for the file. For backends that + // don't have a modification time, the creation time should be returned. + ModTime time.Time + + // IsDir returns true if the path is a directory. + IsDir bool +} + +// FileInfoInternal implements the FileInfo interface. This should only be +// used by storagedriver implementations that don't have a specialized +// FileInfo type. 
+type FileInfoInternal struct {
+	FileInfoFields
+}
+
+var _ FileInfo = FileInfoInternal{}
+var _ FileInfo = &FileInfoInternal{}
+
+// Path provides the full path of the target of this file info.
+func (fi FileInfoInternal) Path() string {
+	return fi.FileInfoFields.Path
+}
+
+// Size returns current length in bytes of the file. The return value can
+// be used to write to the end of the file at path. The value is
+// meaningless if IsDir returns true.
+func (fi FileInfoInternal) Size() int64 {
+	return fi.FileInfoFields.Size
+}
+
+// ModTime returns the modification time for the file. For backends that
+// don't have a modification time, the creation time should be returned.
+func (fi FileInfoInternal) ModTime() time.Time {
+	return fi.FileInfoFields.ModTime
+}
+
+// IsDir returns true if the path is a directory.
+func (fi FileInfoInternal) IsDir() bool {
+	return fi.FileInfoFields.IsDir
+}
diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go
index d257e4b2..754c8bb6 100644
--- a/storagedriver/storagedriver.go
+++ b/storagedriver/storagedriver.go
@@ -54,10 +54,9 @@ type StorageDriver interface {
 	// The offset must be no larger than the CurrentSize for this path.
 	WriteStream(path string, offset, size int64, readCloser io.ReadCloser) error
 
-	// CurrentSize retrieves the curernt size in bytes of the object at the
-	// given path.
-	// It should be safe to read or write anywhere up to this point.
-	CurrentSize(path string) (uint64, error)
+	// Stat retrieves the FileInfo for the given path, including the current
+	// size in bytes and the creation time.
+	Stat(path string) (FileInfo, error)
 
 	// List returns a list of the objects that are direct descendants of the
 	//given path.

From 2e3ecdca37622933742e9ac405c96d275ef4ed69 Mon Sep 17 00:00:00 2001
From: Stephen J Day
Date: Tue, 2 Dec 2014 21:47:28 -0800
Subject: [PATCH 04/15] Remove size argument and use io.Reader for
 StorageDriver.WriteStream

We are changing the RPC call for WriteStream so that it no longer requires
the size argument, opting to drive the process with io.Reader. The main
issue was that io.Reader may return io.EOF before reaching size, making the
error handling around this condition more complex for callers. To complement
this, WriteStream now returns the number of successfully written bytes.

The method no longer requires an io.ReadCloser, opting to require just an
io.Reader. This keeps the reader under the control of the caller, which
provides more flexibility.

This also begins to address some of the problems described in #791.
---
 storagedriver/storagedriver.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go
index 754c8bb6..339b465a 100644
--- a/storagedriver/storagedriver.go
+++ b/storagedriver/storagedriver.go
@@ -52,7 +52,7 @@ type StorageDriver interface {
 	// "size" bytes.
 	// May be used to resume writing a stream by providing a nonzero offset.
 	// The offset must be no larger than the CurrentSize for this path.
-	WriteStream(path string, offset, size int64, readCloser io.ReadCloser) error
+	WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error)
 
 	// Stat retrieves the FileInfo for the given path, including the current
 	// size in bytes and the creation time.
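To see the revised contract from the calling side, consider a minimal sketch.
The appendAll helper below is illustrative only, not part of the patches: it
shows how a caller resumes a write with Stat and detects a short write from
the returned byte count, now that the up-front size argument is gone.

package example

import (
	"fmt"
	"io"

	"github.com/docker/docker-registry/storagedriver"
)

// appendAll resumes a write at the current end of the file and verifies the
// number of bytes written. Hypothetical helper, for illustration only.
func appendAll(d storagedriver.StorageDriver, path string, r io.Reader, expected int64) error {
	fi, err := d.Stat(path)
	if err != nil {
		return err
	}

	// Resume at the current size; WriteStream reports how many bytes
	// were actually written instead of taking a size up front.
	nn, err := d.WriteStream(path, fi.Size(), r)
	if err != nil {
		return err
	}

	if nn != expected {
		return fmt.Errorf("short write to %s: %d of %d bytes", path, nn, expected)
	}

	return nil
}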
From 2037b1d6bf20e380887a05151e5521319c004548 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 3 Dec 2014 16:37:46 -0800 Subject: [PATCH 05/15] Update testsuite with storagedriver interface changes This change updates the testsuite to migrate to the new driver interface. This includes the new Stat call, changes to int64 over uint64 and the changes to the WriteStream signature. Several test cases have been added to vet implementations against various assumptions. --- storagedriver/testsuites/testsuites.go | 180 +++++++++++++++++++------ 1 file changed, 138 insertions(+), 42 deletions(-) diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 92f7454d..6a51cd19 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -9,6 +9,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/docker/docker-registry/storagedriver" "github.com/docker/docker-registry/storagedriver/ipc" @@ -168,45 +169,6 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *check.C) { suite.writeReadCompareStreams(c, filename, contents) } -// TestContinueStreamAppend tests that a stream write can be appended to without -// corrupting the data. -func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { - filename := randomString(32) - defer suite.StorageDriver.Delete(filename) - - chunkSize := int64(10 * 1024 * 1024) - - contentsChunk1 := []byte(randomString(chunkSize)) - contentsChunk2 := []byte(randomString(chunkSize)) - contentsChunk3 := []byte(randomString(chunkSize)) - - fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) - - err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1))) - c.Assert(err, check.IsNil) - - offset, err := suite.StorageDriver.CurrentSize(filename) - c.Assert(err, check.IsNil) - if int64(offset) > chunkSize { - c.Fatalf("Offset too large, %d > %d", offset, chunkSize) - } - err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) - c.Assert(err, check.IsNil) - - offset, err = suite.StorageDriver.CurrentSize(filename) - c.Assert(err, check.IsNil) - if int64(offset) > 2*chunkSize { - c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize) - } - - err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) - c.Assert(err, check.IsNil) - - received, err := suite.StorageDriver.GetContent(filename) - c.Assert(err, check.IsNil) - c.Assert(received, check.DeepEquals, fullContents) -} - // TestReadStreamWithOffset tests that the appropriate data is streamed when // reading with a given offset. func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { @@ -246,10 +208,90 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { readContents, err = ioutil.ReadAll(reader) c.Assert(err, check.IsNil) - c.Assert(readContents, check.DeepEquals, contentsChunk3) } +// TestContinueStreamAppend tests that a stream write can be appended to without +// corrupting the data. 
+func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) {
+	filename := randomString(32)
+	defer suite.StorageDriver.Delete(filename)
+
+	chunkSize := int64(10 * 1024 * 1024)
+
+	contentsChunk1 := []byte(randomString(chunkSize))
+	contentsChunk2 := []byte(randomString(chunkSize))
+	contentsChunk3 := []byte(randomString(chunkSize))
+	contentsChunk4 := []byte(randomString(chunkSize))
+	zeroChunk := make([]byte, int64(chunkSize))
+
+	fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)
+
+	nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contentsChunk1))
+	c.Assert(err, check.IsNil)
+	c.Assert(nn, check.Equals, int64(len(contentsChunk1)))
+
+	fi, err := suite.StorageDriver.Stat(filename)
+	c.Assert(err, check.IsNil)
+	c.Assert(fi, check.NotNil)
+	c.Assert(fi.Size(), check.Equals, int64(len(contentsChunk1)))
+
+	if fi.Size() > chunkSize {
+		c.Fatalf("Offset too large, %d > %d", fi.Size(), chunkSize)
+	}
+	nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(contentsChunk2))
+	c.Assert(err, check.IsNil)
+	c.Assert(nn, check.Equals, int64(len(contentsChunk2)))
+
+	fi, err = suite.StorageDriver.Stat(filename)
+	c.Assert(err, check.IsNil)
+	c.Assert(fi, check.NotNil)
+	c.Assert(fi.Size(), check.Equals, 2*chunkSize)
+
+	if fi.Size() > 2*chunkSize {
+		c.Fatalf("Offset too large, %d > %d", fi.Size(), 2*chunkSize)
+	}
+
+	nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():]))
+	c.Assert(err, check.IsNil)
+	c.Assert(nn, check.Equals, int64(len(fullContents[fi.Size():])))
+
+	received, err := suite.StorageDriver.GetContent(filename)
+	c.Assert(err, check.IsNil)
+	c.Assert(received, check.DeepEquals, fullContents)
+
+	// Writing past size of file extends file (no offset error). We would like
+	// to write chunk 4 one chunk length past chunk 3. It should be successful
+	// and the resulting file will be 5 chunks long, with a chunk of all
+	// zeros.
+
+	fullContents = append(fullContents, zeroChunk...)
+	fullContents = append(fullContents, contentsChunk4...)
+
+	nn, err = suite.StorageDriver.WriteStream(filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4))
+	c.Assert(err, check.IsNil)
+	c.Assert(nn, check.Equals, chunkSize)
+
+	fi, err = suite.StorageDriver.Stat(filename)
+	c.Assert(err, check.IsNil)
+	c.Assert(fi, check.NotNil)
+	c.Assert(fi.Size(), check.Equals, int64(len(fullContents)))
+
+	received, err = suite.StorageDriver.GetContent(filename)
+	c.Assert(err, check.IsNil)
+	c.Assert(len(received), check.Equals, len(fullContents))
+	c.Assert(received[chunkSize*3:chunkSize*4], check.DeepEquals, zeroChunk)
+	c.Assert(received[chunkSize*4:chunkSize*5], check.DeepEquals, contentsChunk4)
+	c.Assert(received, check.DeepEquals, fullContents)
+
+	// Ensure that negative offsets return correct error.
+	nn, err = suite.StorageDriver.WriteStream(filename, -1, bytes.NewReader(zeroChunk))
+	c.Assert(err, check.NotNil)
+	c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{})
+	c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename)
+	c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1))
+}
+
 // TestReadNonexistentStream tests that reading a stream for a nonexistent path
 // fails.
func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) {
@@ -379,6 +421,58 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) {
 	c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
 }
 
+func (suite *DriverSuite) TestStatCall(c *check.C) {
+	content := randomString(4096)
+	dirPath := randomString(32)
+	fileName := randomString(32)
+	filePath := path.Join(dirPath, fileName)
+
+	// Call on non-existent file/dir, check error.
+	fi, err := suite.StorageDriver.Stat(filePath)
+	c.Assert(err, check.NotNil)
+	c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
+	c.Assert(fi, check.IsNil)
+
+	err = suite.StorageDriver.PutContent(filePath, []byte(content))
+	c.Assert(err, check.IsNil)
+
+	// Call on regular file, check results
+	start := time.Now().Truncate(time.Second) // truncated for filesystem
+	fi, err = suite.StorageDriver.Stat(filePath)
+	c.Assert(err, check.IsNil)
+	expectedModTime := time.Now()
+	c.Assert(fi, check.NotNil)
+	c.Assert(fi.Path(), check.Equals, filePath)
+	c.Assert(fi.Size(), check.Equals, int64(len(content)))
+	c.Assert(fi.IsDir(), check.Equals, false)
+
+	if start.After(fi.ModTime()) {
+		c.Fatalf("modtime %s before file created (%v)", fi.ModTime(), start)
+	}
+
+	if fi.ModTime().After(expectedModTime) {
+		c.Fatalf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime)
+	}
+
+	// Call on directory
+	start = time.Now().Truncate(time.Second)
+	fi, err = suite.StorageDriver.Stat(dirPath)
+	c.Assert(err, check.IsNil)
+	expectedModTime = time.Now()
+	c.Assert(fi, check.NotNil)
+	c.Assert(fi.Path(), check.Equals, dirPath)
+	c.Assert(fi.Size(), check.Equals, int64(0))
+	c.Assert(fi.IsDir(), check.Equals, true)
+
+	if start.After(fi.ModTime()) {
+		c.Fatalf("modtime %s before file created (%v)", fi.ModTime(), start)
+	}
+
+	if fi.ModTime().After(expectedModTime) {
+		c.Fatalf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime)
+	}
+}
+
 // TestConcurrentFileStreams checks that multiple *os.File objects can be passed
 // in to WriteStream concurrently without hanging.
 // TODO(bbland): fix this test...
@@ -421,8 +515,9 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
 	tf.Sync()
 	tf.Seek(0, os.SEEK_SET)
 
-	err = suite.StorageDriver.WriteStream(tfName, 0, size, tf)
+	nn, err := suite.StorageDriver.WriteStream(tfName, 0, tf)
 	c.Assert(err, check.IsNil)
+	c.Assert(nn, check.Equals, size)
 
 	reader, err := suite.StorageDriver.ReadStream(tfName, 0)
 	c.Assert(err, check.IsNil)
@@ -449,8 +544,9 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents
 func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) {
 	defer suite.StorageDriver.Delete(filename)
 
-	err := suite.StorageDriver.WriteStream(filename, 0, int64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents)))
+	nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contents))
 	c.Assert(err, check.IsNil)
+	c.Assert(nn, check.Equals, int64(len(contents)))
 
 	reader, err := suite.StorageDriver.ReadStream(filename, 0)
 	c.Assert(err, check.IsNil)

From ab9570f87217adb0f66794711d3115656c86a828 Mon Sep 17 00:00:00 2001
From: Stephen J Day
Date: Wed, 3 Dec 2014 16:44:20 -0800
Subject: [PATCH 06/15] Migrate filesystem driver to new storagedriver calls

The filesystem driver has been migrated to implement the storagedriver
interface changes. Most interestingly, this provides a filesystem-based
implementation of the Stat driver call.
With this comes some refactoring of the read and write paths, which are now
much simpler and more robust.

The IPC tests have been disabled due to stability problems that we'll have to
troubleshoot at a later date.
---
 storagedriver/filesystem/driver.go      | 185 ++++++++++++++----
 storagedriver/filesystem/driver_test.go |  19 ++-
 2 files changed, 118 insertions(+), 86 deletions(-)

diff --git a/storagedriver/filesystem/driver.go b/storagedriver/filesystem/driver.go
index 3fbfcdf6..6fb56891 100644
--- a/storagedriver/filesystem/driver.go
+++ b/storagedriver/filesystem/driver.go
@@ -1,10 +1,13 @@
 package filesystem
 
 import (
+	"bytes"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
 	"path"
+	"time"
 
 	"github.com/docker/docker-registry/storagedriver"
 	"github.com/docker/docker-registry/storagedriver/factory"
 )
@@ -49,41 +52,43 @@ func New(rootDirectory string) *Driver {
 	return &Driver{rootDirectory}
 }
 
-// subPath returns the absolute path of a key within the Driver's storage
-func (d *Driver) subPath(subPath string) string {
-	return path.Join(d.rootDirectory, subPath)
-}
-
 // Implement the storagedriver.StorageDriver interface
 
 // GetContent retrieves the content stored at "path" as a []byte.
 func (d *Driver) GetContent(path string) ([]byte, error) {
-	contents, err := ioutil.ReadFile(d.subPath(path))
+	rc, err := d.ReadStream(path, 0)
 	if err != nil {
-		return nil, storagedriver.PathNotFoundError{Path: path}
+		return nil, err
 	}
-	return contents, nil
+	defer rc.Close()
+
+	p, err := ioutil.ReadAll(rc)
+	if err != nil {
+		return nil, err
+	}
+
+	return p, nil
 }
 
 // PutContent stores the []byte content at a location designated by "path".
 func (d *Driver) PutContent(subPath string, contents []byte) error {
-	fullPath := d.subPath(subPath)
-	parentDir := path.Dir(fullPath)
-	err := os.MkdirAll(parentDir, 0755)
-	if err != nil {
+	if _, err := d.WriteStream(subPath, 0, bytes.NewReader(contents)); err != nil {
 		return err
 	}
 
-	err = ioutil.WriteFile(fullPath, contents, 0644)
-	return err
+	return os.Truncate(d.fullPath(subPath), int64(len(contents)))
 }
 
 // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
 // given byte offset.
 func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
-	file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644)
+	file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
 	if err != nil {
-		return nil, storagedriver.PathNotFoundError{Path: path}
+		if os.IsNotExist(err) {
+			return nil, storagedriver.PathNotFoundError{Path: path}
+		}
+
+		return nil, err
 	}
 
 	seekPos, err := file.Seek(int64(offset), os.SEEK_SET)
@@ -98,81 +103,64 @@ func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
 	return file, nil
 }
 
-// WriteStream stores the contents of the provided io.ReadCloser at a location
+// WriteStream stores the contents of the provided io.Reader at a location
 // designated by the given path.
-func (d *Driver) WriteStream(subPath string, offset, size int64, reader io.ReadCloser) error {
-	defer reader.Close()
-
-	resumableOffset, err := d.CurrentSize(subPath)
-	if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound {
-		return err
+func (d *Driver) WriteStream(subPath string, offset int64, reader io.Reader) (nn int64, err error) {
+	if offset < 0 {
+		return 0, storagedriver.InvalidOffsetError{Path: subPath, Offset: offset}
 	}
 
-	if offset > int64(resumableOffset) {
-		return storagedriver.InvalidOffsetError{Path: subPath, Offset: offset}
-	}
+	// TODO(stevvooe): This needs to be a requirement.
+ // if !path.IsAbs(subPath) { + // return fmt.Errorf("absolute path required: %q", subPath) + // } - fullPath := d.subPath(subPath) + fullPath := d.fullPath(subPath) parentDir := path.Dir(fullPath) - err = os.MkdirAll(parentDir, 0755) + if err := os.MkdirAll(parentDir, 0755); err != nil { + return 0, err + } + + fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0644) if err != nil { - return err - } - - var file *os.File - if offset == 0 { - file, err = os.Create(fullPath) - } else { - file, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_APPEND, 0) + // TODO(stevvooe): A few missing conditions in storage driver: + // 1. What if the path is already a directory? + // 2. Should number 1 be exposed explicitly in storagedriver? + // 2. Can this path not exist, even if we create above? + return 0, err } + defer fp.Close() + nn, err = fp.Seek(offset, os.SEEK_SET) if err != nil { - return err + return 0, err } - defer file.Close() - // TODO(sday): Use Seek + Copy here. - - buf := make([]byte, 32*1024) - for { - bytesRead, er := reader.Read(buf) - if bytesRead > 0 { - bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset)) - if bytesWritten > 0 { - offset += int64(bytesWritten) - } - if ew != nil { - err = ew - break - } - if bytesRead != bytesWritten { - err = io.ErrShortWrite - break - } - } - if er == io.EOF { - break - } - if er != nil { - err = er - break - } + if nn != offset { + return 0, fmt.Errorf("bad seek to %v, expected %v in fp=%v", offset, nn, fp) } - return err + + return io.Copy(fp, reader) } -// CurrentSize retrieves the curernt size in bytes of the object at the given -// path. -func (d *Driver) CurrentSize(subPath string) (uint64, error) { - fullPath := d.subPath(subPath) +// Stat retrieves the FileInfo for the given path, including the current size +// in bytes and the creation time. +func (d *Driver) Stat(subPath string) (storagedriver.FileInfo, error) { + fullPath := d.fullPath(subPath) - fileInfo, err := os.Stat(fullPath) - if err != nil && !os.IsNotExist(err) { - return 0, err - } else if err != nil { - return 0, storagedriver.PathNotFoundError{Path: subPath} + fi, err := os.Stat(fullPath) + if err != nil { + if os.IsNotExist(err) { + return nil, storagedriver.PathNotFoundError{Path: subPath} + } + + return nil, err } - return uint64(fileInfo.Size()), nil + + return fileInfo{ + path: subPath, + FileInfo: fi, + }, nil } // List returns a list of the objects that are direct descendants of the given @@ -181,7 +169,7 @@ func (d *Driver) List(subPath string) ([]string, error) { if subPath[len(subPath)-1] != '/' { subPath += "/" } - fullPath := d.subPath(subPath) + fullPath := d.fullPath(subPath) dir, err := os.Open(fullPath) if err != nil { @@ -204,8 +192,8 @@ func (d *Driver) List(subPath string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. func (d *Driver) Move(sourcePath string, destPath string) error { - source := d.subPath(sourcePath) - dest := d.subPath(destPath) + source := d.fullPath(sourcePath) + dest := d.fullPath(destPath) if _, err := os.Stat(source); os.IsNotExist(err) { return storagedriver.PathNotFoundError{Path: sourcePath} @@ -217,7 +205,7 @@ func (d *Driver) Move(sourcePath string, destPath string) error { // Delete recursively deletes all objects stored at "path" and its subpaths. 
func (d *Driver) Delete(subPath string) error { - fullPath := d.subPath(subPath) + fullPath := d.fullPath(subPath) _, err := os.Stat(fullPath) if err != nil && !os.IsNotExist(err) { @@ -229,3 +217,42 @@ func (d *Driver) Delete(subPath string) error { err = os.RemoveAll(fullPath) return err } + +// fullPath returns the absolute path of a key within the Driver's storage. +func (d *Driver) fullPath(subPath string) string { + return path.Join(d.rootDirectory, subPath) +} + +type fileInfo struct { + os.FileInfo + path string +} + +var _ storagedriver.FileInfo = fileInfo{} + +// Path provides the full path of the target of this file info. +func (fi fileInfo) Path() string { + return fi.path +} + +// Size returns current length in bytes of the file. The return value can +// be used to write to the end of the file at path. The value is +// meaningless if IsDir returns true. +func (fi fileInfo) Size() int64 { + if fi.IsDir() { + return 0 + } + + return fi.FileInfo.Size() +} + +// ModTime returns the modification time for the file. For backends that +// don't have a modification time, the creation time should be returned. +func (fi fileInfo) ModTime() time.Time { + return fi.FileInfo.ModTime() +} + +// IsDir returns true if the path is a directory. +func (fi fileInfo) IsDir() bool { + return fi.FileInfo.IsDir() +} diff --git a/storagedriver/filesystem/driver_test.go b/storagedriver/filesystem/driver_test.go index 1d9bac54..0965daa4 100644 --- a/storagedriver/filesystem/driver_test.go +++ b/storagedriver/filesystem/driver_test.go @@ -1,6 +1,7 @@ package filesystem import ( + "io/ioutil" "os" "testing" @@ -13,12 +14,16 @@ import ( func Test(t *testing.T) { TestingT(t) } func init() { - rootDirectory := "/tmp/driver" - os.RemoveAll(rootDirectory) - - filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) { - return New(rootDirectory), nil + root, err := ioutil.TempDir("", "driver-") + if err != nil { + panic(err) } - testsuites.RegisterInProcessSuite(filesystemDriverConstructor, testsuites.NeverSkip) - testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": rootDirectory}, testsuites.NeverSkip) + defer os.Remove(root) + + testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) { + return New(root), nil + }, testsuites.NeverSkip) + + // BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later. + // testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip) } From 2ebc373d917ecc12cecaf566e0a90a3de0e849f3 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 4 Dec 2014 20:14:41 -0800 Subject: [PATCH 07/15] Refactor inmemory driver for Stat and WriteStream methods This change started out as simply updating the existing inmemory driver to implement the new Stat call. After struggling with the map based implementation, it has been refactored to be a tree-based implementation. This process has exposed a few missing error cases in the StorageDriver API that should be addressed in the coming weeks. 
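To make the tree-based design concrete before the diff, here is a minimal
sketch (illustrative only, not code from the patch) of the refactored
in-memory driver exercised through its public interface:

package example

import (
	"fmt"

	"github.com/docker/docker-registry/storagedriver/inmemory"
)

// demo writes a file into the tree-backed driver, then inspects it with the
// new Stat call and lists its parent directory.
func demo() error {
	d := inmemory.New()

	if err := d.PutContent("/a/b/c", []byte("hello")); err != nil {
		return err
	}

	fi, err := d.Stat("/a/b/c")
	if err != nil {
		return err
	}
	fmt.Println(fi.Path(), fi.Size(), fi.IsDir()) // /a/b/c 5 false

	entries, err := d.List("/a/b")
	if err != nil {
		return err
	}
	fmt.Println(entries) // [/a/b/c]

	return nil
}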
--- storagedriver/inmemory/driver.go | 199 +++++++++------- storagedriver/inmemory/driver_test.go | 5 +- storagedriver/inmemory/mfs.go | 329 ++++++++++++++++++++++++++ 3 files changed, 453 insertions(+), 80 deletions(-) create mode 100644 storagedriver/inmemory/mfs.go diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index 3231b017..b6bdc258 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -5,9 +5,9 @@ import ( "fmt" "io" "io/ioutil" - "regexp" "strings" "sync" + "time" "github.com/docker/docker-registry/storagedriver" "github.com/docker/docker-registry/storagedriver/factory" @@ -29,13 +29,18 @@ func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (stor // Driver is a storagedriver.StorageDriver implementation backed by a local map. // Intended solely for example and testing purposes. type Driver struct { - storage map[string][]byte - mutex sync.RWMutex + root *dir + mutex sync.RWMutex } // New constructs a new Driver. func New() *Driver { - return &Driver{storage: make(map[string][]byte)} + return &Driver{root: &dir{ + common: common{ + p: "/", + mod: time.Now(), + }, + }} } // Implement the storagedriver.StorageDriver interface. @@ -44,18 +49,31 @@ func New() *Driver { func (d *Driver) GetContent(path string) ([]byte, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, ok := d.storage[path] - if !ok { - return nil, storagedriver.PathNotFoundError{Path: path} + + rc, err := d.ReadStream(path, 0) + if err != nil { + return nil, err } - return contents, nil + defer rc.Close() + + return ioutil.ReadAll(rc) } // PutContent stores the []byte content at a location designated by "path". -func (d *Driver) PutContent(path string, contents []byte) error { +func (d *Driver) PutContent(p string, contents []byte) error { d.mutex.Lock() defer d.mutex.Unlock() - d.storage[path] = contents + + f, err := d.root.mkfile(p) + if err != nil { + // TODO(stevvooe): Again, we need to clarify when this is not a + // directory in StorageDriver API. + return fmt.Errorf("not a file") + } + + f.truncate() + f.WriteAt(contents, 0) + return nil } @@ -64,86 +82,104 @@ func (d *Driver) PutContent(path string, contents []byte) error { func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, err := d.GetContent(path) - if err != nil { - return nil, err - } else if len(contents) <= int(offset) { - return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + + path = d.normalize(path) + found := d.root.find(path) + + if found.path() != path { + return nil, storagedriver.PathNotFoundError{Path: path} } - src := contents[offset:] - buf := make([]byte, len(src)) - copy(buf, src) - return ioutil.NopCloser(bytes.NewReader(buf)), nil + if found.isdir() { + return nil, fmt.Errorf("%q is a directory", path) + } + + return ioutil.NopCloser(found.(*file).sectionReader(offset)), nil } // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. 
-func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { - defer reader.Close() - d.mutex.RLock() - defer d.mutex.RUnlock() +func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) { + d.mutex.Lock() + defer d.mutex.Unlock() - resumableOffset, err := d.CurrentSize(path) + if offset < 0 { + return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + normalized := d.normalize(path) + + f, err := d.root.mkfile(normalized) if err != nil { - return err + return 0, fmt.Errorf("not a file") } - if offset > int64(resumableOffset) { - return storagedriver.InvalidOffsetError{Path: path, Offset: offset} - } + var buf bytes.Buffer - contents, err := ioutil.ReadAll(reader) + nn, err = buf.ReadFrom(reader) if err != nil { - return err + // TODO(stevvooe): This condition is odd and we may need to clarify: + // we've read nn bytes from reader but have written nothing to the + // backend. What is the correct return value? Really, the caller needs + // to know that the reader has been advanced and reattempting the + // operation is incorrect. + return nn, err } - if offset > 0 { - contents = append(d.storage[path][0:offset], contents...) - } - - d.storage[path] = contents - return nil + f.WriteAt(buf.Bytes(), offset) + return nn, err } -// CurrentSize retrieves the curernt size in bytes of the object at the given -// path. -func (d *Driver) CurrentSize(path string) (uint64, error) { +// Stat returns info about the provided path. +func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, ok := d.storage[path] - if !ok { - return 0, nil + + normalized := d.normalize(path) + found := d.root.find(path) + + if found.path() != normalized { + return nil, storagedriver.PathNotFoundError{Path: path} } - return uint64(len(contents)), nil + + fi := storagedriver.FileInfoFields{ + Path: path, + IsDir: found.isdir(), + ModTime: found.modtime(), + } + + if !fi.IsDir { + fi.Size = int64(len(found.(*file).data)) + } + + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil } // List returns a list of the objects that are direct descendants of the given // path. func (d *Driver) List(path string) ([]string, error) { - if path[len(path)-1] != '/' { - path += "/" - } - subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s[^/]+", path)) - if err != nil { - return nil, err + normalized := d.normalize(path) + + found := d.root.find(normalized) + + if !found.isdir() { + return nil, fmt.Errorf("not a directory") // TODO(stevvooe): Need error type for this... 
}
 
-	d.mutex.RLock()
-	defer d.mutex.RUnlock()
-	// we use map to collect unique keys
-	keySet := make(map[string]struct{})
-	for k := range d.storage {
-		if key := subPathMatcher.FindString(k); key != "" {
-			keySet[key] = struct{}{}
+	entries, err := found.(*dir).list(normalized)
+
+	if err != nil {
+		switch err {
+		case errNotExists:
+			return nil, storagedriver.PathNotFoundError{Path: path}
+		case errIsNotDir:
+			return nil, fmt.Errorf("not a directory")
+		default:
+			return nil, err
 		}
 	}
 
-	keys := make([]string, 0, len(keySet))
-	for k := range keySet {
-		keys = append(keys, k)
-	}
-	return keys, nil
+	return entries, nil
 }
 
 // Move moves an object stored at sourcePath to destPath, removing the original
@@ -151,32 +187,37 @@ func (d *Driver) List(path string) ([]string, error) {
 func (d *Driver) Move(sourcePath string, destPath string) error {
 	d.mutex.Lock()
 	defer d.mutex.Unlock()
-	contents, ok := d.storage[sourcePath]
-	if !ok {
-		return storagedriver.PathNotFoundError{Path: sourcePath}
+
+	normalizedSrc, normalizedDst := d.normalize(sourcePath), d.normalize(destPath)
+
+	err := d.root.move(normalizedSrc, normalizedDst)
+	switch err {
+	case errNotExists:
+		return storagedriver.PathNotFoundError{Path: destPath}
+	default:
+		return err
 	}
-	d.storage[destPath] = contents
-	delete(d.storage, sourcePath)
-	return nil
 }
 
 // Delete recursively deletes all objects stored at "path" and its subpaths.
 func (d *Driver) Delete(path string) error {
 	d.mutex.Lock()
 	defer d.mutex.Unlock()
-	var subPaths []string
-	for k := range d.storage {
-		if strings.HasPrefix(k, path) {
-			subPaths = append(subPaths, k)
-		}
-	}
 
-	if len(subPaths) == 0 {
+	normalized := d.normalize(path)
+
+	err := d.root.delete(normalized)
+	switch err {
+	case errNotExists:
 		return storagedriver.PathNotFoundError{Path: path}
+	default:
+		return err
 	}
-
-	for _, subPath := range subPaths {
-		delete(d.storage, subPath)
-	}
-	return nil
+}
+
+func (d *Driver) normalize(p string) string {
+	if !strings.HasPrefix(p, "/") {
+		p = "/" + p // Ghetto path absolution.
+	}
+	return p
 }
diff --git a/storagedriver/inmemory/driver_test.go b/storagedriver/inmemory/driver_test.go
index 87549542..6a4b3697 100644
--- a/storagedriver/inmemory/driver_test.go
+++ b/storagedriver/inmemory/driver_test.go
@@ -17,5 +17,8 @@ func init() {
 		return New(), nil
 	}
 	testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
-	testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
+
+	// BUG(stevvooe): Disable flaky IPC tests for now, until we can
+	// troubleshoot the problems with libchan.
+	// testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
 }
diff --git a/storagedriver/inmemory/mfs.go b/storagedriver/inmemory/mfs.go
new file mode 100644
index 00000000..5248bbc6
--- /dev/null
+++ b/storagedriver/inmemory/mfs.go
@@ -0,0 +1,329 @@
+package inmemory
+
+import (
+	"fmt"
+	"io"
+	"path"
+	"sort"
+	"strings"
+	"time"
+)
+
+var (
+	errExists    = fmt.Errorf("exists")
+	errNotExists = fmt.Errorf("not exists")
+	errIsNotDir  = fmt.Errorf("notdir")
+	errIsDir     = fmt.Errorf("isdir")
+)
+
+type node interface {
+	name() string
+	path() string
+	isdir() bool
+	modtime() time.Time
+}
+
+// dir is the central type for the memory-based storagedriver. All operations
+// are dispatched from a root dir.
+type dir struct {
+	common
+
+	// TODO(stevvooe): Use sorted slice + search.
+	children map[string]node
+}
+
+var _ node = &dir{}
+
+func (d *dir) isdir() bool {
+	return true
+}
+
+// add places the node n into dir d.
+func (d *dir) add(n node) {
+	if d.children == nil {
+		d.children = make(map[string]node)
+	}
+
+	d.children[n.name()] = n
+	d.mod = time.Now()
+}
+
+// find searches for the node at path q within dir. If the node is found, it
+// is returned and its path() will match q; otherwise, the closest existing
+// parent is returned.
+func (d *dir) find(q string) node {
+	q = strings.Trim(q, "/")
+	i := strings.Index(q, "/")
+
+	if q == "" {
+		return d
+	}
+
+	if i == 0 {
+		panic("shouldn't happen, no root paths")
+	}
+
+	var component string
+	if i < 0 {
+		// No more path components
+		component = q
+	} else {
+		component = q[:i]
+	}
+
+	child, ok := d.children[component]
+	if !ok {
+		// Node was not found; return the current node as the closest parent.
+		return d
+	}
+
+	if child.isdir() {
+		// traverse down!
+		q = q[i+1:]
+		return child.(*dir).find(q)
+	}
+
+	return child
+}
+
+func (d *dir) list(p string) ([]string, error) {
+	n := d.find(p)
+
+	if n.path() != p {
+		return nil, errNotExists
+	}
+
+	if !n.isdir() {
+		return nil, errIsNotDir
+	}
+
+	var children []string
+	for _, child := range n.(*dir).children {
+		children = append(children, child.path())
+	}
+
+	sort.Strings(children)
+	return children, nil
+}
+
+// mkfile creates the file at p or returns the existing one. It returns an
+// error if the path exists and is a directory. Essentially, this is open or
+// create.
+func (d *dir) mkfile(p string) (*file, error) {
+	n := d.find(p)
+	if n.path() == p {
+		if n.isdir() {
+			return nil, errIsDir
+		}
+
+		return n.(*file), nil
+	}
+
+	dirpath, filename := path.Split(p)
+	// Make any non-existent directories
+	n, err := d.mkdirs(dirpath)
+	if err != nil {
+		return nil, err
+	}
+
+	dd := n.(*dir)
+	n = &file{
+		common: common{
+			p:   path.Join(dd.path(), filename),
+			mod: time.Now(),
+		},
+	}
+
+	dd.add(n)
+	return n.(*file), nil
+}
+
+// mkdirs creates any missing directory entries in p and returns the result.
+func (d *dir) mkdirs(p string) (*dir, error) {
+	if p == "" {
+		p = "/"
+	}
+
+	n := d.find(p)
+
+	if !n.isdir() {
+		// Found something there
+		return nil, errIsNotDir
+	}
+
+	if n.path() == p {
+		return n.(*dir), nil
+	}
+
+	dd := n.(*dir)
+
+	relative := strings.Trim(strings.TrimPrefix(p, n.path()), "/")
+
+	if relative == "" {
+		return dd, nil
+	}
+
+	components := strings.Split(relative, "/")
+	for _, component := range components {
+		d, err := dd.mkdir(component)
+
+		if err != nil {
+			// This should actually never happen, since there are no children.
+			return nil, err
+		}
+		dd = d
+	}
+
+	return dd, nil
+}
+
+// mkdir creates a child directory under d with the given name.
+func (d *dir) mkdir(name string) (*dir, error) { + if name == "" { + return nil, fmt.Errorf("invalid dirname") + } + + _, ok := d.children[name] + if ok { + return nil, errExists + } + + child := &dir{ + common: common{ + p: path.Join(d.path(), name), + mod: time.Now(), + }, + } + d.add(child) + d.mod = time.Now() + + return child, nil +} + +func (d *dir) move(src, dst string) error { + dstDirname, _ := path.Split(dst) + + dp, err := d.mkdirs(dstDirname) + if err != nil { + return err + } + + srcDirname, srcFilename := path.Split(src) + sp := d.find(srcDirname) + + if sp.path() != srcDirname { + return errNotExists + } + + s, ok := sp.(*dir).children[srcFilename] + if !ok { + return errNotExists + } + + delete(sp.(*dir).children, srcFilename) + + switch n := s.(type) { + case *dir: + n.p = dst + case *file: + n.p = dst + } + + dp.add(s) + + return nil +} + +func (d *dir) delete(p string) error { + dirname, filename := path.Split(p) + parent := d.find(dirname) + + if dirname != parent.path() { + return errNotExists + } + + if _, ok := parent.(*dir).children[filename]; !ok { + return errNotExists + } + + delete(parent.(*dir).children, filename) + return nil +} + +// dump outputs a primitive directory structure to stdout. +func (d *dir) dump(indent string) { + fmt.Println(indent, d.name()+"/") + + for _, child := range d.children { + if child.isdir() { + child.(*dir).dump(indent + "\t") + } else { + fmt.Println(indent, child.name()) + } + + } +} + +func (d *dir) String() string { + return fmt.Sprintf("&dir{path: %v, children: %v}", d.p, d.children) +} + +// file stores actual data in the fs tree. It acts like an open, seekable file +// where operations are conducted through ReadAt and WriteAt. Use it with +// SectionReader for the best effect. +type file struct { + common + data []byte +} + +var _ node = &file{} + +func (f *file) isdir() bool { + return false +} + +func (f *file) truncate() { + f.data = f.data[:0] +} + +func (f *file) sectionReader(offset int64) io.Reader { + return io.NewSectionReader(f, offset, int64(len(f.data))-offset) +} + +func (f *file) ReadAt(p []byte, offset int64) (n int, err error) { + return copy(p, f.data[offset:]), nil +} + +func (f *file) WriteAt(p []byte, offset int64) (n int, err error) { + if len(f.data) > 0 && offset >= int64(len(f.data)) { + // Extend missing region with a zero pad, while also preallocating out to size of p. + pad := offset - int64(len(f.data)) + size := len(p) + int(pad) + f.data = append(f.data, make([]byte, pad, size)...) + } + + f.data = append(f.data, p...) + return len(p), nil +} + +func (f *file) String() string { + return fmt.Sprintf("&file{path: %q}", f.p) +} + +// common provides shared fields and methods for node implementations. +type common struct { + p string + mod time.Time +} + +func (c *common) name() string { + _, name := path.Split(c.p) + return name +} + +func (c *common) path() string { + return c.p +} + +func (c *common) modtime() time.Time { + return c.mod +} From 70ab06b8640416021696d39990fc583098c70610 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 4 Dec 2014 20:55:59 -0800 Subject: [PATCH 08/15] Update storage package to use StorageDriver.Stat This change updates the backend storage package that consumes StorageDriver to use the new Stat call, over CurrentSize. It also makes minor updates for using WriteStream and ReadStream. 
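The migration pattern described above is mechanical. As an illustrative
sketch (the blobSize helper is hypothetical, not from the patch), a single
Stat now replaces a CurrentSize call while also yielding a directory check
and the modification time in one round trip:

package example

import (
	"fmt"

	"github.com/docker/docker-registry/storagedriver"
)

// blobSize returns the size of the object at path, treating directories as
// an error, using the new Stat call in place of the removed CurrentSize.
func blobSize(d storagedriver.StorageDriver, path string) (int64, error) {
	fi, err := d.Stat(path)
	if err != nil {
		return 0, err // may be storagedriver.PathNotFoundError
	}

	if fi.IsDir() {
		return 0, fmt.Errorf("%q is a directory, not a blob", path)
	}

	return fi.Size(), nil
}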
--- storage/filereader.go | 21 ++++++++++++++------- storage/layerreader.go | 7 +++---- storage/layerstore.go | 7 ------- storage/layerupload.go | 27 ++++++++++++++------------- storage/manifeststore.go | 8 ++++++-- 5 files changed, 37 insertions(+), 33 deletions(-) diff --git a/storage/filereader.go b/storage/filereader.go index 8f1f5205..bcc2614e 100644 --- a/storage/filereader.go +++ b/storage/filereader.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "time" "github.com/docker/docker-registry/storagedriver" ) @@ -16,8 +17,9 @@ type fileReader struct { driver storagedriver.StorageDriver // identifying fields - path string - size int64 // size is the total layer size, must be set. + path string + size int64 // size is the total layer size, must be set. + modtime time.Time // mutable fields rc io.ReadCloser // remote read closer @@ -28,16 +30,21 @@ type fileReader struct { func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { // Grab the size of the layer file, ensuring existence. - size, err := driver.CurrentSize(path) + fi, err := driver.Stat(path) if err != nil { return nil, err } + if fi.IsDir() { + return nil, fmt.Errorf("cannot read a directory") + } + return &fileReader{ - driver: driver, - path: path, - size: int64(size), + driver: driver, + path: path, + size: fi.Size(), + modtime: fi.ModTime(), }, nil } @@ -126,7 +133,7 @@ func (fr *fileReader) reader() (io.Reader, error) { } // If we don't have a reader, open one up. - rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset)) + rc, err := fr.driver.ReadStream(fr.path, fr.offset) if err != nil { return nil, err diff --git a/storage/layerreader.go b/storage/layerreader.go index 2cc184fd..fa2275d9 100644 --- a/storage/layerreader.go +++ b/storage/layerreader.go @@ -11,9 +11,8 @@ import ( type layerReader struct { fileReader - name string // repo name of this layer - digest digest.Digest - createdAt time.Time + name string // repo name of this layer + digest digest.Digest } var _ Layer = &layerReader{} @@ -27,5 +26,5 @@ func (lrs *layerReader) Digest() digest.Digest { } func (lrs *layerReader) CreatedAt() time.Time { - return lrs.createdAt + return lrs.modtime } diff --git a/storage/layerstore.go b/storage/layerstore.go index d731a5b8..ddebdbcc 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -1,8 +1,6 @@ package storage import ( - "time" - "github.com/docker/docker-registry/digest" "github.com/docker/docker-registry/storagedriver" ) @@ -55,11 +53,6 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) { fileReader: *fr, name: name, digest: digest, - - // TODO(stevvooe): Storage backend does not support modification time - // queries yet. Layers "never" change, so just return the zero value - // plus a nano-second. - createdAt: (time.Time{}).Add(time.Nanosecond), }, nil } diff --git a/storage/layerupload.go b/storage/layerupload.go index de1a894b..3ee593b9 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -107,9 +107,13 @@ func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Laye return nil, err } - if err := luc.writeLayer(fp, size, digest); err != nil { + if nn, err := luc.writeLayer(fp, digest); err != nil { // Cleanup? return nil, err + } else if nn != size { + // TODO(stevvooe): Short write. Will have to delete the location and + // report an error. This error needs to be reported to the client. + return nil, fmt.Errorf("short write writing layer") } // Yes! We have written some layer data. 
Let's make it visible. Link the
@@ -281,19 +285,20 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d
 	return dgst, nil
 }
 
-// writeLayer actually writes the the layer file into its final destination.
-// The layer should be validated before commencing the write.
-func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst digest.Digest) error {
+// writeLayer actually writes the layer file into its final destination,
+// identified by dgst. The layer should be validated before commencing the
+// write.
+func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
 	blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
 		digest: dgst,
 	})
 
 	if err != nil {
-		return err
+		return 0, err
 	}
 
 	// Check for existence
-	if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
+	if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
 		// TODO(stevvooe): This check is kind of problematic and very racy.
 		switch err := err.(type) {
 		case storagedriver.PathNotFoundError:
@@ -303,22 +308,18 @@ func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst dige
 			// content addressable and we should just use this to ensure we
 			// have it written. Although, we do need to verify that the
 			// content that is there is the correct length.
-			return err
+			return 0, err
 		}
 	}
 
 	// Seek our local layer file back now.
 	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
 		// Cleanup?
-		return err
+		return 0, err
 	}
 
 	// Okay: we can write the file to the blob store.
-	if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
-		return err
-	}
-
-	return nil
+	return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
 }
 
 // linkLayer links a valid, written layer blob into the registry under the
diff --git a/storage/manifeststore.go b/storage/manifeststore.go
index e1760dd8..a6bdf3b3 100644
--- a/storage/manifeststore.go
+++ b/storage/manifeststore.go
@@ -22,12 +22,16 @@ func (ms *manifestStore) Exists(name, tag string) (bool, error) {
 		return false, err
 	}
 
-	size, err := ms.driver.CurrentSize(p)
+	fi, err := ms.driver.Stat(p)
 	if err != nil {
 		return false, err
 	}
 
-	if size == 0 {
+	if fi.IsDir() {
+		return false, fmt.Errorf("unexpected directory at path: %v, name=%s tag=%s", p, name, tag)
+	}
+
+	if fi.Size() == 0 {
 		return false, nil
 	}
 
From d703a86a642d605d516ffe635cd0888c80dd1ce0 Mon Sep 17 00:00:00 2001
From: Stephen J Day
Date: Fri, 5 Dec 2014 11:46:41 -0800
Subject: [PATCH 09/15] Add checks for ReadStream offset boundary conditions

Several checks for ReadStream offsets around boundary conditions were
missing. The new checks ensure negative offsets are detected and io.EOF is
returned properly when trying to read past the end of a file. The filesystem
and inmemory drivers have been updated accordingly.

An outline of missing checks for List is also part of this commit. Action
will be taken here based on discussion in issue #819.
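The contract being pinned down is roughly this: a negative offset is a
caller error, while an offset at or past the end of the file is legal and
must yield a reader that immediately returns io.EOF. Below is a minimal
sketch of those checks, assuming the driver already knows the file size;
validateReadOffset is a hypothetical helper, not code from this patch, and
it assumes imports of "bytes", "io", "io/ioutil", and the storagedriver
package.

	// validateReadOffset returns an error for negative offsets and an empty
	// reader for offsets at or past EOF; a nil, nil result means the caller
	// should proceed to open the stream at offset.
	func validateReadOffset(path string, offset, size int64) (io.ReadCloser, error) {
		if offset < 0 {
			// Negative offsets are a caller error.
			return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
		}

		if offset >= size {
			// At or past EOF: succeed with a reader that yields 0 bytes and io.EOF.
			return ioutil.NopCloser(bytes.NewReader(nil)), nil
		}

		return nil, nil
	}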
---
 storagedriver/filesystem/driver.go     |  4 +++
 storagedriver/inmemory/driver.go       |  4 +++
 storagedriver/testsuites/testsuites.go | 43 ++++++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/storagedriver/filesystem/driver.go b/storagedriver/filesystem/driver.go
index 6fb56891..05ec6175 100644
--- a/storagedriver/filesystem/driver.go
+++ b/storagedriver/filesystem/driver.go
@@ -82,6 +82,10 @@ func (d *Driver) PutContent(subPath string, contents []byte) error {
 // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
 // given byte offset.
 func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
+	if offset < 0 {
+		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
+	}
+
 	file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
 	if err != nil {
 		if os.IsNotExist(err) {
diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go
index b6bdc258..0b68e021 100644
--- a/storagedriver/inmemory/driver.go
+++ b/storagedriver/inmemory/driver.go
@@ -83,6 +83,10 @@ func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
 	d.mutex.RLock()
 	defer d.mutex.RUnlock()
 
+	if offset < 0 {
+		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
+	}
+
 	path = d.normalize(path)
 
 	found := d.root.find(path)
diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go
index 6a51cd19..0967e2db 100644
--- a/storagedriver/testsuites/testsuites.go
+++ b/storagedriver/testsuites/testsuites.go
@@ -2,6 +2,7 @@ package testsuites
 
 import (
 	"bytes"
+	"io"
 	"io/ioutil"
 	"math/rand"
 	"os"
@@ -209,6 +210,43 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
 	readContents, err = ioutil.ReadAll(reader)
 	c.Assert(err, check.IsNil)
 	c.Assert(readContents, check.DeepEquals, contentsChunk3)
+
+	// Ensure we get an InvalidOffsetError for negative offsets.
+	reader, err = suite.StorageDriver.ReadStream(filename, -1)
+	c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{})
+	c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1))
+	c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename)
+	c.Assert(reader, check.IsNil)
+
+	// Read past the end of the content and make sure we get a reader that
+	// returns 0 bytes and io.EOF
+	reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3)
+	c.Assert(err, check.IsNil)
+	defer reader.Close()
+
+	buf := make([]byte, chunkSize)
+	n, err := reader.Read(buf)
+	c.Assert(err, check.Equals, io.EOF)
+	c.Assert(n, check.Equals, 0)
+
+	// Check the N-1 boundary condition, ensuring we get 1 byte then io.EOF.
+	reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3-1)
+	c.Assert(err, check.IsNil)
+	defer reader.Close()
+
+	n, err = reader.Read(buf)
+	c.Assert(n, check.Equals, 1)
+
+	// We don't care whether the io.EOF comes on this read or the first
+	// zero read, but the only error acceptable here is io.EOF.
+	if err != nil {
+		c.Assert(err, check.Equals, io.EOF)
+	}
+
+	// Any more reads should result in zero bytes and io.EOF
+	n, err = reader.Read(buf)
+	c.Assert(n, check.Equals, 0)
+	c.Assert(err, check.Equals, io.EOF)
 }
 
 // TestContinueStreamAppend tests that a stream write can be appended to without
@@ -329,6 +367,11 @@ func (suite *DriverSuite) TestList(c *check.C) {
 
 	sort.Strings(keys)
 	c.Assert(keys, check.DeepEquals, childFiles)
+
+	// A few checks to add here (check out #819 for more discussion on this):
+	// 1. Ensure that all paths are absolute.
+	// 2. Ensure that listings only include direct children.
+	// 3. Ensure that we only respond to directory listings that end with a slash (maybe?).
 }
 
 // TestMove checks that a moved object no longer exists at the source path and

From 8cb0e3398c35d147ac73f863fb2dbd96159999d6 Mon Sep 17 00:00:00 2001
From: Stephen J Day
Date: Fri, 5 Dec 2014 14:05:37 -0800
Subject: [PATCH 10/15] Disable s3, azure and ipc packages and testing

The packages causing build errors are being disabled for now to let us split
up the work in the different driver implementations without blocking
integration into the main branch.

The s3 and azure implementations need some effort to add Stat support. The
ipc package needs that work plus some care around hanging send calls.

---
 storagedriver/azure/azure.go           |  2 +
 storagedriver/azure/azure_test.go      |  2 +
 storagedriver/factory/factory.go       | 26 +++++++-----
 storagedriver/ipc/client.go            |  2 +
 storagedriver/ipc/ipc.go               |  2 +
 storagedriver/ipc/server.go            |  2 +
 storagedriver/s3/s3.go                 |  2 +
 storagedriver/s3/s3_test.go            |  2 +
 storagedriver/testsuites/testsuites.go | 56 ++++++++++++++------------
 9 files changed, 60 insertions(+), 36 deletions(-)

diff --git a/storagedriver/azure/azure.go b/storagedriver/azure/azure.go
index 489a6348..64402f3b 100644
--- a/storagedriver/azure/azure.go
+++ b/storagedriver/azure/azure.go
@@ -1,3 +1,5 @@
+// +build ignore
+
 // Package azure provides a storagedriver.StorageDriver implementation to
 // store blobs in Microsoft Azure Blob Storage Service.
 package azure
diff --git a/storagedriver/azure/azure_test.go b/storagedriver/azure/azure_test.go
index 888d1165..1edcc1ea 100644
--- a/storagedriver/azure/azure_test.go
+++ b/storagedriver/azure/azure_test.go
@@ -1,3 +1,5 @@
+// +build ignore
+
 package azure
 
 import (
diff --git a/storagedriver/factory/factory.go b/storagedriver/factory/factory.go
index 0b85f372..0f8ca001 100644
--- a/storagedriver/factory/factory.go
+++ b/storagedriver/factory/factory.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 
 	"github.com/docker/docker-registry/storagedriver"
-	"github.com/docker/docker-registry/storagedriver/ipc"
 )
 
 // driverFactories stores an internal mapping between storage driver names and their respective
@@ -41,16 +40,23 @@ func Register(name string, factory StorageDriverFactory) {
 func Create(name string, parameters map[string]string) (storagedriver.StorageDriver, error) {
 	driverFactory, ok := driverFactories[name]
 	if !ok {
+		return nil, InvalidStorageDriverError{name}
+
+		// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
+		// server and client need to be updated for the changed API calls and
+		// there were some problems with libchan hanging. We'll phase this
+		// functionality back in over the next few weeks.
+
 		// No registered StorageDriverFactory found, try ipc
-		driverClient, err := ipc.NewDriverClient(name, parameters)
-		if err != nil {
-			return nil, InvalidStorageDriverError{name}
-		}
-		err = driverClient.Start()
-		if err != nil {
-			return nil, err
-		}
-		return driverClient, nil
+		// driverClient, err := ipc.NewDriverClient(name, parameters)
+		// if err != nil {
+		// 	return nil, InvalidStorageDriverError{name}
+		// }
+		// err = driverClient.Start()
+		// if err != nil {
+		// 	return nil, err
+		// }
+		// return driverClient, nil
 	}
 	return driverFactory.Create(parameters)
 }
diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go
index 7e52a084..2dc5c44e 100644
--- a/storagedriver/ipc/client.go
+++ b/storagedriver/ipc/client.go
@@ -1,3 +1,5 @@
+// +build ignore
+
 package ipc
 
 import (
diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go
index 82bdcbd7..45c54659 100644
--- a/storagedriver/ipc/ipc.go
+++ b/storagedriver/ipc/ipc.go
@@ -1,3 +1,5 @@
+// +build ignore
+
 package ipc
 
 import (
diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go
index 1c0084f9..fa0077a8 100644
--- a/storagedriver/ipc/server.go
+++ b/storagedriver/ipc/server.go
@@ -1,3 +1,5 @@
+// +build ignore
+
 package ipc
 
 import (
diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go
index 3d5cd511..e26d3be2 100644
--- a/storagedriver/s3/s3.go
+++ b/storagedriver/s3/s3.go
@@ -1,3 +1,5 @@
+// +build ignore
+
 package s3
 
 import (
diff --git a/storagedriver/s3/s3_test.go b/storagedriver/s3/s3_test.go
index 6d7b3ff7..f7b4f80e 100644
--- a/storagedriver/s3/s3_test.go
+++ b/storagedriver/s3/s3_test.go
@@ -1,3 +1,5 @@
+// +build ignore
+
 package s3
 
 import (
diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go
index 0967e2db..0e4f5be1 100644
--- a/storagedriver/testsuites/testsuites.go
+++ b/storagedriver/testsuites/testsuites.go
@@ -13,7 +13,6 @@ import (
 	"time"
 
 	"github.com/docker/docker-registry/storagedriver"
-	"github.com/docker/docker-registry/storagedriver/ipc"
 
 	"gopkg.in/check.v1"
 )
@@ -33,29 +32,34 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC
 // RegisterIPCSuite registers a storage driver test suite which runs the named
 // driver as a child process with the given parameters.
 func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) {
-	suite := &DriverSuite{
-		Constructor: func() (storagedriver.StorageDriver, error) {
-			d, err := ipc.NewDriverClient(driverName, ipcParams)
-			if err != nil {
-				return nil, err
-			}
-			err = d.Start()
-			if err != nil {
-				return nil, err
-			}
-			return d, nil
-		},
-		SkipCheck: skipCheck,
-	}
-	suite.Teardown = func() error {
-		if suite.StorageDriver == nil {
-			return nil
-		}
+	panic("ipc testing is disabled for now")
 
-		driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
-		return driverClient.Stop()
-	}
-	check.Suite(suite)
+	// NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code
+	// block below and remove the panic when we phase it back in.
+ + // suite := &DriverSuite{ + // Constructor: func() (storagedriver.StorageDriver, error) { + // d, err := ipc.NewDriverClient(driverName, ipcParams) + // if err != nil { + // return nil, err + // } + // err = d.Start() + // if err != nil { + // return nil, err + // } + // return d, nil + // }, + // SkipCheck: skipCheck, + // } + // suite.Teardown = func() error { + // if suite.StorageDriver == nil { + // return nil + // } + + // driverClient := suite.StorageDriver.(*ipc.StorageDriverClient) + // return driverClient.Stop() + // } + // check.Suite(suite) } // SkipCheck is a function used to determine if a test suite should be skipped. @@ -520,9 +524,9 @@ func (suite *DriverSuite) TestStatCall(c *check.C) { // in to WriteStream concurrently without hanging. // TODO(bbland): fix this test... func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { - if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC { - c.Skip("Need to fix out-of-process concurrency") - } + // if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC { + // c.Skip("Need to fix out-of-process concurrency") + // } var wg sync.WaitGroup From 2f78886aac3e93a6395ce1d70b45cc972d94ae02 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 5 Dec 2014 14:34:22 -0800 Subject: [PATCH 11/15] Disable s3 and ipc executable entry points --- cmd/registry-storagedriver-azure/main.go | 2 ++ cmd/registry-storagedriver-filesystem/main.go | 2 ++ cmd/registry-storagedriver-inmemory/main.go | 2 ++ cmd/registry-storagedriver-s3/main.go | 2 ++ cmd/registry/main.go | 1 - 5 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/registry-storagedriver-azure/main.go b/cmd/registry-storagedriver-azure/main.go index b9944342..584699bf 100644 --- a/cmd/registry-storagedriver-azure/main.go +++ b/cmd/registry-storagedriver-azure/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry-storagedriver-filesystem/main.go b/cmd/registry-storagedriver-filesystem/main.go index 5ea1eb70..0e555b61 100644 --- a/cmd/registry-storagedriver-filesystem/main.go +++ b/cmd/registry-storagedriver-filesystem/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry-storagedriver-inmemory/main.go b/cmd/registry-storagedriver-inmemory/main.go index 77b1c530..b75d3694 100644 --- a/cmd/registry-storagedriver-inmemory/main.go +++ b/cmd/registry-storagedriver-inmemory/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry-storagedriver-s3/main.go b/cmd/registry-storagedriver-s3/main.go index 21192a0f..e2234b7b 100644 --- a/cmd/registry-storagedriver-s3/main.go +++ b/cmd/registry-storagedriver-s3/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 150c7d6b..29fa24c1 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -15,7 +15,6 @@ import ( "github.com/docker/docker-registry/configuration" _ "github.com/docker/docker-registry/storagedriver/filesystem" _ "github.com/docker/docker-registry/storagedriver/inmemory" - _ "github.com/docker/docker-registry/storagedriver/s3" ) func main() { From 1a75fccb43c9bf3cbe2c14a807c20727a988de96 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 5 Dec 2014 14:34:54 -0800 Subject: [PATCH 12/15] Address PathNotFoundError in (*manifestStore).Exists Exists was returning an error when encountering a PathNotFoundError when it should just return false without an error. 
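Generalized, the pattern is: a driver reports a missing path with a typed
error, and an existence check should translate that into (false, nil) rather
than propagating it. A sketch of that pattern under the same assumption
follows; exists is a hypothetical helper, not code from this patch.

	// exists translates storagedriver.PathNotFoundError into a boolean
	// answer while letting every other error propagate to the caller.
	func exists(d storagedriver.StorageDriver, p string) (bool, error) {
		if _, err := d.Stat(p); err != nil {
			if _, ok := err.(storagedriver.PathNotFoundError); ok {
				// Absent is an answer, not a failure.
				return false, nil
			}

			return false, err
		}

		return true, nil
	}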
---
 storage/manifeststore.go | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/storage/manifeststore.go b/storage/manifeststore.go
index a6bdf3b3..ebbc6b3c 100644
--- a/storage/manifeststore.go
+++ b/storage/manifeststore.go
@@ -24,7 +24,12 @@ func (ms *manifestStore) Exists(name, tag string) (bool, error) {
 
 	fi, err := ms.driver.Stat(p)
 	if err != nil {
-		return false, err
+		switch err.(type) {
+		case storagedriver.PathNotFoundError:
+			return false, nil
+		default:
+			return false, err
+		}
 	}
 
 	if fi.IsDir() {

From 14e7adb3a041159a4465aeb939b5454795986cd5 Mon Sep 17 00:00:00 2001
From: Stephen J Day
Date: Fri, 5 Dec 2014 19:20:42 -0800
Subject: [PATCH 13/15] Add documentation for (*DriverSuite).TestStatCall

---
 storagedriver/testsuites/testsuites.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go
index 0e4f5be1..f745781e 100644
--- a/storagedriver/testsuites/testsuites.go
+++ b/storagedriver/testsuites/testsuites.go
@@ -468,6 +468,7 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) {
 	c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
 }
 
+// TestStatCall verifies the implementation of the storagedriver's Stat call.
 func (suite *DriverSuite) TestStatCall(c *check.C) {
 	content := randomString(4096)
 	dirPath := randomString(32)

From e15e07cb40d6fad71446930190c0b2ad99110a24 Mon Sep 17 00:00:00 2001
From: Stephen J Day
Date: Fri, 5 Dec 2014 20:20:01 -0800
Subject: [PATCH 14/15] Disable race detector during tests due to memory usage

The tests are using way too much memory with the race detector enabled,
causing the build machines to fall over. Cursory profiling shows no leaks but
it may need a closer look. For now, it will be disabled but this cannot be
permanent.

---
 circle.yml | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/circle.yml b/circle.yml
index 96fa911a..16a6c817 100644
--- a/circle.yml
+++ b/circle.yml
@@ -21,8 +21,11 @@ test:
     - test -z $(gofmt -s -l . | tee /dev/stderr)
     - go vet ./...
     - test -z $(golint ./... | tee /dev/stderr)
-    - go test -race -test.v ./...:
-        timeout: 600
+    - go test -test.v ./...
+
+    # Disabling the race detector due to massive memory usage.
+    # - go test -race -test.v ./...:
+    #     timeout: 600
 
   # TODO(stevvooe): The following is an attempt at using goveralls but it
   # just doesn't work. 
goveralls requires a single profile file to be From e364e71aab30910734835ce3c81cb382ad3c6c69 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Mon, 8 Dec 2014 09:51:59 -0800 Subject: [PATCH 15/15] Address go vet declaration issue in tests --- client/client_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 57578c81..d4a335ec 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -91,7 +91,7 @@ func TestPush(t *testing.T) { } handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{ - testutil.RequestResponseMapping{ + { Request: testutil.Request{ Method: "PUT", Route: "/v2/" + name + "/manifest/" + tag, @@ -184,7 +184,7 @@ func TestPull(t *testing.T) { } handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{ - testutil.RequestResponseMapping{ + { Request: testutil.Request{ Method: "GET", Route: "/v2/" + name + "/manifest/" + tag, @@ -307,7 +307,7 @@ func TestPullResume(t *testing.T) { for i := 0; i < 3; i++ { layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMap{ - testutil.RequestResponseMapping{ + { Request: testutil.Request{ Method: "GET", Route: "/v2/" + name + "/manifest/" + tag,