diff --git a/storagedriver/azure/azure.go b/storagedriver/azure/azure.go index ba716841..489a6348 100644 --- a/storagedriver/azure/azure.go +++ b/storagedriver/azure/azure.go @@ -103,7 +103,7 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if !ok { @@ -115,7 +115,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { return nil, err } - if offset >= size { + if offset >= int64(size) { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -129,10 +129,10 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { var ( lastBlockNum int - resumableOffset uint64 + resumableOffset int64 blocks []azure.Block ) @@ -153,12 +153,12 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error()) } - var totalSize uint64 + var totalSize int64 for _, v := range parts.CommittedBlocks { blocks = append(blocks, azure.Block{ Id: v.Name, Status: azure.BlockStatusCommitted}) - totalSize += uint64(v.Size) + totalSize += int64(v.Size) } // NOTE: Azure driver currently supports only append mode (resumable diff --git a/storagedriver/filesystem/driver.go b/storagedriver/filesystem/driver.go index a4b2e688..3fbfcdf6 100644 --- a/storagedriver/filesystem/driver.go +++ b/storagedriver/filesystem/driver.go @@ -80,7 +80,7 @@ func (d *Driver) PutContent(subPath string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644) if err != nil { return nil, storagedriver.PathNotFoundError{Path: path} @@ -100,7 +100,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(subPath string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() resumableOffset, err := d.CurrentSize(subPath) @@ -108,7 +108,7 @@ func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.Read return err } - if offset > resumableOffset { + if offset > int64(resumableOffset) { return storagedriver.InvalidOffsetError{Path: subPath, Offset: offset} } @@ -131,13 +131,15 @@ func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.Read } defer file.Close() + // TODO(sday): Use Seek + Copy here. + buf := make([]byte, 32*1024) for { bytesRead, er := reader.Read(buf) if bytesRead > 0 { bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset)) if bytesWritten > 0 { - offset += uint64(bytesWritten) + offset += int64(bytesWritten) } if ew != nil { err = ew diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index 98b068e9..3231b017 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -61,7 +61,7 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() contents, err := d.GetContent(path) @@ -79,7 +79,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() d.mutex.RLock() defer d.mutex.RUnlock() @@ -89,7 +89,7 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo return err } - if offset > resumableOffset { + if offset > int64(resumableOffset) { return storagedriver.InvalidOffsetError{Path: path, Offset: offset} } diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index c77797eb..7e52a084 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -234,7 +234,7 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) { if err := driver.exited(); err != nil { return nil, err } @@ -261,7 +261,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { if err := driver.exited(); err != nil { return err } diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 7d1876ca..1c0084f9 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -100,7 +100,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "ReadStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() reader, err := driver.ReadStream(path, offset) var response ReadStreamResponse if err != nil { @@ -115,9 +115,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "WriteStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() // Depending on serialization method, Size may be convereted to any int/uint type - size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(uint64(0))).Uint() + size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int() reader, _ := request.Parameters["Reader"].(io.ReadCloser) err := driver.WriteStream(path, offset, size, reader) response := WriteStreamResponse{ diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index def03e3e..3d5cd511 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -17,7 +17,7 @@ const driverName = "s3" // minChunkSize defines the minimum multipart upload chunk size // S3 API requires multipart upload chunks to be at least 5MB -const minChunkSize = uint64(5 * 1024 * 1024) +const minChunkSize = 5 * 1024 * 1024 // listPartsMax is the largest amount of parts you can request from S3 const listPartsMax = 1000 @@ -120,9 +120,9 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { headers := make(http.Header) - headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-") + headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") resp, err := d.Bucket.GetResponseWithHeaders(path, headers) if err != nil { @@ -133,22 +133,22 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() - chunkSize := minChunkSize + chunkSize := int64(minChunkSize) for size/chunkSize >= listPartsMax { chunkSize *= 2 } partNumber := 1 - totalRead := uint64(0) + var totalRead int64 multi, parts, err := d.getAllParts(path) if err != nil { return err } - if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { + if (offset) > int64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { return storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -161,11 +161,11 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo buf := make([]byte, chunkSize) for { bytesRead, err := io.ReadFull(reader, buf) - totalRead += uint64(bytesRead) + totalRead += int64(bytesRead) if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { return err - } else if (uint64(bytesRead) < chunkSize) && totalRead != size { + } else if (int64(bytesRead) < chunkSize) && totalRead != size { break } else { part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead])) diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go index 1b6c5c00..d257e4b2 100644 --- a/storagedriver/storagedriver.go +++ b/storagedriver/storagedriver.go @@ -44,7 +44,7 @@ type StorageDriver interface { // ReadStream retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. - ReadStream(path string, offset uint64) (io.ReadCloser, error) + ReadStream(path string, offset int64) (io.ReadCloser, error) // WriteStream stores the contents of the provided io.ReadCloser at a // location designated by the given path. @@ -52,7 +52,7 @@ type StorageDriver interface { // "size" bytes. // May be used to resume writing a stream by providing a nonzero offset. // The offset must be no larger than the CurrentSize for this path. - WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error + WriteStream(path string, offset, size int64, readCloser io.ReadCloser) error // CurrentSize retrieves the curernt size in bytes of the object at the // given path. @@ -86,7 +86,7 @@ func (err PathNotFoundError) Error() string { // invalid offset. type InvalidOffsetError struct { Path string - Offset uint64 + Offset int64 } func (err InvalidOffsetError) Error() string { diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 61756667..c2604f4f 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -173,7 +173,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(10 * 1024 * 1024) + chunkSize := int64(10 * 1024 * 1024) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -186,19 +186,19 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { offset, err := suite.StorageDriver.CurrentSize(filename) c.Assert(err, check.IsNil) - if offset > chunkSize { + if int64(offset) > chunkSize { c.Fatalf("Offset too large, %d > %d", offset, chunkSize) } - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) + err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) c.Assert(err, check.IsNil) offset, err = suite.StorageDriver.CurrentSize(filename) c.Assert(err, check.IsNil) - if offset > 2*chunkSize { + if int64(offset) > 2*chunkSize { c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize) } - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) + err = suite.StorageDriver.WriteStream(filename, int64(offset), 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) c.Assert(err, check.IsNil) received, err := suite.StorageDriver.GetContent(filename) @@ -212,7 +212,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(32) + chunkSize := int64(32) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -260,13 +260,13 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { // TestList checks the returned list of keys after populating a directory tree. func (suite *DriverSuite) TestList(c *check.C) { - rootDirectory := "/" + randomString(uint64(8+rand.Intn(8))) + rootDirectory := "/" + randomString(int64(8+rand.Intn(8))) defer suite.StorageDriver.Delete(rootDirectory) - parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + parentDirectory := rootDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles := make([]string, 50) for i := 0; i < len(childFiles); i++ { - childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFile := parentDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles[i] = childFile err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32))) c.Assert(err, check.IsNil) @@ -388,7 +388,7 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { doneChan := make(chan struct{}) - testStream := func(size int) { + testStream := func(size int64) { suite.testFileStreams(c, size) doneChan <- struct{}{} } @@ -406,7 +406,7 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { } -func (suite *DriverSuite) testFileStreams(c *check.C, size int) { +func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { tf, err := ioutil.TempFile("", "tf") c.Assert(err, check.IsNil) defer os.Remove(tf.Name()) @@ -414,7 +414,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tfName := path.Base(tf.Name()) defer suite.StorageDriver.Delete(tfName) - contents := []byte(randomString(uint64(size))) + contents := []byte(randomString(size)) _, err = tf.Write(contents) c.Assert(err, check.IsNil) @@ -422,7 +422,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tf.Sync() tf.Seek(0, os.SEEK_SET) - err = suite.StorageDriver.WriteStream(tfName, 0, uint64(size), tf) + err = suite.StorageDriver.WriteStream(tfName, 0, size, tf) c.Assert(err, check.IsNil) reader, err := suite.StorageDriver.ReadStream(tfName, 0) @@ -450,7 +450,7 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) { defer suite.StorageDriver.Delete(filename) - err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) + err := suite.StorageDriver.WriteStream(filename, 0, int64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) c.Assert(err, check.IsNil) reader, err := suite.StorageDriver.ReadStream(filename, 0) @@ -465,7 +465,7 @@ func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, c var pathChars = []byte("abcdefghijklmnopqrstuvwxyz") -func randomString(length uint64) string { +func randomString(length int64) string { b := make([]byte, length) for i := range b { b[i] = pathChars[rand.Intn(len(pathChars))]