forked from TrueCloudLab/distribution
Renames ResumeWritePosition to CurrentSize in storage driver api
This commit is contained in:
parent
abdf927c40
commit
cb1bdacbe3
8 changed files with 24 additions and 24 deletions
|
@ -98,7 +98,7 @@ func (d *FilesystemDriver) ReadStream(path string, offset uint64) (io.ReadCloser
|
||||||
func (d *FilesystemDriver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error {
|
func (d *FilesystemDriver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error {
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
resumableOffset, err := d.ResumeWritePosition(subPath)
|
resumableOffset, err := d.CurrentSize(subPath)
|
||||||
if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound {
|
if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -154,7 +154,7 @@ func (d *FilesystemDriver) WriteStream(subPath string, offset, size uint64, read
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *FilesystemDriver) ResumeWritePosition(subPath string) (uint64, error) {
|
func (d *FilesystemDriver) CurrentSize(subPath string) (uint64, error) {
|
||||||
fullPath := d.subPath(subPath)
|
fullPath := d.subPath(subPath)
|
||||||
|
|
||||||
fileInfo, err := os.Stat(fullPath)
|
fileInfo, err := os.Stat(fullPath)
|
||||||
|
|
|
@ -78,7 +78,7 @@ func (d *InMemoryDriver) WriteStream(path string, offset, size uint64, reader io
|
||||||
d.mutex.RLock()
|
d.mutex.RLock()
|
||||||
defer d.mutex.RUnlock()
|
defer d.mutex.RUnlock()
|
||||||
|
|
||||||
resumableOffset, err := d.ResumeWritePosition(path)
|
resumableOffset, err := d.CurrentSize(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -100,7 +100,7 @@ func (d *InMemoryDriver) WriteStream(path string, offset, size uint64, reader io
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *InMemoryDriver) ResumeWritePosition(path string) (uint64, error) {
|
func (d *InMemoryDriver) CurrentSize(path string) (uint64, error) {
|
||||||
d.mutex.RLock()
|
d.mutex.RLock()
|
||||||
defer d.mutex.RUnlock()
|
defer d.mutex.RUnlock()
|
||||||
contents, ok := d.storage[path]
|
contents, ok := d.storage[path]
|
||||||
|
|
|
@ -216,16 +216,16 @@ func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (driver *StorageDriverClient) ResumeWritePosition(path string) (uint64, error) {
|
func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
|
||||||
receiver, remoteSender := libchan.Pipe()
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
params := map[string]interface{}{"Path": path}
|
params := map[string]interface{}{"Path": path}
|
||||||
err := driver.sender.Send(&Request{Type: "ResumeWritePosition", Parameters: params, ResponseChannel: remoteSender})
|
err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var response ResumeWritePositionResponse
|
var response CurrentSizeResponse
|
||||||
err = receiver.Receive(&response)
|
err = receiver.Receive(&response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|
|
@ -49,8 +49,8 @@ type WriteStreamResponse struct {
|
||||||
Error *responseError
|
Error *responseError
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResumeWritePositionResponse is a response for a ResumeWritePosition request
|
// CurrentSizeResponse is a response for a CurrentSize request
|
||||||
type ResumeWritePositionResponse struct {
|
type CurrentSizeResponse struct {
|
||||||
Position uint64
|
Position uint64
|
||||||
Error *responseError
|
Error *responseError
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,10 +119,10 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
case "ResumeWritePosition":
|
case "CurrentSize":
|
||||||
path, _ := request.Parameters["Path"].(string)
|
path, _ := request.Parameters["Path"].(string)
|
||||||
position, err := driver.ResumeWritePosition(path)
|
position, err := driver.CurrentSize(path)
|
||||||
response := ResumeWritePositionResponse{
|
response := CurrentSizeResponse{
|
||||||
Position: position,
|
Position: position,
|
||||||
Error: ResponseError(err),
|
Error: ResponseError(err),
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,7 +177,7 @@ func (d *S3Driver) WriteStream(path string, offset, size uint64, reader io.ReadC
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *S3Driver) ResumeWritePosition(path string) (uint64, error) {
|
func (d *S3Driver) CurrentSize(path string) (uint64, error) {
|
||||||
_, parts, err := d.getAllParts(path)
|
_, parts, err := d.getAllParts(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -190,11 +190,11 @@ func (d *S3Driver) ResumeWritePosition(path string) (uint64, error) {
|
||||||
return (((uint64(len(parts)) - 1) * uint64(parts[0].Size)) + uint64(parts[len(parts)-1].Size)), nil
|
return (((uint64(len(parts)) - 1) * uint64(parts[0].Size)) + uint64(parts[len(parts)-1].Size)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *S3Driver) List(prefix string) ([]string, error) {
|
func (d *S3Driver) List(path string) ([]string, error) {
|
||||||
if prefix[len(prefix)-1] != '/' {
|
if path[len(path)-1] != '/' {
|
||||||
prefix = prefix + "/"
|
path = path + "/"
|
||||||
}
|
}
|
||||||
listResponse, err := d.Bucket.List(prefix, "/", "", listPartsMax)
|
listResponse, err := d.Bucket.List(path, "/", "", listPartsMax)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -212,7 +212,7 @@ func (d *S3Driver) List(prefix string) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if listResponse.IsTruncated {
|
if listResponse.IsTruncated {
|
||||||
listResponse, err = d.Bucket.List(prefix, "/", listResponse.NextMarker, listPartsMax)
|
listResponse, err = d.Bucket.List(path, "/", listResponse.NextMarker, listPartsMax)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,12 +25,12 @@ type StorageDriver interface {
|
||||||
// the given path
|
// the given path
|
||||||
// The driver will know it has received the full contents when it has read "size" bytes
|
// The driver will know it has received the full contents when it has read "size" bytes
|
||||||
// May be used to resume writing a stream by providing a nonzero offset
|
// May be used to resume writing a stream by providing a nonzero offset
|
||||||
// The offset must be no larger than the ResumeWritePosition for this path
|
// The offset must be no larger than the CurrentSize for this path
|
||||||
WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error
|
WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error
|
||||||
|
|
||||||
// ResumeWritePosition retrieves the byte offset at which it is safe to continue writing at the
|
// CurrentSize retrieves the curernt size in bytes of the object at the given path
|
||||||
// given path
|
// It should be safe to read or write anywhere up to this point
|
||||||
ResumeWritePosition(path string) (uint64, error)
|
CurrentSize(path string) (uint64, error)
|
||||||
|
|
||||||
// List returns a list of the objects that are direct descendants of the given path
|
// List returns a list of the objects that are direct descendants of the given path
|
||||||
List(path string) ([]string, error)
|
List(path string) ([]string, error)
|
||||||
|
|
|
@ -160,7 +160,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
|
||||||
err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1)))
|
err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1)))
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
offset, err := suite.StorageDriver.ResumeWritePosition(filename)
|
offset, err := suite.StorageDriver.CurrentSize(filename)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
if offset > chunkSize {
|
if offset > chunkSize {
|
||||||
c.Fatalf("Offset too large, %d > %d", offset, chunkSize)
|
c.Fatalf("Offset too large, %d > %d", offset, chunkSize)
|
||||||
|
@ -168,7 +168,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
|
||||||
err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize])))
|
err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize])))
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
offset, err = suite.StorageDriver.ResumeWritePosition(filename)
|
offset, err = suite.StorageDriver.CurrentSize(filename)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
if offset > 2*chunkSize {
|
if offset > 2*chunkSize {
|
||||||
c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize)
|
c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize)
|
||||||
|
|
Loading…
Reference in a new issue