forked from TrueCloudLab/distribution
Merge pull request #1385 from BrianBland/s3UseAWSLibrary
[driver/s3] Use aws/aws-sdk-go instead of goamz for s3 driver and cloudfront
This commit is contained in:
commit
881ef1096f
5 changed files with 1204 additions and 22 deletions
|
@ -8,12 +8,14 @@ import (
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/service/cloudfront/sign"
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
|
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
|
||||||
"github.com/docker/goamz/cloudfront"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// cloudFrontStorageMiddleware provides an simple implementation of layerHandler that
|
// cloudFrontStorageMiddleware provides an simple implementation of layerHandler that
|
||||||
|
@ -21,7 +23,8 @@ import (
|
||||||
// then issues HTTP Temporary Redirects to this CloudFront content URL.
|
// then issues HTTP Temporary Redirects to this CloudFront content URL.
|
||||||
type cloudFrontStorageMiddleware struct {
|
type cloudFrontStorageMiddleware struct {
|
||||||
storagedriver.StorageDriver
|
storagedriver.StorageDriver
|
||||||
cloudfront *cloudfront.CloudFront
|
urlSigner *sign.URLSigner
|
||||||
|
baseURL string
|
||||||
duration time.Duration
|
duration time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,15 +36,24 @@ var _ storagedriver.StorageDriver = &cloudFrontStorageMiddleware{}
|
||||||
func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
|
func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||||
base, ok := options["baseurl"]
|
base, ok := options["baseurl"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("No baseurl provided")
|
return nil, fmt.Errorf("no baseurl provided")
|
||||||
}
|
}
|
||||||
baseURL, ok := base.(string)
|
baseURL, ok := base.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("baseurl must be a string")
|
return nil, fmt.Errorf("baseurl must be a string")
|
||||||
}
|
}
|
||||||
|
if !strings.Contains(baseURL, "://") {
|
||||||
|
baseURL = "https://" + baseURL
|
||||||
|
}
|
||||||
|
if !strings.HasSuffix(baseURL, "/") {
|
||||||
|
baseURL += "/"
|
||||||
|
}
|
||||||
|
if _, err := url.Parse(baseURL); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid baseurl: %v", err)
|
||||||
|
}
|
||||||
pk, ok := options["privatekey"]
|
pk, ok := options["privatekey"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("No privatekey provided")
|
return nil, fmt.Errorf("no privatekey provided")
|
||||||
}
|
}
|
||||||
pkPath, ok := pk.(string)
|
pkPath, ok := pk.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -49,7 +61,7 @@ func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, o
|
||||||
}
|
}
|
||||||
kpid, ok := options["keypairid"]
|
kpid, ok := options["keypairid"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("No keypairid provided")
|
return nil, fmt.Errorf("no keypairid provided")
|
||||||
}
|
}
|
||||||
keypairID, ok := kpid.(string)
|
keypairID, ok := kpid.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -58,19 +70,19 @@ func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, o
|
||||||
|
|
||||||
pkBytes, err := ioutil.ReadFile(pkPath)
|
pkBytes, err := ioutil.ReadFile(pkPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to read privatekey file: %s", err)
|
return nil, fmt.Errorf("failed to read privatekey file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
block, _ := pem.Decode([]byte(pkBytes))
|
block, _ := pem.Decode([]byte(pkBytes))
|
||||||
if block == nil {
|
if block == nil {
|
||||||
return nil, fmt.Errorf("Failed to decode private key as an rsa private key")
|
return nil, fmt.Errorf("failed to decode private key as an rsa private key")
|
||||||
}
|
}
|
||||||
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
|
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cf := cloudfront.New(baseURL, privateKey, keypairID)
|
urlSigner := sign.NewURLSigner(keypairID, privateKey)
|
||||||
|
|
||||||
duration := 20 * time.Minute
|
duration := 20 * time.Minute
|
||||||
d, ok := options["duration"]
|
d, ok := options["duration"]
|
||||||
|
@ -81,13 +93,18 @@ func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, o
|
||||||
case string:
|
case string:
|
||||||
dur, err := time.ParseDuration(d)
|
dur, err := time.ParseDuration(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Invalid duration: %s", err)
|
return nil, fmt.Errorf("invalid duration: %s", err)
|
||||||
}
|
}
|
||||||
duration = dur
|
duration = dur
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &cloudFrontStorageMiddleware{StorageDriver: storageDriver, cloudfront: cf, duration: duration}, nil
|
return &cloudFrontStorageMiddleware{
|
||||||
|
StorageDriver: storageDriver,
|
||||||
|
urlSigner: urlSigner,
|
||||||
|
baseURL: baseURL,
|
||||||
|
duration: duration,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3BucketKeyer is any type that is capable of returning the S3 bucket key
|
// S3BucketKeyer is any type that is capable of returning the S3 bucket key
|
||||||
|
@ -106,7 +123,7 @@ func (lh *cloudFrontStorageMiddleware) URLFor(ctx context.Context, path string,
|
||||||
return lh.StorageDriver.URLFor(ctx, path, options)
|
return lh.StorageDriver.URLFor(ctx, path, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
cfURL, err := lh.cloudfront.CannedSignedURL(keyer.S3BucketKey(path), "", time.Now().Add(lh.duration))
|
cfURL, err := lh.urlSigner.Sign(lh.baseURL+keyer.S3BucketKey(path), time.Now().Add(lh.duration))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
966
docs/storage/driver/s3-aws/s3.go
Normal file
966
docs/storage/driver/s3-aws/s3.go
Normal file
|
@ -0,0 +1,966 @@
|
||||||
|
// Package s3 provides a storagedriver.StorageDriver implementation to
|
||||||
|
// store blobs in Amazon S3 cloud storage.
|
||||||
|
//
|
||||||
|
// This package leverages the official aws client library for interfacing with
|
||||||
|
// S3.
|
||||||
|
//
|
||||||
|
// Because S3 is a key, value store the Stat call does not support last modification
|
||||||
|
// time for directories (directories are an abstraction for key, value stores)
|
||||||
|
//
|
||||||
|
// Keep in mind that S3 guarantees only read-after-write consistency for new
|
||||||
|
// objects, but no read-after-update or list-after-write consistency.
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/request"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
|
||||||
|
"github.com/docker/distribution/context"
|
||||||
|
"github.com/docker/distribution/registry/client/transport"
|
||||||
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
"github.com/docker/distribution/registry/storage/driver/base"
|
||||||
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||||
|
)
|
||||||
|
|
||||||
|
const driverName = "s3aws"
|
||||||
|
|
||||||
|
// minChunkSize defines the minimum multipart upload chunk size
|
||||||
|
// S3 API requires multipart upload chunks to be at least 5MB
|
||||||
|
const minChunkSize = 5 << 20
|
||||||
|
|
||||||
|
const defaultChunkSize = 2 * minChunkSize
|
||||||
|
|
||||||
|
// listMax is the largest amount of objects you can request from S3 in a list call
|
||||||
|
const listMax = 1000
|
||||||
|
|
||||||
|
// validRegions maps known s3 region identifiers to region descriptors
|
||||||
|
var validRegions = map[string]struct{}{}
|
||||||
|
|
||||||
|
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
|
||||||
|
type DriverParameters struct {
|
||||||
|
AccessKey string
|
||||||
|
SecretKey string
|
||||||
|
Bucket string
|
||||||
|
Region string
|
||||||
|
Encrypt bool
|
||||||
|
Secure bool
|
||||||
|
ChunkSize int64
|
||||||
|
RootDirectory string
|
||||||
|
StorageClass string
|
||||||
|
UserAgent string
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
for _, region := range []string{
|
||||||
|
"us-east-1",
|
||||||
|
"us-west-1",
|
||||||
|
"us-west-2",
|
||||||
|
"eu-west-1",
|
||||||
|
"eu-central-1",
|
||||||
|
"ap-southeast-1",
|
||||||
|
"ap-southeast-2",
|
||||||
|
"ap-northeast-1",
|
||||||
|
"ap-northeast-2",
|
||||||
|
"sa-east-1",
|
||||||
|
} {
|
||||||
|
validRegions[region] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register this as the default s3 driver in addition to s3aws
|
||||||
|
factory.Register("s3", &s3DriverFactory{})
|
||||||
|
factory.Register(driverName, &s3DriverFactory{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// s3DriverFactory implements the factory.StorageDriverFactory interface
|
||||||
|
type s3DriverFactory struct{}
|
||||||
|
|
||||||
|
func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||||
|
return FromParameters(parameters)
|
||||||
|
}
|
||||||
|
|
||||||
|
type driver struct {
|
||||||
|
S3 *s3.S3
|
||||||
|
Bucket string
|
||||||
|
ChunkSize int64
|
||||||
|
Encrypt bool
|
||||||
|
RootDirectory string
|
||||||
|
StorageClass string
|
||||||
|
|
||||||
|
pool sync.Pool // pool []byte buffers used for WriteStream
|
||||||
|
zeros []byte // shared, zero-valued buffer used for WriteStream
|
||||||
|
}
|
||||||
|
|
||||||
|
type baseEmbed struct {
|
||||||
|
base.Base
|
||||||
|
}
|
||||||
|
|
||||||
|
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
|
||||||
|
// Objects are stored at absolute keys in the provided bucket.
|
||||||
|
type Driver struct {
|
||||||
|
baseEmbed
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromParameters constructs a new Driver with a given parameters map
|
||||||
|
// Required parameters:
|
||||||
|
// - accesskey
|
||||||
|
// - secretkey
|
||||||
|
// - region
|
||||||
|
// - bucket
|
||||||
|
// - encrypt
|
||||||
|
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||||
|
// Providing no values for these is valid in case the user is authenticating
|
||||||
|
// with an IAM on an ec2 instance (in which case the instance credentials will
|
||||||
|
// be summoned when GetAuth is called)
|
||||||
|
accessKey, ok := parameters["accesskey"]
|
||||||
|
if !ok {
|
||||||
|
accessKey = ""
|
||||||
|
}
|
||||||
|
secretKey, ok := parameters["secretkey"]
|
||||||
|
if !ok {
|
||||||
|
secretKey = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
regionName, ok := parameters["region"]
|
||||||
|
if !ok || fmt.Sprint(regionName) == "" {
|
||||||
|
return nil, fmt.Errorf("No region parameter provided")
|
||||||
|
}
|
||||||
|
region := fmt.Sprint(regionName)
|
||||||
|
_, ok = validRegions[region]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("Invalid region provided: %v", region)
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, ok := parameters["bucket"]
|
||||||
|
if !ok || fmt.Sprint(bucket) == "" {
|
||||||
|
return nil, fmt.Errorf("No bucket parameter provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
encryptBool := false
|
||||||
|
encrypt, ok := parameters["encrypt"]
|
||||||
|
if ok {
|
||||||
|
encryptBool, ok = encrypt.(bool)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
secureBool := true
|
||||||
|
secure, ok := parameters["secure"]
|
||||||
|
if ok {
|
||||||
|
secureBool, ok = secure.(bool)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("The secure parameter should be a boolean")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
chunkSize := int64(defaultChunkSize)
|
||||||
|
chunkSizeParam, ok := parameters["chunksize"]
|
||||||
|
if ok {
|
||||||
|
switch v := chunkSizeParam.(type) {
|
||||||
|
case string:
|
||||||
|
vv, err := strconv.ParseInt(v, 0, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
|
||||||
|
}
|
||||||
|
chunkSize = vv
|
||||||
|
case int64:
|
||||||
|
chunkSize = v
|
||||||
|
case int, uint, int32, uint32, uint64:
|
||||||
|
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
|
||||||
|
}
|
||||||
|
|
||||||
|
if chunkSize < minChunkSize {
|
||||||
|
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rootDirectory, ok := parameters["rootdirectory"]
|
||||||
|
if !ok {
|
||||||
|
rootDirectory = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
storageClass := s3.StorageClassStandard
|
||||||
|
storageClassParam, ok := parameters["storageclass"]
|
||||||
|
if ok {
|
||||||
|
storageClassString, ok := storageClassParam.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("The storageclass parameter must be one of %v, %v invalid", []string{s3.StorageClassStandard, s3.StorageClassReducedRedundancy}, storageClassParam)
|
||||||
|
}
|
||||||
|
// All valid storage class parameters are UPPERCASE, so be a bit more flexible here
|
||||||
|
storageClassString = strings.ToUpper(storageClassString)
|
||||||
|
if storageClassString != s3.StorageClassStandard && storageClassString != s3.StorageClassReducedRedundancy {
|
||||||
|
return nil, fmt.Errorf("The storageclass parameter must be one of %v, %v invalid", []string{s3.StorageClassStandard, s3.StorageClassReducedRedundancy}, storageClassParam)
|
||||||
|
}
|
||||||
|
storageClass = storageClassString
|
||||||
|
}
|
||||||
|
|
||||||
|
userAgent, ok := parameters["useragent"]
|
||||||
|
if !ok {
|
||||||
|
userAgent = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
params := DriverParameters{
|
||||||
|
fmt.Sprint(accessKey),
|
||||||
|
fmt.Sprint(secretKey),
|
||||||
|
fmt.Sprint(bucket),
|
||||||
|
region,
|
||||||
|
encryptBool,
|
||||||
|
secureBool,
|
||||||
|
chunkSize,
|
||||||
|
fmt.Sprint(rootDirectory),
|
||||||
|
storageClass,
|
||||||
|
fmt.Sprint(userAgent),
|
||||||
|
}
|
||||||
|
|
||||||
|
return New(params)
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
|
||||||
|
// bucketName
|
||||||
|
func New(params DriverParameters) (*Driver, error) {
|
||||||
|
awsConfig := aws.NewConfig()
|
||||||
|
creds := credentials.NewChainCredentials([]credentials.Provider{
|
||||||
|
&credentials.StaticProvider{
|
||||||
|
Value: credentials.Value{
|
||||||
|
AccessKeyID: params.AccessKey,
|
||||||
|
SecretAccessKey: params.SecretKey,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&credentials.EnvProvider{},
|
||||||
|
&credentials.SharedCredentialsProvider{},
|
||||||
|
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
|
||||||
|
})
|
||||||
|
|
||||||
|
awsConfig.WithCredentials(creds)
|
||||||
|
awsConfig.WithRegion(params.Region)
|
||||||
|
awsConfig.WithDisableSSL(!params.Secure)
|
||||||
|
// awsConfig.WithMaxRetries(10)
|
||||||
|
|
||||||
|
if params.UserAgent != "" {
|
||||||
|
awsConfig.WithHTTPClient(&http.Client{
|
||||||
|
Transport: transport.NewTransport(http.DefaultTransport, transport.NewHeaderRequestModifier(http.Header{http.CanonicalHeaderKey("User-Agent"): []string{params.UserAgent}})),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
s3obj := s3.New(session.New(awsConfig))
|
||||||
|
|
||||||
|
// TODO Currently multipart uploads have no timestamps, so this would be unwise
|
||||||
|
// if you initiated a new s3driver while another one is running on the same bucket.
|
||||||
|
// multis, _, err := bucket.ListMulti("", "")
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, err
|
||||||
|
// }
|
||||||
|
|
||||||
|
// for _, multi := range multis {
|
||||||
|
// err := multi.Abort()
|
||||||
|
// //TODO appropriate to do this error checking?
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, err
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
d := &driver{
|
||||||
|
S3: s3obj,
|
||||||
|
Bucket: params.Bucket,
|
||||||
|
ChunkSize: params.ChunkSize,
|
||||||
|
Encrypt: params.Encrypt,
|
||||||
|
RootDirectory: params.RootDirectory,
|
||||||
|
StorageClass: params.StorageClass,
|
||||||
|
zeros: make([]byte, params.ChunkSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
d.pool.New = func() interface{} {
|
||||||
|
return make([]byte, d.ChunkSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Driver{
|
||||||
|
baseEmbed: baseEmbed{
|
||||||
|
Base: base.Base{
|
||||||
|
StorageDriver: d,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement the storagedriver.StorageDriver interface
|
||||||
|
|
||||||
|
func (d *driver) Name() string {
|
||||||
|
return driverName
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetContent retrieves the content stored at "path" as a []byte.
|
||||||
|
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||||
|
reader, err := d.ReadStream(ctx, path, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return ioutil.ReadAll(reader)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutContent stores the []byte content at a location designated by "path".
|
||||||
|
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||||
|
_, err := d.S3.PutObject(&s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
ContentType: d.getContentType(),
|
||||||
|
ACL: d.getACL(),
|
||||||
|
ServerSideEncryption: d.getEncryptionMode(),
|
||||||
|
StorageClass: d.getStorageClass(),
|
||||||
|
Body: bytes.NewReader(contents),
|
||||||
|
})
|
||||||
|
return parseError(path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
||||||
|
// given byte offset.
|
||||||
|
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||||
|
resp, err := d.S3.GetObject(&s3.GetObjectInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "InvalidRange" {
|
||||||
|
return ioutil.NopCloser(bytes.NewReader(nil)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, parseError(path, err)
|
||||||
|
}
|
||||||
|
return resp.Body, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteStream stores the contents of the provided io.Reader at a
|
||||||
|
// location designated by the given path. The driver will know it has
|
||||||
|
// received the full contents when the reader returns io.EOF. The number
|
||||||
|
// of successfully READ bytes will be returned, even if an error is
|
||||||
|
// returned. May be used to resume writing a stream by providing a nonzero
|
||||||
|
// offset. Offsets past the current size will write from the position
|
||||||
|
// beyond the end of the file.
|
||||||
|
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
||||||
|
var partNumber int64 = 1
|
||||||
|
bytesRead := 0
|
||||||
|
var putErrChan chan error
|
||||||
|
parts := []*s3.CompletedPart{}
|
||||||
|
done := make(chan struct{}) // stopgap to free up waiting goroutines
|
||||||
|
|
||||||
|
resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
ContentType: d.getContentType(),
|
||||||
|
ACL: d.getACL(),
|
||||||
|
ServerSideEncryption: d.getEncryptionMode(),
|
||||||
|
StorageClass: d.getStorageClass(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadID := resp.UploadId
|
||||||
|
|
||||||
|
buf := d.getbuf()
|
||||||
|
|
||||||
|
// We never want to leave a dangling multipart upload, our only consistent state is
|
||||||
|
// when there is a whole object at path. This is in order to remain consistent with
|
||||||
|
// the stat call.
|
||||||
|
//
|
||||||
|
// Note that if the machine dies before executing the defer, we will be left with a dangling
|
||||||
|
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
|
||||||
|
// made prior to the machine crashing.
|
||||||
|
defer func() {
|
||||||
|
if putErrChan != nil {
|
||||||
|
if putErr := <-putErrChan; putErr != nil {
|
||||||
|
err = putErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) > 0 {
|
||||||
|
_, err := d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
UploadId: uploadID,
|
||||||
|
MultipartUpload: &s3.CompletedMultipartUpload{
|
||||||
|
Parts: parts,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// TODO (brianbland): log errors here
|
||||||
|
d.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
UploadId: uploadID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
d.putbuf(buf) // needs to be here to pick up new buf value
|
||||||
|
close(done) // free up any waiting goroutines
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Fills from 0 to total from current
|
||||||
|
fromSmallCurrent := func(total int64) error {
|
||||||
|
current, err := d.ReadStream(ctx, path, 0)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
bytesRead = 0
|
||||||
|
for int64(bytesRead) < total {
|
||||||
|
//The loop should very rarely enter a second iteration
|
||||||
|
nn, err := current.Read(buf[bytesRead:total])
|
||||||
|
bytesRead += nn
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fills from parameter to chunkSize from reader
|
||||||
|
fromReader := func(from int64) error {
|
||||||
|
bytesRead = 0
|
||||||
|
for from+int64(bytesRead) < d.ChunkSize {
|
||||||
|
nn, err := reader.Read(buf[from+int64(bytesRead):])
|
||||||
|
totalRead += int64(nn)
|
||||||
|
bytesRead += nn
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if putErrChan == nil {
|
||||||
|
putErrChan = make(chan error)
|
||||||
|
} else {
|
||||||
|
if putErr := <-putErrChan; putErr != nil {
|
||||||
|
putErrChan = nil
|
||||||
|
return putErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(bytesRead int, from int64, buf []byte) {
|
||||||
|
defer d.putbuf(buf) // this buffer gets dropped after this call
|
||||||
|
|
||||||
|
// DRAGONS(stevvooe): There are few things one might want to know
|
||||||
|
// about this section. First, the putErrChan is expecting an error
|
||||||
|
// and a nil or just a nil to come through the channel. This is
|
||||||
|
// covered by the silly defer below. The other aspect is the s3
|
||||||
|
// retry backoff to deal with RequestTimeout errors. Even though
|
||||||
|
// the underlying s3 library should handle it, it doesn't seem to
|
||||||
|
// be part of the shouldRetry function (see AdRoll/goamz/s3).
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case putErrChan <- nil: // for some reason, we do this no matter what.
|
||||||
|
case <-done:
|
||||||
|
return // ensure we don't leak the goroutine
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if bytesRead <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := d.S3.UploadPart(&s3.UploadPartInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
UploadId: uploadID,
|
||||||
|
Body: bytes.NewReader(buf[0 : int64(bytesRead)+from]),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("error putting part, aborting: %v", err)
|
||||||
|
select {
|
||||||
|
case putErrChan <- err:
|
||||||
|
case <-done:
|
||||||
|
return // don't leak the goroutine
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parts and partNumber are safe, because this function is the
|
||||||
|
// only one modifying them and we force it to be executed
|
||||||
|
// serially.
|
||||||
|
parts = append(parts, &s3.CompletedPart{
|
||||||
|
ETag: resp.ETag,
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
})
|
||||||
|
partNumber++
|
||||||
|
}(bytesRead, from, buf)
|
||||||
|
|
||||||
|
buf = d.getbuf() // use a new buffer for the next call
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if offset > 0 {
|
||||||
|
resp, err := d.S3.HeadObject(&s3.HeadObjectInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
if s3Err, ok := err.(awserr.Error); !ok || s3Err.Code() != "NoSuchKey" {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
currentLength := int64(0)
|
||||||
|
if err == nil && resp.ContentLength != nil {
|
||||||
|
currentLength = *resp.ContentLength
|
||||||
|
}
|
||||||
|
|
||||||
|
if currentLength >= offset {
|
||||||
|
if offset < d.ChunkSize {
|
||||||
|
// chunkSize > currentLength >= offset
|
||||||
|
if err = fromSmallCurrent(offset); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = fromReader(offset); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalRead+offset < d.ChunkSize {
|
||||||
|
return totalRead, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// currentLength >= offset >= chunkSize
|
||||||
|
resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
UploadId: uploadID,
|
||||||
|
CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)),
|
||||||
|
CopySourceRange: aws.String("bytes=0-" + strconv.FormatInt(offset-1, 10)),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = append(parts, &s3.CompletedPart{
|
||||||
|
ETag: resp.CopyPartResult.ETag,
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
})
|
||||||
|
partNumber++
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Fills between parameters with 0s but only when to - from <= chunkSize
|
||||||
|
fromZeroFillSmall := func(from, to int64) error {
|
||||||
|
bytesRead = 0
|
||||||
|
for from+int64(bytesRead) < to {
|
||||||
|
nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
|
||||||
|
bytesRead += nn
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fills between parameters with 0s, making new parts
|
||||||
|
fromZeroFillLarge := func(from, to int64) error {
|
||||||
|
bytesRead64 := int64(0)
|
||||||
|
for to-(from+bytesRead64) >= d.ChunkSize {
|
||||||
|
resp, err := d.S3.UploadPart(&s3.UploadPartInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
UploadId: uploadID,
|
||||||
|
Body: bytes.NewReader(d.zeros),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bytesRead64 += d.ChunkSize
|
||||||
|
|
||||||
|
parts = append(parts, &s3.CompletedPart{
|
||||||
|
ETag: resp.ETag,
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
})
|
||||||
|
partNumber++
|
||||||
|
}
|
||||||
|
|
||||||
|
return fromZeroFillSmall(0, (to-from)%d.ChunkSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// currentLength < offset
|
||||||
|
if currentLength < d.ChunkSize {
|
||||||
|
if offset < d.ChunkSize {
|
||||||
|
// chunkSize > offset > currentLength
|
||||||
|
if err = fromSmallCurrent(currentLength); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = fromZeroFillSmall(currentLength, offset); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = fromReader(offset); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalRead+offset < d.ChunkSize {
|
||||||
|
return totalRead, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// offset >= chunkSize > currentLength
|
||||||
|
if err = fromSmallCurrent(currentLength); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := d.S3.UploadPart(&s3.UploadPartInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
UploadId: uploadID,
|
||||||
|
Body: bytes.NewReader(buf),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = append(parts, &s3.CompletedPart{
|
||||||
|
ETag: resp.ETag,
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
})
|
||||||
|
partNumber++
|
||||||
|
|
||||||
|
//Zero fill from chunkSize up to offset, then some reader
|
||||||
|
if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = fromReader(offset % d.ChunkSize); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalRead+(offset%d.ChunkSize) < d.ChunkSize {
|
||||||
|
return totalRead, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// offset > currentLength >= chunkSize
|
||||||
|
resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
UploadId: uploadID,
|
||||||
|
CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = append(parts, &s3.CompletedPart{
|
||||||
|
ETag: resp.CopyPartResult.ETag,
|
||||||
|
PartNumber: aws.Int64(partNumber),
|
||||||
|
})
|
||||||
|
partNumber++
|
||||||
|
|
||||||
|
//Zero fill from currentLength up to offset, then some reader
|
||||||
|
if err = fromZeroFillLarge(currentLength, offset); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize {
|
||||||
|
return totalRead, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err = fromReader(0); err != nil {
|
||||||
|
return totalRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if int64(bytesRead) < d.ChunkSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalRead, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat retrieves the FileInfo for the given path, including the current size
|
||||||
|
// in bytes and the creation time.
|
||||||
|
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
||||||
|
resp, err := d.S3.ListObjects(&s3.ListObjectsInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Prefix: aws.String(d.s3Path(path)),
|
||||||
|
MaxKeys: aws.Int64(1),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fi := storagedriver.FileInfoFields{
|
||||||
|
Path: path,
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Contents) == 1 {
|
||||||
|
if *resp.Contents[0].Key != d.s3Path(path) {
|
||||||
|
fi.IsDir = true
|
||||||
|
} else {
|
||||||
|
fi.IsDir = false
|
||||||
|
fi.Size = *resp.Contents[0].Size
|
||||||
|
fi.ModTime = *resp.Contents[0].LastModified
|
||||||
|
}
|
||||||
|
} else if len(resp.CommonPrefixes) == 1 {
|
||||||
|
fi.IsDir = true
|
||||||
|
} else {
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List returns a list of the objects that are direct descendants of the given path.
|
||||||
|
func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
|
||||||
|
path := opath
|
||||||
|
if path != "/" && path[len(path)-1] != '/' {
|
||||||
|
path = path + "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
|
||||||
|
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
|
||||||
|
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
|
||||||
|
prefix := ""
|
||||||
|
if d.s3Path("") == "" {
|
||||||
|
prefix = "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := d.S3.ListObjects(&s3.ListObjectsInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Prefix: aws.String(d.s3Path(path)),
|
||||||
|
Delimiter: aws.String("/"),
|
||||||
|
MaxKeys: aws.Int64(listMax),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, parseError(opath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
files := []string{}
|
||||||
|
directories := []string{}
|
||||||
|
|
||||||
|
for {
|
||||||
|
for _, key := range resp.Contents {
|
||||||
|
files = append(files, strings.Replace(*key.Key, d.s3Path(""), prefix, 1))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, commonPrefix := range resp.CommonPrefixes {
|
||||||
|
commonPrefix := *commonPrefix.Prefix
|
||||||
|
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.s3Path(""), prefix, 1))
|
||||||
|
}
|
||||||
|
|
||||||
|
if *resp.IsTruncated {
|
||||||
|
resp, err = d.S3.ListObjects(&s3.ListObjectsInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Prefix: aws.String(d.s3Path(path)),
|
||||||
|
Delimiter: aws.String("/"),
|
||||||
|
MaxKeys: aws.Int64(listMax),
|
||||||
|
Marker: resp.NextMarker,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if opath != "/" {
|
||||||
|
if len(files) == 0 && len(directories) == 0 {
|
||||||
|
// Treat empty response as missing directory, since we don't actually
|
||||||
|
// have directories in s3.
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: opath}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return append(files, directories...), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||||
|
// object.
|
||||||
|
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||||
|
/* This is terrible, but aws doesn't have an actual move. */
|
||||||
|
_, err := d.S3.CopyObject(&s3.CopyObjectInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(destPath)),
|
||||||
|
ContentType: d.getContentType(),
|
||||||
|
ACL: d.getACL(),
|
||||||
|
ServerSideEncryption: d.getEncryptionMode(),
|
||||||
|
StorageClass: d.getStorageClass(),
|
||||||
|
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return parseError(sourcePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.Delete(ctx, sourcePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||||
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
|
resp, err := d.S3.ListObjects(&s3.ListObjectsInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Prefix: aws.String(d.s3Path(path)),
|
||||||
|
})
|
||||||
|
if err != nil || len(resp.Contents) == 0 {
|
||||||
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
|
||||||
|
|
||||||
|
for len(resp.Contents) > 0 {
|
||||||
|
for _, key := range resp.Contents {
|
||||||
|
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
||||||
|
Key: key.Key,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Delete: &s3.Delete{
|
||||||
|
Objects: s3Objects,
|
||||||
|
Quiet: aws.Bool(false),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err = d.S3.ListObjects(&s3.ListObjectsInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Prefix: aws.String(d.s3Path(path)),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
|
||||||
|
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
|
||||||
|
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
|
||||||
|
methodString := "GET"
|
||||||
|
method, ok := options["method"]
|
||||||
|
if ok {
|
||||||
|
methodString, ok = method.(string)
|
||||||
|
if !ok || (methodString != "GET" && methodString != "HEAD") {
|
||||||
|
return "", storagedriver.ErrUnsupportedMethod{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expiresIn := 20 * time.Minute
|
||||||
|
expires, ok := options["expiry"]
|
||||||
|
if ok {
|
||||||
|
et, ok := expires.(time.Time)
|
||||||
|
if ok {
|
||||||
|
expiresIn = et.Sub(time.Now())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var req *request.Request
|
||||||
|
|
||||||
|
switch methodString {
|
||||||
|
case "GET":
|
||||||
|
req, _ = d.S3.GetObjectRequest(&s3.GetObjectInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
})
|
||||||
|
case "HEAD":
|
||||||
|
req, _ = d.S3.HeadObjectRequest(&s3.HeadObjectInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Key: aws.String(d.s3Path(path)),
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
|
||||||
|
return req.Presign(expiresIn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) s3Path(path string) string {
|
||||||
|
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
||||||
|
}
|
||||||
|
|
||||||
|
// S3BucketKey returns the s3 bucket key for the given storage driver path.
|
||||||
|
func (d *Driver) S3BucketKey(path string) string {
|
||||||
|
return d.StorageDriver.(*driver).s3Path(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseError(path string, err error) error {
|
||||||
|
if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NoSuchKey" {
|
||||||
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) getEncryptionMode() *string {
|
||||||
|
if d.Encrypt {
|
||||||
|
return aws.String("AES256")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) getContentType() *string {
|
||||||
|
return aws.String("application/octet-stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) getACL() *string {
|
||||||
|
return aws.String("private")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) getStorageClass() *string {
|
||||||
|
return aws.String(d.StorageClass)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
|
||||||
|
func (d *driver) getbuf() []byte {
|
||||||
|
return d.pool.Get().([]byte)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) putbuf(p []byte) {
|
||||||
|
copy(p, d.zeros)
|
||||||
|
d.pool.Put(p)
|
||||||
|
}
|
201
docs/storage/driver/s3-aws/s3_test.go
Normal file
201
docs/storage/driver/s3-aws/s3_test.go
Normal file
|
@ -0,0 +1,201 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
|
||||||
|
"github.com/docker/distribution/context"
|
||||||
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
"github.com/docker/distribution/registry/storage/driver/testsuites"
|
||||||
|
|
||||||
|
"gopkg.in/check.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hook up gocheck into the "go test" runner.
|
||||||
|
func Test(t *testing.T) { check.TestingT(t) }
|
||||||
|
|
||||||
|
var s3DriverConstructor func(rootDirectory, storageClass string) (*Driver, error)
|
||||||
|
var skipS3 func() string
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
accessKey := os.Getenv("AWS_ACCESS_KEY")
|
||||||
|
secretKey := os.Getenv("AWS_SECRET_KEY")
|
||||||
|
bucket := os.Getenv("S3_BUCKET")
|
||||||
|
encrypt := os.Getenv("S3_ENCRYPT")
|
||||||
|
secure := os.Getenv("S3_SECURE")
|
||||||
|
region := os.Getenv("AWS_REGION")
|
||||||
|
root, err := ioutil.TempDir("", "driver-")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer os.Remove(root)
|
||||||
|
|
||||||
|
s3DriverConstructor = func(rootDirectory, storageClass string) (*Driver, error) {
|
||||||
|
encryptBool := false
|
||||||
|
if encrypt != "" {
|
||||||
|
encryptBool, err = strconv.ParseBool(encrypt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
secureBool := true
|
||||||
|
if secure != "" {
|
||||||
|
secureBool, err = strconv.ParseBool(secure)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
parameters := DriverParameters{
|
||||||
|
accessKey,
|
||||||
|
secretKey,
|
||||||
|
bucket,
|
||||||
|
region,
|
||||||
|
encryptBool,
|
||||||
|
secureBool,
|
||||||
|
minChunkSize,
|
||||||
|
rootDirectory,
|
||||||
|
storageClass,
|
||||||
|
driverName + "-test",
|
||||||
|
}
|
||||||
|
|
||||||
|
return New(parameters)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip S3 storage driver tests if environment variable parameters are not provided
|
||||||
|
skipS3 = func() string {
|
||||||
|
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
|
||||||
|
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
|
||||||
|
return s3DriverConstructor(root, s3.StorageClassStandard)
|
||||||
|
}, skipS3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEmptyRootList(t *testing.T) {
|
||||||
|
if skipS3() != "" {
|
||||||
|
t.Skip(skipS3())
|
||||||
|
}
|
||||||
|
|
||||||
|
validRoot, err := ioutil.TempDir("", "driver-")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
defer os.Remove(validRoot)
|
||||||
|
|
||||||
|
rootedDriver, err := s3DriverConstructor(validRoot, s3.StorageClassStandard)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
emptyRootDriver, err := s3DriverConstructor("", s3.StorageClassStandard)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating empty root driver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slashRootDriver, err := s3DriverConstructor("/", s3.StorageClassStandard)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating slash root driver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := "/test"
|
||||||
|
contents := []byte("contents")
|
||||||
|
ctx := context.Background()
|
||||||
|
err = rootedDriver.PutContent(ctx, filename, contents)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating content: %v", err)
|
||||||
|
}
|
||||||
|
defer rootedDriver.Delete(ctx, filename)
|
||||||
|
|
||||||
|
keys, err := emptyRootDriver.List(ctx, "/")
|
||||||
|
for _, path := range keys {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
keys, err = slashRootDriver.List(ctx, "/")
|
||||||
|
for _, path := range keys {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStorageClass(t *testing.T) {
|
||||||
|
if skipS3() != "" {
|
||||||
|
t.Skip(skipS3())
|
||||||
|
}
|
||||||
|
|
||||||
|
rootDir, err := ioutil.TempDir("", "driver-")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
defer os.Remove(rootDir)
|
||||||
|
|
||||||
|
standardDriver, err := s3DriverConstructor(rootDir, s3.StorageClassStandard)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating driver with standard storage: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rrDriver, err := s3DriverConstructor(rootDir, s3.StorageClassReducedRedundancy)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating driver with reduced redundancy storage: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
standardFilename := "/test-standard"
|
||||||
|
rrFilename := "/test-rr"
|
||||||
|
contents := []byte("contents")
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
err = standardDriver.PutContent(ctx, standardFilename, contents)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating content: %v", err)
|
||||||
|
}
|
||||||
|
defer standardDriver.Delete(ctx, standardFilename)
|
||||||
|
|
||||||
|
err = rrDriver.PutContent(ctx, rrFilename, contents)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating content: %v", err)
|
||||||
|
}
|
||||||
|
defer rrDriver.Delete(ctx, rrFilename)
|
||||||
|
|
||||||
|
standardDriverUnwrapped := standardDriver.Base.StorageDriver.(*driver)
|
||||||
|
resp, err := standardDriverUnwrapped.S3.GetObject(&s3.GetObjectInput{
|
||||||
|
Bucket: aws.String(standardDriverUnwrapped.Bucket),
|
||||||
|
Key: aws.String(standardDriverUnwrapped.s3Path(standardFilename)),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error retrieving standard storage file: %v", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
// Amazon only populates this header value for non-standard storage classes
|
||||||
|
if resp.StorageClass != nil {
|
||||||
|
t.Fatalf("unexpected storage class for standard file: %v", resp.StorageClass)
|
||||||
|
}
|
||||||
|
|
||||||
|
rrDriverUnwrapped := rrDriver.Base.StorageDriver.(*driver)
|
||||||
|
resp, err = rrDriverUnwrapped.S3.GetObject(&s3.GetObjectInput{
|
||||||
|
Bucket: aws.String(rrDriverUnwrapped.Bucket),
|
||||||
|
Key: aws.String(rrDriverUnwrapped.s3Path(standardFilename)),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error retrieving reduced-redundancy storage file: %v", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StorageClass == nil {
|
||||||
|
t.Fatalf("unexpected storage class for reduced-redundancy file: %v", s3.StorageClassStandard)
|
||||||
|
} else if *resp.StorageClass != s3.StorageClassReducedRedundancy {
|
||||||
|
t.Fatalf("unexpected storage class for reduced-redundancy file: %v", *resp.StorageClass)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -2,16 +2,14 @@
|
||||||
// store blobs in Amazon S3 cloud storage.
|
// store blobs in Amazon S3 cloud storage.
|
||||||
//
|
//
|
||||||
// This package leverages the docker/goamz client library for interfacing with
|
// This package leverages the docker/goamz client library for interfacing with
|
||||||
// s3.
|
// S3. It is intended to be deprecated in favor of the s3-aws driver
|
||||||
|
// implementation.
|
||||||
//
|
//
|
||||||
// Because s3 is a key, value store the Stat call does not support last modification
|
// Because S3 is a key, value store the Stat call does not support last modification
|
||||||
// time for directories (directories are an abstraction for key, value stores)
|
// time for directories (directories are an abstraction for key, value stores)
|
||||||
//
|
//
|
||||||
// Keep in mind that s3 guarantees only eventual consistency, so do not assume
|
// Keep in mind that S3 guarantees only read-after-write consistency for new
|
||||||
// that a successful write will mean immediate access to the data written (although
|
// objects, but no read-after-update or list-after-write consistency.
|
||||||
// in most regions a new object put has guaranteed read after write). The only true
|
|
||||||
// guarantee is that once you call Stat and receive a certain file size, that much of
|
|
||||||
// the file is already accessible.
|
|
||||||
package s3
|
package s3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -37,7 +35,7 @@ import (
|
||||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||||
)
|
)
|
||||||
|
|
||||||
const driverName = "s3"
|
const driverName = "s3goamz"
|
||||||
|
|
||||||
// minChunkSize defines the minimum multipart upload chunk size
|
// minChunkSize defines the minimum multipart upload chunk size
|
||||||
// S3 API requires multipart upload chunks to be at least 5MB
|
// S3 API requires multipart upload chunks to be at least 5MB
|
|
@ -71,7 +71,7 @@ func init() {
|
||||||
minChunkSize,
|
minChunkSize,
|
||||||
rootDirectory,
|
rootDirectory,
|
||||||
storageClass,
|
storageClass,
|
||||||
"",
|
driverName + "-test",
|
||||||
}
|
}
|
||||||
|
|
||||||
return New(parameters)
|
return New(parameters)
|
||||||
|
@ -196,6 +196,6 @@ func TestStorageClass(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
if storageClass := resp.Header.Get("x-amz-storage-class"); storageClass != string(s3.ReducedRedundancy) {
|
if storageClass := resp.Header.Get("x-amz-storage-class"); storageClass != string(s3.ReducedRedundancy) {
|
||||||
t.Fatalf("unexpected storage class for standard file: %v", storageClass)
|
t.Fatalf("unexpected storage class for reduced-redundancy file: %v", storageClass)
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in a new issue