From 66107df1afd3d1782ab9a73eccd7b72e4e1bcaaf Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 2 Dec 2014 19:01:00 -0800 Subject: [PATCH] Use int64 for ReadStream and WriteStream offsets This change brings the storagedriver API in line with the Go standard library's use of int64 for offsets. The main benefit is simplicity in interfacing with the io library reducing the number of type conversions in simple code. --- storagedriver/azure/azure.go | 12 +++++------ storagedriver/filesystem/driver.go | 10 +++++---- storagedriver/inmemory/driver.go | 6 +++--- storagedriver/ipc/client.go | 4 ++-- storagedriver/ipc/server.go | 6 +++--- storagedriver/s3/s3.go | 18 ++++++++-------- storagedriver/storagedriver.go | 6 +++--- storagedriver/testsuites/testsuites.go | 30 +++++++++++++------------- 8 files changed, 47 insertions(+), 45 deletions(-) diff --git a/storagedriver/azure/azure.go b/storagedriver/azure/azure.go index ba7168410..489a63486 100644 --- a/storagedriver/azure/azure.go +++ b/storagedriver/azure/azure.go @@ -103,7 +103,7 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if !ok { @@ -115,7 +115,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { return nil, err } - if offset >= size { + if offset >= int64(size) { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -129,10 +129,10 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { var ( lastBlockNum int - resumableOffset uint64 + resumableOffset int64 blocks []azure.Block ) @@ -153,12 +153,12 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error()) } - var totalSize uint64 + var totalSize int64 for _, v := range parts.CommittedBlocks { blocks = append(blocks, azure.Block{ Id: v.Name, Status: azure.BlockStatusCommitted}) - totalSize += uint64(v.Size) + totalSize += int64(v.Size) } // NOTE: Azure driver currently supports only append mode (resumable diff --git a/storagedriver/filesystem/driver.go b/storagedriver/filesystem/driver.go index a4b2e6885..3fbfcdf66 100644 --- a/storagedriver/filesystem/driver.go +++ b/storagedriver/filesystem/driver.go @@ -80,7 +80,7 @@ func (d *Driver) PutContent(subPath string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644) if err != nil { return nil, storagedriver.PathNotFoundError{Path: path} @@ -100,7 +100,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(subPath string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() resumableOffset, err := d.CurrentSize(subPath) @@ -108,7 +108,7 @@ func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.Read return err } - if offset > resumableOffset { + if offset > int64(resumableOffset) { return storagedriver.InvalidOffsetError{Path: subPath, Offset: offset} } @@ -131,13 +131,15 @@ func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.Read } defer file.Close() + // TODO(sday): Use Seek + Copy here. + buf := make([]byte, 32*1024) for { bytesRead, er := reader.Read(buf) if bytesRead > 0 { bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset)) if bytesWritten > 0 { - offset += uint64(bytesWritten) + offset += int64(bytesWritten) } if ew != nil { err = ew diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index 98b068e92..3231b017d 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -61,7 +61,7 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() contents, err := d.GetContent(path) @@ -79,7 +79,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() d.mutex.RLock() defer d.mutex.RUnlock() @@ -89,7 +89,7 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo return err } - if offset > resumableOffset { + if offset > int64(resumableOffset) { return storagedriver.InvalidOffsetError{Path: path, Offset: offset} } diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index c77797ebd..7e52a0841 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -234,7 +234,7 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) { if err := driver.exited(); err != nil { return nil, err } @@ -261,7 +261,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { if err := driver.exited(); err != nil { return err } diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 7d1876ca7..1c0084f9b 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -100,7 +100,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "ReadStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() reader, err := driver.ReadStream(path, offset) var response ReadStreamResponse if err != nil { @@ -115,9 +115,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "WriteStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() // Depending on serialization method, Size may be convereted to any int/uint type - size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(uint64(0))).Uint() + size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int() reader, _ := request.Parameters["Reader"].(io.ReadCloser) err := driver.WriteStream(path, offset, size, reader) response := WriteStreamResponse{ diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index def03e3e2..3d5cd511b 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -17,7 +17,7 @@ const driverName = "s3" // minChunkSize defines the minimum multipart upload chunk size // S3 API requires multipart upload chunks to be at least 5MB -const minChunkSize = uint64(5 * 1024 * 1024) +const minChunkSize = 5 * 1024 * 1024 // listPartsMax is the largest amount of parts you can request from S3 const listPartsMax = 1000 @@ -120,9 +120,9 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { headers := make(http.Header) - headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-") + headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") resp, err := d.Bucket.GetResponseWithHeaders(path, headers) if err != nil { @@ -133,22 +133,22 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() - chunkSize := minChunkSize + chunkSize := int64(minChunkSize) for size/chunkSize >= listPartsMax { chunkSize *= 2 } partNumber := 1 - totalRead := uint64(0) + var totalRead int64 multi, parts, err := d.getAllParts(path) if err != nil { return err } - if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { + if (offset) > int64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { return storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -161,11 +161,11 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo buf := make([]byte, chunkSize) for { bytesRead, err := io.ReadFull(reader, buf) - totalRead += uint64(bytesRead) + totalRead += int64(bytesRead) if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { return err - } else if (uint64(bytesRead) < chunkSize) && totalRead != size { + } else if (int64(bytesRead) < chunkSize) && totalRead != size { break } else { part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead])) diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go index 1b6c5c00d..d257e4b26 100644 --- a/storagedriver/storagedriver.go +++ b/storagedriver/storagedriver.go @@ -44,7 +44,7 @@ type StorageDriver interface { // ReadStream retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. - ReadStream(path string, offset uint64) (io.ReadCloser, error) + ReadStream(path string, offset int64) (io.ReadCloser, error) // WriteStream stores the contents of the provided io.ReadCloser at a // location designated by the given path. @@ -52,7 +52,7 @@ type StorageDriver interface { // "size" bytes. // May be used to resume writing a stream by providing a nonzero offset. // The offset must be no larger than the CurrentSize for this path. - WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error + WriteStream(path string, offset, size int64, readCloser io.ReadCloser) error // CurrentSize retrieves the curernt size in bytes of the object at the // given path. @@ -86,7 +86,7 @@ func (err PathNotFoundError) Error() string { // invalid offset. type InvalidOffsetError struct { Path string - Offset uint64 + Offset int64 } func (err InvalidOffsetError) Error() string { diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 617566678..c2604f4fa 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -173,7 +173,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(10 * 1024 * 1024) + chunkSize := int64(10 * 1024 * 1024) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -186,19 +186,19 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { offset, err := suite.StorageDriver.CurrentSize(filename) c.Assert(err, check.IsNil) - if offset > chunkSize { + if int64(offset) > chunkSize { c.Fatalf("Offset too large, %d > %d", offset, chunkSize) } - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) + err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) c.Assert(err, check.IsNil) offset, err = suite.StorageDriver.CurrentSize(filename) c.Assert(err, check.IsNil) - if offset > 2*chunkSize { + if int64(offset) > 2*chunkSize { c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize) } - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) + err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) c.Assert(err, check.IsNil) received, err := suite.StorageDriver.GetContent(filename) @@ -212,7 +212,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(32) + chunkSize := int64(32) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -260,13 +260,13 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { // TestList checks the returned list of keys after populating a directory tree. func (suite *DriverSuite) TestList(c *check.C) { - rootDirectory := "/" + randomString(uint64(8+rand.Intn(8))) + rootDirectory := "/" + randomString(int64(8+rand.Intn(8))) defer suite.StorageDriver.Delete(rootDirectory) - parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + parentDirectory := rootDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles := make([]string, 50) for i := 0; i < len(childFiles); i++ { - childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFile := parentDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles[i] = childFile err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32))) c.Assert(err, check.IsNil) @@ -388,7 +388,7 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { doneChan := make(chan struct{}) - testStream := func(size int) { + testStream := func(size int64) { suite.testFileStreams(c, size) doneChan <- struct{}{} } @@ -406,7 +406,7 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { } -func (suite *DriverSuite) testFileStreams(c *check.C, size int) { +func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { tf, err := ioutil.TempFile("", "tf") c.Assert(err, check.IsNil) defer os.Remove(tf.Name()) @@ -414,7 +414,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tfName := path.Base(tf.Name()) defer suite.StorageDriver.Delete(tfName) - contents := []byte(randomString(uint64(size))) + contents := []byte(randomString(size)) _, err = tf.Write(contents) c.Assert(err, check.IsNil) @@ -422,7 +422,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tf.Sync() tf.Seek(0, os.SEEK_SET) - err = suite.StorageDriver.WriteStream(tfName, 0, uint64(size), tf) + err = suite.StorageDriver.WriteStream(tfName, 0, size, tf) c.Assert(err, check.IsNil) reader, err := suite.StorageDriver.ReadStream(tfName, 0) @@ -450,7 +450,7 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) { defer suite.StorageDriver.Delete(filename) - err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) + err := suite.StorageDriver.WriteStream(filename, 0, int64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) c.Assert(err, check.IsNil) reader, err := suite.StorageDriver.ReadStream(filename, 0) @@ -465,7 +465,7 @@ func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, c var pathChars = []byte("abcdefghijklmnopqrstuvwxyz") -func randomString(length uint64) string { +func randomString(length int64) string { b := make([]byte, length) for i := range b { b[i] = pathChars[rand.Intn(len(pathChars))]