From d5a38e4c5f23e794e30b986cbb3b13b8d0bf5b87 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 21 Jan 2016 18:17:53 -0800 Subject: [PATCH] Adds new s3 driver using aws-sdk-go instead of goamz Keeps old s3 driver, renames to s3goamz, registers new s3 driver as both "s3" and "s3aws" Changes cloudfront middleware to use aws-sdk-go Signed-off-by: Brian Bland --- .../middleware/cloudfront/middleware.go | 41 +- docs/storage/driver/s3-aws/s3.go | 966 ++++++++++++++++++ docs/storage/driver/s3-aws/s3_test.go | 201 ++++ docs/storage/driver/{s3 => s3-goamz}/s3.go | 14 +- .../driver/{s3 => s3-goamz}/s3_test.go | 4 +- 5 files changed, 1204 insertions(+), 22 deletions(-) create mode 100644 docs/storage/driver/s3-aws/s3.go create mode 100644 docs/storage/driver/s3-aws/s3_test.go rename docs/storage/driver/{s3 => s3-goamz}/s3.go (98%) rename docs/storage/driver/{s3 => s3-goamz}/s3_test.go (98%) diff --git a/docs/storage/driver/middleware/cloudfront/middleware.go b/docs/storage/driver/middleware/cloudfront/middleware.go index 56edda3a1..9162c09de 100644 --- a/docs/storage/driver/middleware/cloudfront/middleware.go +++ b/docs/storage/driver/middleware/cloudfront/middleware.go @@ -8,12 +8,14 @@ import ( "encoding/pem" "fmt" "io/ioutil" + "net/url" + "strings" "time" + "github.com/aws/aws-sdk-go/service/cloudfront/sign" "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" - "github.com/docker/goamz/cloudfront" ) // cloudFrontStorageMiddleware provides an simple implementation of layerHandler that @@ -21,8 +23,9 @@ import ( // then issues HTTP Temporary Redirects to this CloudFront content URL. type cloudFrontStorageMiddleware struct { storagedriver.StorageDriver - cloudfront *cloudfront.CloudFront - duration time.Duration + urlSigner *sign.URLSigner + baseURL string + duration time.Duration } var _ storagedriver.StorageDriver = &cloudFrontStorageMiddleware{} @@ -33,15 +36,24 @@ var _ storagedriver.StorageDriver = &cloudFrontStorageMiddleware{} func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) { base, ok := options["baseurl"] if !ok { - return nil, fmt.Errorf("No baseurl provided") + return nil, fmt.Errorf("no baseurl provided") } baseURL, ok := base.(string) if !ok { return nil, fmt.Errorf("baseurl must be a string") } + if !strings.Contains(baseURL, "://") { + baseURL = "https://" + baseURL + } + if !strings.HasSuffix(baseURL, "/") { + baseURL += "/" + } + if _, err := url.Parse(baseURL); err != nil { + return nil, fmt.Errorf("invalid baseurl: %v", err) + } pk, ok := options["privatekey"] if !ok { - return nil, fmt.Errorf("No privatekey provided") + return nil, fmt.Errorf("no privatekey provided") } pkPath, ok := pk.(string) if !ok { @@ -49,7 +61,7 @@ func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, o } kpid, ok := options["keypairid"] if !ok { - return nil, fmt.Errorf("No keypairid provided") + return nil, fmt.Errorf("no keypairid provided") } keypairID, ok := kpid.(string) if !ok { @@ -58,19 +70,19 @@ func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, o pkBytes, err := ioutil.ReadFile(pkPath) if err != nil { - return nil, fmt.Errorf("Failed to read privatekey file: %s", err) + return nil, fmt.Errorf("failed to read privatekey file: %s", err) } block, _ := pem.Decode([]byte(pkBytes)) if block == nil { - return nil, fmt.Errorf("Failed to decode private key as an rsa private key") + return nil, fmt.Errorf("failed to decode private key as an rsa private key") } privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes) if err != nil { return nil, err } - cf := cloudfront.New(baseURL, privateKey, keypairID) + urlSigner := sign.NewURLSigner(keypairID, privateKey) duration := 20 * time.Minute d, ok := options["duration"] @@ -81,13 +93,18 @@ func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, o case string: dur, err := time.ParseDuration(d) if err != nil { - return nil, fmt.Errorf("Invalid duration: %s", err) + return nil, fmt.Errorf("invalid duration: %s", err) } duration = dur } } - return &cloudFrontStorageMiddleware{StorageDriver: storageDriver, cloudfront: cf, duration: duration}, nil + return &cloudFrontStorageMiddleware{ + StorageDriver: storageDriver, + urlSigner: urlSigner, + baseURL: baseURL, + duration: duration, + }, nil } // S3BucketKeyer is any type that is capable of returning the S3 bucket key @@ -106,7 +123,7 @@ func (lh *cloudFrontStorageMiddleware) URLFor(ctx context.Context, path string, return lh.StorageDriver.URLFor(ctx, path, options) } - cfURL, err := lh.cloudfront.CannedSignedURL(keyer.S3BucketKey(path), "", time.Now().Add(lh.duration)) + cfURL, err := lh.urlSigner.Sign(lh.baseURL+keyer.S3BucketKey(path), time.Now().Add(lh.duration)) if err != nil { return "", err } diff --git a/docs/storage/driver/s3-aws/s3.go b/docs/storage/driver/s3-aws/s3.go new file mode 100644 index 000000000..af62d3f07 --- /dev/null +++ b/docs/storage/driver/s3-aws/s3.go @@ -0,0 +1,966 @@ +// Package s3 provides a storagedriver.StorageDriver implementation to +// store blobs in Amazon S3 cloud storage. +// +// This package leverages the official aws client library for interfacing with +// S3. +// +// Because S3 is a key, value store the Stat call does not support last modification +// time for directories (directories are an abstraction for key, value stores) +// +// Keep in mind that S3 guarantees only read-after-write consistency for new +// objects, but no read-after-update or list-after-write consistency. +package s3 + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/client/transport" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/base" + "github.com/docker/distribution/registry/storage/driver/factory" +) + +const driverName = "s3aws" + +// minChunkSize defines the minimum multipart upload chunk size +// S3 API requires multipart upload chunks to be at least 5MB +const minChunkSize = 5 << 20 + +const defaultChunkSize = 2 * minChunkSize + +// listMax is the largest amount of objects you can request from S3 in a list call +const listMax = 1000 + +// validRegions maps known s3 region identifiers to region descriptors +var validRegions = map[string]struct{}{} + +//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set +type DriverParameters struct { + AccessKey string + SecretKey string + Bucket string + Region string + Encrypt bool + Secure bool + ChunkSize int64 + RootDirectory string + StorageClass string + UserAgent string +} + +func init() { + for _, region := range []string{ + "us-east-1", + "us-west-1", + "us-west-2", + "eu-west-1", + "eu-central-1", + "ap-southeast-1", + "ap-southeast-2", + "ap-northeast-1", + "ap-northeast-2", + "sa-east-1", + } { + validRegions[region] = struct{}{} + } + + // Register this as the default s3 driver in addition to s3aws + factory.Register("s3", &s3DriverFactory{}) + factory.Register(driverName, &s3DriverFactory{}) +} + +// s3DriverFactory implements the factory.StorageDriverFactory interface +type s3DriverFactory struct{} + +func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { + return FromParameters(parameters) +} + +type driver struct { + S3 *s3.S3 + Bucket string + ChunkSize int64 + Encrypt bool + RootDirectory string + StorageClass string + + pool sync.Pool // pool []byte buffers used for WriteStream + zeros []byte // shared, zero-valued buffer used for WriteStream +} + +type baseEmbed struct { + base.Base +} + +// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3 +// Objects are stored at absolute keys in the provided bucket. +type Driver struct { + baseEmbed +} + +// FromParameters constructs a new Driver with a given parameters map +// Required parameters: +// - accesskey +// - secretkey +// - region +// - bucket +// - encrypt +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + // Providing no values for these is valid in case the user is authenticating + // with an IAM on an ec2 instance (in which case the instance credentials will + // be summoned when GetAuth is called) + accessKey, ok := parameters["accesskey"] + if !ok { + accessKey = "" + } + secretKey, ok := parameters["secretkey"] + if !ok { + secretKey = "" + } + + regionName, ok := parameters["region"] + if !ok || fmt.Sprint(regionName) == "" { + return nil, fmt.Errorf("No region parameter provided") + } + region := fmt.Sprint(regionName) + _, ok = validRegions[region] + if !ok { + return nil, fmt.Errorf("Invalid region provided: %v", region) + } + + bucket, ok := parameters["bucket"] + if !ok || fmt.Sprint(bucket) == "" { + return nil, fmt.Errorf("No bucket parameter provided") + } + + encryptBool := false + encrypt, ok := parameters["encrypt"] + if ok { + encryptBool, ok = encrypt.(bool) + if !ok { + return nil, fmt.Errorf("The encrypt parameter should be a boolean") + } + } + + secureBool := true + secure, ok := parameters["secure"] + if ok { + secureBool, ok = secure.(bool) + if !ok { + return nil, fmt.Errorf("The secure parameter should be a boolean") + } + } + + chunkSize := int64(defaultChunkSize) + chunkSizeParam, ok := parameters["chunksize"] + if ok { + switch v := chunkSizeParam.(type) { + case string: + vv, err := strconv.ParseInt(v, 0, 64) + if err != nil { + return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam) + } + chunkSize = vv + case int64: + chunkSize = v + case int, uint, int32, uint32, uint64: + chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int() + default: + return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam) + } + + if chunkSize < minChunkSize { + return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize) + } + } + + rootDirectory, ok := parameters["rootdirectory"] + if !ok { + rootDirectory = "" + } + + storageClass := s3.StorageClassStandard + storageClassParam, ok := parameters["storageclass"] + if ok { + storageClassString, ok := storageClassParam.(string) + if !ok { + return nil, fmt.Errorf("The storageclass parameter must be one of %v, %v invalid", []string{s3.StorageClassStandard, s3.StorageClassReducedRedundancy}, storageClassParam) + } + // All valid storage class parameters are UPPERCASE, so be a bit more flexible here + storageClassString = strings.ToUpper(storageClassString) + if storageClassString != s3.StorageClassStandard && storageClassString != s3.StorageClassReducedRedundancy { + return nil, fmt.Errorf("The storageclass parameter must be one of %v, %v invalid", []string{s3.StorageClassStandard, s3.StorageClassReducedRedundancy}, storageClassParam) + } + storageClass = storageClassString + } + + userAgent, ok := parameters["useragent"] + if !ok { + userAgent = "" + } + + params := DriverParameters{ + fmt.Sprint(accessKey), + fmt.Sprint(secretKey), + fmt.Sprint(bucket), + region, + encryptBool, + secureBool, + chunkSize, + fmt.Sprint(rootDirectory), + storageClass, + fmt.Sprint(userAgent), + } + + return New(params) +} + +// New constructs a new Driver with the given AWS credentials, region, encryption flag, and +// bucketName +func New(params DriverParameters) (*Driver, error) { + awsConfig := aws.NewConfig() + creds := credentials.NewChainCredentials([]credentials.Provider{ + &credentials.StaticProvider{ + Value: credentials.Value{ + AccessKeyID: params.AccessKey, + SecretAccessKey: params.SecretKey, + }, + }, + &credentials.EnvProvider{}, + &credentials.SharedCredentialsProvider{}, + &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, + }) + + awsConfig.WithCredentials(creds) + awsConfig.WithRegion(params.Region) + awsConfig.WithDisableSSL(!params.Secure) + // awsConfig.WithMaxRetries(10) + + if params.UserAgent != "" { + awsConfig.WithHTTPClient(&http.Client{ + Transport: transport.NewTransport(http.DefaultTransport, transport.NewHeaderRequestModifier(http.Header{http.CanonicalHeaderKey("User-Agent"): []string{params.UserAgent}})), + }) + } + + s3obj := s3.New(session.New(awsConfig)) + + // TODO Currently multipart uploads have no timestamps, so this would be unwise + // if you initiated a new s3driver while another one is running on the same bucket. + // multis, _, err := bucket.ListMulti("", "") + // if err != nil { + // return nil, err + // } + + // for _, multi := range multis { + // err := multi.Abort() + // //TODO appropriate to do this error checking? + // if err != nil { + // return nil, err + // } + // } + + d := &driver{ + S3: s3obj, + Bucket: params.Bucket, + ChunkSize: params.ChunkSize, + Encrypt: params.Encrypt, + RootDirectory: params.RootDirectory, + StorageClass: params.StorageClass, + zeros: make([]byte, params.ChunkSize), + } + + d.pool.New = func() interface{} { + return make([]byte, d.ChunkSize) + } + + return &Driver{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: d, + }, + }, + }, nil +} + +// Implement the storagedriver.StorageDriver interface + +func (d *driver) Name() string { + return driverName +} + +// GetContent retrieves the content stored at "path" as a []byte. +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { + reader, err := d.ReadStream(ctx, path, 0) + if err != nil { + return nil, err + } + return ioutil.ReadAll(reader) +} + +// PutContent stores the []byte content at a location designated by "path". +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { + _, err := d.S3.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + ContentType: d.getContentType(), + ACL: d.getACL(), + ServerSideEncryption: d.getEncryptionMode(), + StorageClass: d.getStorageClass(), + Body: bytes.NewReader(contents), + }) + return parseError(path, err) +} + +// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// given byte offset. +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + resp, err := d.S3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"), + }) + + if err != nil { + if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "InvalidRange" { + return ioutil.NopCloser(bytes.NewReader(nil)), nil + } + + return nil, parseError(path, err) + } + return resp.Body, nil +} + +// WriteStream stores the contents of the provided io.Reader at a +// location designated by the given path. The driver will know it has +// received the full contents when the reader returns io.EOF. The number +// of successfully READ bytes will be returned, even if an error is +// returned. May be used to resume writing a stream by providing a nonzero +// offset. Offsets past the current size will write from the position +// beyond the end of the file. +func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { + var partNumber int64 = 1 + bytesRead := 0 + var putErrChan chan error + parts := []*s3.CompletedPart{} + done := make(chan struct{}) // stopgap to free up waiting goroutines + + resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + ContentType: d.getContentType(), + ACL: d.getACL(), + ServerSideEncryption: d.getEncryptionMode(), + StorageClass: d.getStorageClass(), + }) + if err != nil { + return 0, err + } + + uploadID := resp.UploadId + + buf := d.getbuf() + + // We never want to leave a dangling multipart upload, our only consistent state is + // when there is a whole object at path. This is in order to remain consistent with + // the stat call. + // + // Note that if the machine dies before executing the defer, we will be left with a dangling + // multipart upload, which will eventually be cleaned up, but we will lose all of the progress + // made prior to the machine crashing. + defer func() { + if putErrChan != nil { + if putErr := <-putErrChan; putErr != nil { + err = putErr + } + } + + if len(parts) > 0 { + _, err := d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + UploadId: uploadID, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: parts, + }, + }) + if err != nil { + // TODO (brianbland): log errors here + d.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + UploadId: uploadID, + }) + } + } + + d.putbuf(buf) // needs to be here to pick up new buf value + close(done) // free up any waiting goroutines + }() + + // Fills from 0 to total from current + fromSmallCurrent := func(total int64) error { + current, err := d.ReadStream(ctx, path, 0) + if err != nil { + return err + } + + bytesRead = 0 + for int64(bytesRead) < total { + //The loop should very rarely enter a second iteration + nn, err := current.Read(buf[bytesRead:total]) + bytesRead += nn + if err != nil { + if err != io.EOF { + return err + } + + break + } + + } + return nil + } + + // Fills from parameter to chunkSize from reader + fromReader := func(from int64) error { + bytesRead = 0 + for from+int64(bytesRead) < d.ChunkSize { + nn, err := reader.Read(buf[from+int64(bytesRead):]) + totalRead += int64(nn) + bytesRead += nn + + if err != nil { + if err != io.EOF { + return err + } + + break + } + } + + if putErrChan == nil { + putErrChan = make(chan error) + } else { + if putErr := <-putErrChan; putErr != nil { + putErrChan = nil + return putErr + } + } + + go func(bytesRead int, from int64, buf []byte) { + defer d.putbuf(buf) // this buffer gets dropped after this call + + // DRAGONS(stevvooe): There are few things one might want to know + // about this section. First, the putErrChan is expecting an error + // and a nil or just a nil to come through the channel. This is + // covered by the silly defer below. The other aspect is the s3 + // retry backoff to deal with RequestTimeout errors. Even though + // the underlying s3 library should handle it, it doesn't seem to + // be part of the shouldRetry function (see AdRoll/goamz/s3). + defer func() { + select { + case putErrChan <- nil: // for some reason, we do this no matter what. + case <-done: + return // ensure we don't leak the goroutine + } + }() + + if bytesRead <= 0 { + return + } + + resp, err := d.S3.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + PartNumber: aws.Int64(partNumber), + UploadId: uploadID, + Body: bytes.NewReader(buf[0 : int64(bytesRead)+from]), + }) + if err != nil { + logrus.Errorf("error putting part, aborting: %v", err) + select { + case putErrChan <- err: + case <-done: + return // don't leak the goroutine + } + } + + // parts and partNumber are safe, because this function is the + // only one modifying them and we force it to be executed + // serially. + parts = append(parts, &s3.CompletedPart{ + ETag: resp.ETag, + PartNumber: aws.Int64(partNumber), + }) + partNumber++ + }(bytesRead, from, buf) + + buf = d.getbuf() // use a new buffer for the next call + return nil + } + + if offset > 0 { + resp, err := d.S3.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + }) + if err != nil { + if s3Err, ok := err.(awserr.Error); !ok || s3Err.Code() != "NoSuchKey" { + return 0, err + } + } + + currentLength := int64(0) + if err == nil && resp.ContentLength != nil { + currentLength = *resp.ContentLength + } + + if currentLength >= offset { + if offset < d.ChunkSize { + // chunkSize > currentLength >= offset + if err = fromSmallCurrent(offset); err != nil { + return totalRead, err + } + + if err = fromReader(offset); err != nil { + return totalRead, err + } + + if totalRead+offset < d.ChunkSize { + return totalRead, nil + } + } else { + // currentLength >= offset >= chunkSize + resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + PartNumber: aws.Int64(partNumber), + UploadId: uploadID, + CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)), + CopySourceRange: aws.String("bytes=0-" + strconv.FormatInt(offset-1, 10)), + }) + if err != nil { + return 0, err + } + + parts = append(parts, &s3.CompletedPart{ + ETag: resp.CopyPartResult.ETag, + PartNumber: aws.Int64(partNumber), + }) + partNumber++ + } + } else { + // Fills between parameters with 0s but only when to - from <= chunkSize + fromZeroFillSmall := func(from, to int64) error { + bytesRead = 0 + for from+int64(bytesRead) < to { + nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to]) + bytesRead += nn + if err != nil { + return err + } + } + + return nil + } + + // Fills between parameters with 0s, making new parts + fromZeroFillLarge := func(from, to int64) error { + bytesRead64 := int64(0) + for to-(from+bytesRead64) >= d.ChunkSize { + resp, err := d.S3.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + PartNumber: aws.Int64(partNumber), + UploadId: uploadID, + Body: bytes.NewReader(d.zeros), + }) + if err != nil { + return err + } + bytesRead64 += d.ChunkSize + + parts = append(parts, &s3.CompletedPart{ + ETag: resp.ETag, + PartNumber: aws.Int64(partNumber), + }) + partNumber++ + } + + return fromZeroFillSmall(0, (to-from)%d.ChunkSize) + } + + // currentLength < offset + if currentLength < d.ChunkSize { + if offset < d.ChunkSize { + // chunkSize > offset > currentLength + if err = fromSmallCurrent(currentLength); err != nil { + return totalRead, err + } + + if err = fromZeroFillSmall(currentLength, offset); err != nil { + return totalRead, err + } + + if err = fromReader(offset); err != nil { + return totalRead, err + } + + if totalRead+offset < d.ChunkSize { + return totalRead, nil + } + } else { + // offset >= chunkSize > currentLength + if err = fromSmallCurrent(currentLength); err != nil { + return totalRead, err + } + + if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil { + return totalRead, err + } + + resp, err := d.S3.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + PartNumber: aws.Int64(partNumber), + UploadId: uploadID, + Body: bytes.NewReader(buf), + }) + if err != nil { + return totalRead, err + } + + parts = append(parts, &s3.CompletedPart{ + ETag: resp.ETag, + PartNumber: aws.Int64(partNumber), + }) + partNumber++ + + //Zero fill from chunkSize up to offset, then some reader + if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil { + return totalRead, err + } + + if err = fromReader(offset % d.ChunkSize); err != nil { + return totalRead, err + } + + if totalRead+(offset%d.ChunkSize) < d.ChunkSize { + return totalRead, nil + } + } + } else { + // offset > currentLength >= chunkSize + resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + PartNumber: aws.Int64(partNumber), + UploadId: uploadID, + CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)), + }) + if err != nil { + return 0, err + } + + parts = append(parts, &s3.CompletedPart{ + ETag: resp.CopyPartResult.ETag, + PartNumber: aws.Int64(partNumber), + }) + partNumber++ + + //Zero fill from currentLength up to offset, then some reader + if err = fromZeroFillLarge(currentLength, offset); err != nil { + return totalRead, err + } + + if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil { + return totalRead, err + } + + if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize { + return totalRead, nil + } + } + + } + } + + for { + if err = fromReader(0); err != nil { + return totalRead, err + } + + if int64(bytesRead) < d.ChunkSize { + break + } + } + + return totalRead, nil +} + +// Stat retrieves the FileInfo for the given path, including the current size +// in bytes and the creation time. +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + resp, err := d.S3.ListObjects(&s3.ListObjectsInput{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + MaxKeys: aws.Int64(1), + }) + if err != nil { + return nil, err + } + + fi := storagedriver.FileInfoFields{ + Path: path, + } + + if len(resp.Contents) == 1 { + if *resp.Contents[0].Key != d.s3Path(path) { + fi.IsDir = true + } else { + fi.IsDir = false + fi.Size = *resp.Contents[0].Size + fi.ModTime = *resp.Contents[0].LastModified + } + } else if len(resp.CommonPrefixes) == 1 { + fi.IsDir = true + } else { + return nil, storagedriver.PathNotFoundError{Path: path} + } + + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil +} + +// List returns a list of the objects that are direct descendants of the given path. +func (d *driver) List(ctx context.Context, opath string) ([]string, error) { + path := opath + if path != "/" && path[len(path)-1] != '/' { + path = path + "/" + } + + // This is to cover for the cases when the rootDirectory of the driver is either "" or "/". + // In those cases, there is no root prefix to replace and we must actually add a "/" to all + // results in order to keep them as valid paths as recognized by storagedriver.PathRegexp + prefix := "" + if d.s3Path("") == "" { + prefix = "/" + } + + resp, err := d.S3.ListObjects(&s3.ListObjectsInput{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + Delimiter: aws.String("/"), + MaxKeys: aws.Int64(listMax), + }) + if err != nil { + return nil, parseError(opath, err) + } + + files := []string{} + directories := []string{} + + for { + for _, key := range resp.Contents { + files = append(files, strings.Replace(*key.Key, d.s3Path(""), prefix, 1)) + } + + for _, commonPrefix := range resp.CommonPrefixes { + commonPrefix := *commonPrefix.Prefix + directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.s3Path(""), prefix, 1)) + } + + if *resp.IsTruncated { + resp, err = d.S3.ListObjects(&s3.ListObjectsInput{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + Delimiter: aws.String("/"), + MaxKeys: aws.Int64(listMax), + Marker: resp.NextMarker, + }) + if err != nil { + return nil, err + } + } else { + break + } + } + + if opath != "/" { + if len(files) == 0 && len(directories) == 0 { + // Treat empty response as missing directory, since we don't actually + // have directories in s3. + return nil, storagedriver.PathNotFoundError{Path: opath} + } + } + + return append(files, directories...), nil +} + +// Move moves an object stored at sourcePath to destPath, removing the original +// object. +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { + /* This is terrible, but aws doesn't have an actual move. */ + _, err := d.S3.CopyObject(&s3.CopyObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(destPath)), + ContentType: d.getContentType(), + ACL: d.getACL(), + ServerSideEncryption: d.getEncryptionMode(), + StorageClass: d.getStorageClass(), + CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), + }) + if err != nil { + return parseError(sourcePath, err) + } + + return d.Delete(ctx, sourcePath) +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (d *driver) Delete(ctx context.Context, path string) error { + resp, err := d.S3.ListObjects(&s3.ListObjectsInput{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + }) + if err != nil || len(resp.Contents) == 0 { + return storagedriver.PathNotFoundError{Path: path} + } + + s3Objects := make([]*s3.ObjectIdentifier, 0, listMax) + + for len(resp.Contents) > 0 { + for _, key := range resp.Contents { + s3Objects = append(s3Objects, &s3.ObjectIdentifier{ + Key: key.Key, + }) + } + + _, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(d.Bucket), + Delete: &s3.Delete{ + Objects: s3Objects, + Quiet: aws.Bool(false), + }, + }) + if err != nil { + return nil + } + + resp, err = d.S3.ListObjects(&s3.ListObjectsInput{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(d.s3Path(path)), + }) + if err != nil { + return err + } + } + + return nil +} + +// URLFor returns a URL which may be used to retrieve the content stored at the given path. +// May return an UnsupportedMethodErr in certain StorageDriver implementations. +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + methodString := "GET" + method, ok := options["method"] + if ok { + methodString, ok = method.(string) + if !ok || (methodString != "GET" && methodString != "HEAD") { + return "", storagedriver.ErrUnsupportedMethod{} + } + } + + expiresIn := 20 * time.Minute + expires, ok := options["expiry"] + if ok { + et, ok := expires.(time.Time) + if ok { + expiresIn = et.Sub(time.Now()) + } + } + + var req *request.Request + + switch methodString { + case "GET": + req, _ = d.S3.GetObjectRequest(&s3.GetObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + }) + case "HEAD": + req, _ = d.S3.HeadObjectRequest(&s3.HeadObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(d.s3Path(path)), + }) + default: + panic("unreachable") + } + + return req.Presign(expiresIn) +} + +func (d *driver) s3Path(path string) string { + return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") +} + +// S3BucketKey returns the s3 bucket key for the given storage driver path. +func (d *Driver) S3BucketKey(path string) string { + return d.StorageDriver.(*driver).s3Path(path) +} + +func parseError(path string, err error) error { + if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NoSuchKey" { + return storagedriver.PathNotFoundError{Path: path} + } + + return err +} + +func (d *driver) getEncryptionMode() *string { + if d.Encrypt { + return aws.String("AES256") + } + return nil +} + +func (d *driver) getContentType() *string { + return aws.String("application/octet-stream") +} + +func (d *driver) getACL() *string { + return aws.String("private") +} + +func (d *driver) getStorageClass() *string { + return aws.String(d.StorageClass) +} + +// getbuf returns a buffer from the driver's pool with length d.ChunkSize. +func (d *driver) getbuf() []byte { + return d.pool.Get().([]byte) +} + +func (d *driver) putbuf(p []byte) { + copy(p, d.zeros) + d.pool.Put(p) +} diff --git a/docs/storage/driver/s3-aws/s3_test.go b/docs/storage/driver/s3-aws/s3_test.go new file mode 100644 index 000000000..6816e6509 --- /dev/null +++ b/docs/storage/driver/s3-aws/s3_test.go @@ -0,0 +1,201 @@ +package s3 + +import ( + "io/ioutil" + "os" + "strconv" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + + "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/testsuites" + + "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { check.TestingT(t) } + +var s3DriverConstructor func(rootDirectory, storageClass string) (*Driver, error) +var skipS3 func() string + +func init() { + accessKey := os.Getenv("AWS_ACCESS_KEY") + secretKey := os.Getenv("AWS_SECRET_KEY") + bucket := os.Getenv("S3_BUCKET") + encrypt := os.Getenv("S3_ENCRYPT") + secure := os.Getenv("S3_SECURE") + region := os.Getenv("AWS_REGION") + root, err := ioutil.TempDir("", "driver-") + if err != nil { + panic(err) + } + defer os.Remove(root) + + s3DriverConstructor = func(rootDirectory, storageClass string) (*Driver, error) { + encryptBool := false + if encrypt != "" { + encryptBool, err = strconv.ParseBool(encrypt) + if err != nil { + return nil, err + } + } + + secureBool := true + if secure != "" { + secureBool, err = strconv.ParseBool(secure) + if err != nil { + return nil, err + } + } + + parameters := DriverParameters{ + accessKey, + secretKey, + bucket, + region, + encryptBool, + secureBool, + minChunkSize, + rootDirectory, + storageClass, + driverName + "-test", + } + + return New(parameters) + } + + // Skip S3 storage driver tests if environment variable parameters are not provided + skipS3 = func() string { + if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" { + return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests" + } + return "" + } + + testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { + return s3DriverConstructor(root, s3.StorageClassStandard) + }, skipS3) +} + +func TestEmptyRootList(t *testing.T) { + if skipS3() != "" { + t.Skip(skipS3()) + } + + validRoot, err := ioutil.TempDir("", "driver-") + if err != nil { + t.Fatalf("unexpected error creating temporary directory: %v", err) + } + defer os.Remove(validRoot) + + rootedDriver, err := s3DriverConstructor(validRoot, s3.StorageClassStandard) + if err != nil { + t.Fatalf("unexpected error creating rooted driver: %v", err) + } + + emptyRootDriver, err := s3DriverConstructor("", s3.StorageClassStandard) + if err != nil { + t.Fatalf("unexpected error creating empty root driver: %v", err) + } + + slashRootDriver, err := s3DriverConstructor("/", s3.StorageClassStandard) + if err != nil { + t.Fatalf("unexpected error creating slash root driver: %v", err) + } + + filename := "/test" + contents := []byte("contents") + ctx := context.Background() + err = rootedDriver.PutContent(ctx, filename, contents) + if err != nil { + t.Fatalf("unexpected error creating content: %v", err) + } + defer rootedDriver.Delete(ctx, filename) + + keys, err := emptyRootDriver.List(ctx, "/") + for _, path := range keys { + if !storagedriver.PathRegexp.MatchString(path) { + t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) + } + } + + keys, err = slashRootDriver.List(ctx, "/") + for _, path := range keys { + if !storagedriver.PathRegexp.MatchString(path) { + t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) + } + } +} + +func TestStorageClass(t *testing.T) { + if skipS3() != "" { + t.Skip(skipS3()) + } + + rootDir, err := ioutil.TempDir("", "driver-") + if err != nil { + t.Fatalf("unexpected error creating temporary directory: %v", err) + } + defer os.Remove(rootDir) + + standardDriver, err := s3DriverConstructor(rootDir, s3.StorageClassStandard) + if err != nil { + t.Fatalf("unexpected error creating driver with standard storage: %v", err) + } + + rrDriver, err := s3DriverConstructor(rootDir, s3.StorageClassReducedRedundancy) + if err != nil { + t.Fatalf("unexpected error creating driver with reduced redundancy storage: %v", err) + } + + standardFilename := "/test-standard" + rrFilename := "/test-rr" + contents := []byte("contents") + ctx := context.Background() + + err = standardDriver.PutContent(ctx, standardFilename, contents) + if err != nil { + t.Fatalf("unexpected error creating content: %v", err) + } + defer standardDriver.Delete(ctx, standardFilename) + + err = rrDriver.PutContent(ctx, rrFilename, contents) + if err != nil { + t.Fatalf("unexpected error creating content: %v", err) + } + defer rrDriver.Delete(ctx, rrFilename) + + standardDriverUnwrapped := standardDriver.Base.StorageDriver.(*driver) + resp, err := standardDriverUnwrapped.S3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(standardDriverUnwrapped.Bucket), + Key: aws.String(standardDriverUnwrapped.s3Path(standardFilename)), + }) + if err != nil { + t.Fatalf("unexpected error retrieving standard storage file: %v", err) + } + defer resp.Body.Close() + // Amazon only populates this header value for non-standard storage classes + if resp.StorageClass != nil { + t.Fatalf("unexpected storage class for standard file: %v", resp.StorageClass) + } + + rrDriverUnwrapped := rrDriver.Base.StorageDriver.(*driver) + resp, err = rrDriverUnwrapped.S3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(rrDriverUnwrapped.Bucket), + Key: aws.String(rrDriverUnwrapped.s3Path(standardFilename)), + }) + if err != nil { + t.Fatalf("unexpected error retrieving reduced-redundancy storage file: %v", err) + } + defer resp.Body.Close() + if resp.StorageClass == nil { + t.Fatalf("unexpected storage class for reduced-redundancy file: %v", s3.StorageClassStandard) + } else if *resp.StorageClass != s3.StorageClassReducedRedundancy { + t.Fatalf("unexpected storage class for reduced-redundancy file: %v", *resp.StorageClass) + } + +} diff --git a/docs/storage/driver/s3/s3.go b/docs/storage/driver/s3-goamz/s3.go similarity index 98% rename from docs/storage/driver/s3/s3.go rename to docs/storage/driver/s3-goamz/s3.go index a1f4c57da..9208965b3 100644 --- a/docs/storage/driver/s3/s3.go +++ b/docs/storage/driver/s3-goamz/s3.go @@ -2,16 +2,14 @@ // store blobs in Amazon S3 cloud storage. // // This package leverages the docker/goamz client library for interfacing with -// s3. +// S3. It is intended to be deprecated in favor of the s3-aws driver +// implementation. // -// Because s3 is a key, value store the Stat call does not support last modification +// Because S3 is a key, value store the Stat call does not support last modification // time for directories (directories are an abstraction for key, value stores) // -// Keep in mind that s3 guarantees only eventual consistency, so do not assume -// that a successful write will mean immediate access to the data written (although -// in most regions a new object put has guaranteed read after write). The only true -// guarantee is that once you call Stat and receive a certain file size, that much of -// the file is already accessible. +// Keep in mind that S3 guarantees only read-after-write consistency for new +// objects, but no read-after-update or list-after-write consistency. package s3 import ( @@ -37,7 +35,7 @@ import ( "github.com/docker/distribution/registry/storage/driver/factory" ) -const driverName = "s3" +const driverName = "s3goamz" // minChunkSize defines the minimum multipart upload chunk size // S3 API requires multipart upload chunks to be at least 5MB diff --git a/docs/storage/driver/s3/s3_test.go b/docs/storage/driver/s3-goamz/s3_test.go similarity index 98% rename from docs/storage/driver/s3/s3_test.go rename to docs/storage/driver/s3-goamz/s3_test.go index 660d5350b..352ec3f5c 100644 --- a/docs/storage/driver/s3/s3_test.go +++ b/docs/storage/driver/s3-goamz/s3_test.go @@ -71,7 +71,7 @@ func init() { minChunkSize, rootDirectory, storageClass, - "", + driverName + "-test", } return New(parameters) @@ -196,6 +196,6 @@ func TestStorageClass(t *testing.T) { } defer resp.Body.Close() if storageClass := resp.Header.Get("x-amz-storage-class"); storageClass != string(s3.ReducedRedundancy) { - t.Fatalf("unexpected storage class for standard file: %v", storageClass) + t.Fatalf("unexpected storage class for reduced-redundancy file: %v", storageClass) } }