a3481c5f1c
This only works for a specific whitelist of error types, which is currently all errors in the storagedriver package. Also improves storagedriver tests to enforce proper error types are returned
328 lines
8.4 KiB
Go
328 lines
8.4 KiB
Go
package s3
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
|
|
"github.com/crowdmob/goamz/aws"
|
|
"github.com/crowdmob/goamz/s3"
|
|
"github.com/docker/docker-registry/storagedriver"
|
|
"github.com/docker/docker-registry/storagedriver/factory"
|
|
)
|
|
|
|
const driverName = "s3"
|
|
|
|
// minChunkSize defines the minimum multipart upload chunk size
|
|
// S3 API requires multipart upload chunks to be at least 5MB
|
|
const minChunkSize = uint64(5 * 1024 * 1024)
|
|
|
|
// listPartsMax is the largest amount of parts you can request from S3
|
|
const listPartsMax = 1000
|
|
|
|
func init() {
|
|
factory.Register(driverName, &s3DriverFactory{})
|
|
}
|
|
|
|
// s3DriverFactory implements the factory.StorageDriverFactory interface
|
|
type s3DriverFactory struct{}
|
|
|
|
func (factory *s3DriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
|
|
return FromParameters(parameters)
|
|
}
|
|
|
|
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
|
|
// Objects are stored at absolute keys in the provided bucket
|
|
type Driver struct {
|
|
S3 *s3.S3
|
|
Bucket *s3.Bucket
|
|
Encrypt bool
|
|
}
|
|
|
|
// FromParameters constructs a new Driver with a given parameters map
|
|
// Required parameters:
|
|
// - accesskey
|
|
// - secretkey
|
|
// - region
|
|
// - bucket
|
|
// - encrypt
|
|
func FromParameters(parameters map[string]string) (*Driver, error) {
|
|
accessKey, ok := parameters["accesskey"]
|
|
if !ok || accessKey == "" {
|
|
return nil, fmt.Errorf("No accesskey parameter provided")
|
|
}
|
|
|
|
secretKey, ok := parameters["secretkey"]
|
|
if !ok || secretKey == "" {
|
|
return nil, fmt.Errorf("No secretkey parameter provided")
|
|
}
|
|
|
|
regionName, ok := parameters["region"]
|
|
if !ok || regionName == "" {
|
|
return nil, fmt.Errorf("No region parameter provided")
|
|
}
|
|
region := aws.GetRegion(regionName)
|
|
if region.Name == "" {
|
|
return nil, fmt.Errorf("Invalid region provided: %v", region)
|
|
}
|
|
|
|
bucket, ok := parameters["bucket"]
|
|
if !ok || bucket == "" {
|
|
return nil, fmt.Errorf("No bucket parameter provided")
|
|
}
|
|
|
|
encrypt, ok := parameters["encrypt"]
|
|
if !ok {
|
|
return nil, fmt.Errorf("No encrypt parameter provided")
|
|
}
|
|
|
|
encryptBool, err := strconv.ParseBool(encrypt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Unable to parse the encrypt parameter: %v", err)
|
|
}
|
|
return New(accessKey, secretKey, region, encryptBool, bucket)
|
|
}
|
|
|
|
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
|
|
// bucketName
|
|
func New(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*Driver, error) {
|
|
auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey}
|
|
s3obj := s3.New(auth, region)
|
|
bucket := s3obj.Bucket(bucketName)
|
|
|
|
if err := bucket.PutBucket(getPermissions()); err != nil {
|
|
s3Err, ok := err.(*s3.Error)
|
|
if !(ok && s3Err.Code == "BucketAlreadyOwnedByYou") {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &Driver{s3obj, bucket, encrypt}, nil
|
|
}
|
|
|
|
// Implement the storagedriver.StorageDriver interface
|
|
|
|
// GetContent retrieves the content stored at "path" as a []byte.
|
|
func (d *Driver) GetContent(path string) ([]byte, error) {
|
|
content, err := d.Bucket.Get(path)
|
|
if err != nil {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return content, nil
|
|
}
|
|
|
|
// PutContent stores the []byte content at a location designated by "path".
|
|
func (d *Driver) PutContent(path string, contents []byte) error {
|
|
return d.Bucket.Put(path, contents, d.getContentType(), getPermissions(), d.getOptions())
|
|
}
|
|
|
|
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
|
// given byte offset.
|
|
func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
|
|
headers := make(http.Header)
|
|
headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-")
|
|
|
|
resp, err := d.Bucket.GetResponseWithHeaders(path, headers)
|
|
if err != nil {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
|
// designated by the given path.
|
|
func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
|
|
defer reader.Close()
|
|
|
|
chunkSize := minChunkSize
|
|
for size/chunkSize >= listPartsMax {
|
|
chunkSize *= 2
|
|
}
|
|
|
|
partNumber := 1
|
|
totalRead := uint64(0)
|
|
multi, parts, err := d.getAllParts(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) {
|
|
return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
|
}
|
|
|
|
if len(parts) > 0 {
|
|
partNumber = int(offset/chunkSize) + 1
|
|
totalRead = offset
|
|
parts = parts[0 : partNumber-1]
|
|
}
|
|
|
|
buf := make([]byte, chunkSize)
|
|
for {
|
|
bytesRead, err := io.ReadFull(reader, buf)
|
|
totalRead += uint64(bytesRead)
|
|
|
|
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
|
|
return err
|
|
} else if (uint64(bytesRead) < chunkSize) && totalRead != size {
|
|
break
|
|
} else {
|
|
part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead]))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
parts = append(parts, part)
|
|
if totalRead == size {
|
|
multi.Complete(parts)
|
|
break
|
|
}
|
|
|
|
partNumber++
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CurrentSize retrieves the curernt size in bytes of the object at the given
|
|
// path.
|
|
func (d *Driver) CurrentSize(path string) (uint64, error) {
|
|
_, parts, err := d.getAllParts(path)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(parts) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
return (((uint64(len(parts)) - 1) * uint64(parts[0].Size)) + uint64(parts[len(parts)-1].Size)), nil
|
|
}
|
|
|
|
// List returns a list of the objects that are direct descendants of the given
|
|
// path.
|
|
func (d *Driver) List(path string) ([]string, error) {
|
|
if path[len(path)-1] != '/' {
|
|
path = path + "/"
|
|
}
|
|
listResponse, err := d.Bucket.List(path, "/", "", listPartsMax)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
files := []string{}
|
|
directories := []string{}
|
|
|
|
for {
|
|
for _, key := range listResponse.Contents {
|
|
files = append(files, key.Key)
|
|
}
|
|
|
|
for _, commonPrefix := range listResponse.CommonPrefixes {
|
|
directories = append(directories, commonPrefix[0:len(commonPrefix)-1])
|
|
}
|
|
|
|
if listResponse.IsTruncated {
|
|
listResponse, err = d.Bucket.List(path, "/", listResponse.NextMarker, listPartsMax)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
return append(files, directories...), nil
|
|
}
|
|
|
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
|
// object.
|
|
func (d *Driver) Move(sourcePath string, destPath string) error {
|
|
/* This is terrible, but aws doesn't have an actual move. */
|
|
_, err := d.Bucket.PutCopy(destPath, getPermissions(),
|
|
s3.CopyOptions{Options: d.getOptions(), MetadataDirective: "", ContentType: d.getContentType()},
|
|
d.Bucket.Name+"/"+sourcePath)
|
|
if err != nil {
|
|
return storagedriver.PathNotFoundError{Path: sourcePath}
|
|
}
|
|
|
|
return d.Delete(sourcePath)
|
|
}
|
|
|
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
|
func (d *Driver) Delete(path string) error {
|
|
listResponse, err := d.Bucket.List(path, "", "", listPartsMax)
|
|
if err != nil || len(listResponse.Contents) == 0 {
|
|
return storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
s3Objects := make([]s3.Object, listPartsMax)
|
|
|
|
for len(listResponse.Contents) > 0 {
|
|
for index, key := range listResponse.Contents {
|
|
s3Objects[index].Key = key.Key
|
|
}
|
|
|
|
err := d.Bucket.DelMulti(s3.Delete{Quiet: false, Objects: s3Objects[0:len(listResponse.Contents)]})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
listResponse, err = d.Bucket.List(path, "", "", listPartsMax)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *Driver) getHighestIDMulti(path string) (multi *s3.Multi, err error) {
|
|
multis, _, err := d.Bucket.ListMulti(path, "")
|
|
if err != nil && !hasCode(err, "NoSuchUpload") {
|
|
return nil, err
|
|
}
|
|
|
|
uploadID := ""
|
|
|
|
if len(multis) > 0 {
|
|
for _, m := range multis {
|
|
if m.Key == path && m.UploadId >= uploadID {
|
|
uploadID = m.UploadId
|
|
multi = m
|
|
}
|
|
}
|
|
return multi, nil
|
|
}
|
|
multi, err = d.Bucket.InitMulti(path, d.getContentType(), getPermissions(), d.getOptions())
|
|
return multi, err
|
|
}
|
|
|
|
func (d *Driver) getAllParts(path string) (*s3.Multi, []s3.Part, error) {
|
|
multi, err := d.getHighestIDMulti(path)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
parts, err := multi.ListParts()
|
|
return multi, parts, err
|
|
}
|
|
|
|
func hasCode(err error, code string) bool {
|
|
s3err, ok := err.(*aws.Error)
|
|
return ok && s3err.Code == code
|
|
}
|
|
|
|
func (d *Driver) getOptions() s3.Options {
|
|
return s3.Options{SSE: d.Encrypt}
|
|
}
|
|
|
|
func getPermissions() s3.ACL {
|
|
return s3.Private
|
|
}
|
|
|
|
func (d *Driver) getContentType() string {
|
|
return "application/octet-stream"
|
|
}
|