forked from TrueCloudLab/distribution
Merge pull request #105 from AndreyKostov/storagedriver-s3-add-chunksize-param
Add the chunksize parameter
This commit is contained in:
commit
e0c6f03c11
3 changed files with 44 additions and 22 deletions
|
@ -21,4 +21,6 @@ An implementation of the `storagedriver.StorageDriver` interface which uses Amaz
|
||||||
|
|
||||||
`v4auth`: (optional) Whether you would like to use aws signature version 4 with your requests. This defaults to true if not specified (note that the eu-central-1 region does not work with version 2 signatures, so the driver will error out if initialized with this region and v4auth set to false)
|
`v4auth`: (optional) Whether you would like to use aws signature version 4 with your requests. This defaults to true if not specified (note that the eu-central-1 region does not work with version 2 signatures, so the driver will error out if initialized with this region and v4auth set to false)
|
||||||
|
|
||||||
|
`chunksize`: (optional) The default part size for multipart uploads (performed by WriteStream) to s3. The default is 10 MB. Keep in mind that the minimum part size for s3 is 5MB. You might experience better performance for larger chunk sizes depending on the speed of your connection to s3.
|
||||||
|
|
||||||
`rootdirectory`: (optional) The root directory tree in which all registry files will be stored. Defaults to the empty string (bucket root).
|
`rootdirectory`: (optional) The root directory tree in which all registry files will be stored. Defaults to the empty string (bucket root).
|
||||||
|
|
|
@ -34,7 +34,9 @@ const driverName = "s3"
|
||||||
|
|
||||||
// minChunkSize defines the minimum multipart upload chunk size
|
// minChunkSize defines the minimum multipart upload chunk size
|
||||||
// S3 API requires multipart upload chunks to be at least 5MB
|
// S3 API requires multipart upload chunks to be at least 5MB
|
||||||
const chunkSize = 5 * 1024 * 1024
|
const minChunkSize = 5 << 20
|
||||||
|
|
||||||
|
const defaultChunkSize = 2 * minChunkSize
|
||||||
|
|
||||||
// listMax is the largest amount of objects you can request from S3 in a list call
|
// listMax is the largest amount of objects you can request from S3 in a list call
|
||||||
const listMax = 1000
|
const listMax = 1000
|
||||||
|
@ -48,6 +50,7 @@ type DriverParameters struct {
|
||||||
Encrypt bool
|
Encrypt bool
|
||||||
Secure bool
|
Secure bool
|
||||||
V4Auth bool
|
V4Auth bool
|
||||||
|
ChunkSize int64
|
||||||
RootDirectory string
|
RootDirectory string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,8 +70,9 @@ func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (stora
|
||||||
type Driver struct {
|
type Driver struct {
|
||||||
S3 *s3.S3
|
S3 *s3.S3
|
||||||
Bucket *s3.Bucket
|
Bucket *s3.Bucket
|
||||||
|
ChunkSize int64
|
||||||
Encrypt bool
|
Encrypt bool
|
||||||
rootDirectory string
|
RootDirectory string
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromParameters constructs a new Driver with a given parameters map
|
// FromParameters constructs a new Driver with a given parameters map
|
||||||
|
@ -132,6 +136,15 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chunkSize := int64(defaultChunkSize)
|
||||||
|
chunkSizeParam, ok := parameters["chunksize"]
|
||||||
|
if ok {
|
||||||
|
chunkSize, ok = chunkSizeParam.(int64)
|
||||||
|
if !ok || chunkSize < minChunkSize {
|
||||||
|
return nil, fmt.Errorf("The chunksize parameter should be a number that is larger than 5*1024*1024")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rootDirectory, ok := parameters["rootdirectory"]
|
rootDirectory, ok := parameters["rootdirectory"]
|
||||||
if !ok {
|
if !ok {
|
||||||
rootDirectory = ""
|
rootDirectory = ""
|
||||||
|
@ -145,6 +158,7 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||||
encryptBool,
|
encryptBool,
|
||||||
secureBool,
|
secureBool,
|
||||||
v4AuthBool,
|
v4AuthBool,
|
||||||
|
chunkSize,
|
||||||
fmt.Sprint(rootDirectory),
|
fmt.Sprint(rootDirectory),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +209,12 @@ func New(params DriverParameters) (*Driver, error) {
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
return &Driver{s3obj, bucket, params.Encrypt, params.RootDirectory}, nil
|
return &Driver{
|
||||||
|
S3: s3obj,
|
||||||
|
Bucket: bucket,
|
||||||
|
ChunkSize: params.ChunkSize,
|
||||||
|
Encrypt: params.Encrypt,
|
||||||
|
RootDirectory: params.RootDirectory}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement the storagedriver.StorageDriver interface
|
// Implement the storagedriver.StorageDriver interface
|
||||||
|
@ -273,8 +292,8 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, chunkSize)
|
buf := make([]byte, d.ChunkSize)
|
||||||
zeroBuf := make([]byte, chunkSize)
|
zeroBuf := make([]byte, d.ChunkSize)
|
||||||
|
|
||||||
// We never want to leave a dangling multipart upload, our only consistent state is
|
// We never want to leave a dangling multipart upload, our only consistent state is
|
||||||
// when there is a whole object at path. This is in order to remain consistent with
|
// when there is a whole object at path. This is in order to remain consistent with
|
||||||
|
@ -323,7 +342,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
// Fills from parameter to chunkSize from reader
|
// Fills from parameter to chunkSize from reader
|
||||||
fromReader := func(from int64) error {
|
fromReader := func(from int64) error {
|
||||||
bytesRead = 0
|
bytesRead = 0
|
||||||
for from+int64(bytesRead) < chunkSize {
|
for from+int64(bytesRead) < d.ChunkSize {
|
||||||
nn, err := reader.Read(buf[from+int64(bytesRead):])
|
nn, err := reader.Read(buf[from+int64(bytesRead):])
|
||||||
totalRead += int64(nn)
|
totalRead += int64(nn)
|
||||||
bytesRead += nn
|
bytesRead += nn
|
||||||
|
@ -364,7 +383,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
}
|
}
|
||||||
|
|
||||||
if currentLength >= offset {
|
if currentLength >= offset {
|
||||||
if offset < chunkSize {
|
if offset < d.ChunkSize {
|
||||||
// chunkSize > currentLength >= offset
|
// chunkSize > currentLength >= offset
|
||||||
if err = fromSmallCurrent(offset); err != nil {
|
if err = fromSmallCurrent(offset); err != nil {
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
|
@ -374,7 +393,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if totalRead+offset < chunkSize {
|
if totalRead+offset < d.ChunkSize {
|
||||||
return totalRead, nil
|
return totalRead, nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -407,23 +426,23 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
// Fills between parameters with 0s, making new parts
|
// Fills between parameters with 0s, making new parts
|
||||||
fromZeroFillLarge := func(from, to int64) error {
|
fromZeroFillLarge := func(from, to int64) error {
|
||||||
bytesRead64 := int64(0)
|
bytesRead64 := int64(0)
|
||||||
for to-(from+bytesRead64) >= chunkSize {
|
for to-(from+bytesRead64) >= d.ChunkSize {
|
||||||
part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
|
part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bytesRead64 += chunkSize
|
bytesRead64 += d.ChunkSize
|
||||||
|
|
||||||
parts = append(parts, part)
|
parts = append(parts, part)
|
||||||
partNumber++
|
partNumber++
|
||||||
}
|
}
|
||||||
|
|
||||||
return fromZeroFillSmall(0, (to-from)%chunkSize)
|
return fromZeroFillSmall(0, (to-from)%d.ChunkSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
// currentLength < offset
|
// currentLength < offset
|
||||||
if currentLength < chunkSize {
|
if currentLength < d.ChunkSize {
|
||||||
if offset < chunkSize {
|
if offset < d.ChunkSize {
|
||||||
// chunkSize > offset > currentLength
|
// chunkSize > offset > currentLength
|
||||||
if err = fromSmallCurrent(currentLength); err != nil {
|
if err = fromSmallCurrent(currentLength); err != nil {
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
|
@ -437,7 +456,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if totalRead+offset < chunkSize {
|
if totalRead+offset < d.ChunkSize {
|
||||||
return totalRead, nil
|
return totalRead, nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -446,7 +465,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = fromZeroFillSmall(currentLength, chunkSize); err != nil {
|
if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil {
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -459,15 +478,15 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
partNumber++
|
partNumber++
|
||||||
|
|
||||||
//Zero fill from chunkSize up to offset, then some reader
|
//Zero fill from chunkSize up to offset, then some reader
|
||||||
if err = fromZeroFillLarge(chunkSize, offset); err != nil {
|
if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil {
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = fromReader(offset % chunkSize); err != nil {
|
if err = fromReader(offset % d.ChunkSize); err != nil {
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if totalRead+(offset%chunkSize) < chunkSize {
|
if totalRead+(offset%d.ChunkSize) < d.ChunkSize {
|
||||||
return totalRead, nil
|
return totalRead, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -488,11 +507,11 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = fromReader((offset - currentLength) % chunkSize); err != nil {
|
if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil {
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if totalRead+((offset-currentLength)%chunkSize) < chunkSize {
|
if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize {
|
||||||
return totalRead, nil
|
return totalRead, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -505,7 +524,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if int64(bytesRead) < chunkSize {
|
if int64(bytesRead) < d.ChunkSize {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -670,7 +689,7 @@ func (d *Driver) URLFor(path string, options map[string]interface{}) (string, er
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Driver) s3Path(path string) string {
|
func (d *Driver) s3Path(path string) string {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/")
|
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseError(path string, err error) error {
|
func parseError(path string, err error) error {
|
||||||
|
|
|
@ -63,6 +63,7 @@ func init() {
|
||||||
encryptBool,
|
encryptBool,
|
||||||
secureBool,
|
secureBool,
|
||||||
v4AuthBool,
|
v4AuthBool,
|
||||||
|
minChunkSize,
|
||||||
root,
|
root,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue