From 0da6f242217aae7aa95b07773e31422a38af8e7a Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 23 Dec 2014 12:09:02 +0000 Subject: [PATCH] s3: use official github.com/aws/aws-sdk-go including multipart upload - fixes #101 --- s3/s3.go | 354 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 227 insertions(+), 127 deletions(-) diff --git a/s3/s3.go b/s3/s3.go index f69142354..7b02a2e58 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -3,19 +3,36 @@ package s3 // FIXME need to prevent anything but ListDir working for s3:// +/* +Progress of port to aws-sdk + + * Don't really need o.meta at all? + +What happens if you CTRL-C a multipart upload + * get an incomplete upload + * disappears when you delete the bucket + +Doesn't support v2 signing so can't interface with Ceph + * http://tracker.ceph.com/issues/10333 + * https://github.com/aws/aws-sdk-go/issues/291 + +*/ + import ( "errors" "fmt" "io" - "net/http" + "log" "path" "regexp" - "strconv" "strings" "time" - "github.com/ncw/goamz/aws" - "github.com/ncw/goamz/s3" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/ncw/rclone/fs" "github.com/ncw/swift" ) @@ -33,39 +50,48 @@ func init() { Name: "secret_access_key", Help: "AWS Secret Access Key (password). ", }, { - Name: "endpoint", - Help: "Endpoint for S3 API.", + Name: "region", + Help: "Region to connect to.", Examples: []fs.OptionExample{{ - Value: "https://s3.amazonaws.com/", + Value: "us-east-1", Help: "The default endpoint - a good choice if you are unsure.\nUS Region, Northern Virginia or Pacific Northwest.\nLeave location constraint empty.", }, { - Value: "https://s3-external-1.amazonaws.com", - Help: "US Region, Northern Virginia only.\nLeave location constraint empty.", - }, { - Value: "https://s3-us-west-2.amazonaws.com", + Value: "us-west-2", Help: "US West (Oregon) Region\nNeeds location constraint us-west-2.", }, { - Value: "https://s3-us-west-1.amazonaws.com", + Value: "us-west-1", Help: "US West (Northern California) Region\nNeeds location constraint us-west-1.", }, { - Value: "https://s3-eu-west-1.amazonaws.com", + Value: "eu-west-1", Help: "EU (Ireland) Region Region\nNeeds location constraint EU or eu-west-1.", }, { - Value: "https://s3-ap-southeast-1.amazonaws.com", + Value: "eu-central-1", + Help: "EU (Frankfurt) Region\nNeeds location constraint eu-central-1.", + }, { + Value: "ap-southeast-1", Help: "Asia Pacific (Singapore) Region\nNeeds location constraint ap-southeast-1.", }, { - Value: "https://s3-ap-southeast-2.amazonaws.com", - Help: "Asia Pacific (Sydney) Region\nNeeds location constraint .", + Value: "ap-southeast-2", + Help: "Asia Pacific (Sydney) Region\nNeeds location constraint ap-southeast-2.", }, { - Value: "https://s3-ap-northeast-1.amazonaws.com", + Value: "ap-northeast-1", Help: "Asia Pacific (Tokyo) Region\nNeeds location constraint ap-northeast-1.", }, { - Value: "https://s3-sa-east-1.amazonaws.com", + Value: "sa-east-1", Help: "South America (Sao Paulo) Region\nNeeds location constraint sa-east-1.", + }, { + Value: "other-v2-signature", + Help: "If using an S3 clone that only understands v2 signatures - eg Ceph - set this and make sure you set the endpoint.", + }, { + Value: "other-v4-signature", + Help: "If using an S3 clone that understands v4 signatures set this and make sure you set the endpoint.", }}, + }, { + Name: "endpoint", + Help: "Endpoint for S3 API.\nLeave blank if using AWS to use the default endpoint for the region.\nSpecify if using an S3 clone such as Ceph.", }, { Name: "location_constraint", - Help: "Location constraint - must be set to match the Endpoint.", + Help: "Location constraint - must be set to match the Region. Used when creating buckets only.", Examples: []fs.OptionExample{{ Value: "", Help: "Empty for US Region, Northern Virginia or Pacific Northwest.", @@ -100,17 +126,18 @@ func init() { // Constants const ( - metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in - listChunkSize = 1024 // number of items to read at once + metaMtime = "Mtime" // the meta key to store mtime in - eg X-Amz-Meta-Mtime + listChunkSize = 1024 // number of items to read at once + maxRetries = 10 // number of retries to make of operations ) // FsS3 represents a remote s3 server type FsS3 struct { - c *s3.S3 // the connection to the s3 server - b *s3.Bucket // the connection to the bucket - bucket string // the bucket we are working on - perm s3.ACL // permissions for new buckets / objects - root string // root of the bucket - ignore all objects above this + c *s3.S3 // the connection to the s3 server + bucket string // the bucket we are working on + perm string // permissions for new buckets / objects + root string // root of the bucket - ignore all objects above this + locationConstraint string // location constraint of new buckets } // FsObjectS3 describes a s3 object @@ -119,12 +146,12 @@ type FsObjectS3 struct { // // List will read everything but meta - to fill that in need to call // readMetaData - s3 *FsS3 // what this object is part of - remote string // The remote path - etag string // md5sum of the object - bytes int64 // size of the object - lastModified time.Time // Last modified - meta s3.Headers // The object metadata if known - may be nil + s3 *FsS3 // what this object is part of + remote string // The remote path + etag string // md5sum of the object + bytes int64 // size of the object + lastModified time.Time // Last modified + meta map[string]*string // The object metadata if known - may be nil } // ------------------------------------------------------------ @@ -163,28 +190,26 @@ func s3Connection(name string) (*s3.S3, error) { if secretAccessKey == "" { return nil, errors.New("secret_access_key not found") } - auth := aws.Auth{AccessKey: accessKeyId, SecretKey: secretAccessKey} + auth := credentials.NewStaticCredentials(accessKeyId, secretAccessKey, "") - // FIXME look through all the regions by name and use one of them if found - - // Synthesize the region - s3Endpoint := fs.ConfigFile.MustValue(name, "endpoint") - if s3Endpoint == "" { - s3Endpoint = "https://s3.amazonaws.com/" + endpoint := fs.ConfigFile.MustValue(name, "endpoint") + region := fs.ConfigFile.MustValue(name, "region") + if region == "" && endpoint == "" { + endpoint = "https://s3.amazonaws.com/" } - region := aws.Region{ - Name: "s3", - S3Endpoint: s3Endpoint, - S3LocationConstraint: false, + if region == "" { + region = "us-east-1" } - s3LocationConstraint := fs.ConfigFile.MustValue(name, "location_constraint") - if s3LocationConstraint != "" { - region.Name = s3LocationConstraint - region.S3LocationConstraint = true + awsConfig := aws.NewConfig(). + WithRegion(region). + WithMaxRetries(maxRetries). + WithCredentials(auth). + WithEndpoint(endpoint). + WithHTTPClient(fs.Config.Client()) + c := s3.New(awsConfig) + if region == "other-v2-signature" { + log.Fatal("Sorry v2 signatures not supported yet :-(") } - - c := s3.New(auth, region) - c.Client = fs.Config.Client() return c, nil } @@ -201,14 +226,18 @@ func NewFs(name, root string) (fs.Fs, error) { f := &FsS3{ c: c, bucket: bucket, - b: c.Bucket(bucket), - perm: s3.Private, // FIXME need user to specify - root: directory, + // FIXME perm: s3.Private, // FIXME need user to specify + root: directory, + locationConstraint: fs.ConfigFile.MustValue(name, "location_constraint"), } if f.root != "" { f.root += "/" // Check to see if the object exists - _, err = f.b.Head(directory, nil) + req := s3.HeadObjectInput{ + Bucket: &f.bucket, + Key: &directory, + } + _, err = f.c.HeadObject(&req) if err == nil { remote := path.Base(directory) f.root = path.Dir(directory) @@ -222,27 +251,28 @@ func NewFs(name, root string) (fs.Fs, error) { return fs.NewLimited(f, obj), nil } } + // f.listMultipartUploads() return f, nil } // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsS3) newFsObjectWithInfo(remote string, info *s3.Key) fs.Object { +func (f *FsS3) newFsObjectWithInfo(remote string, info *s3.Object) fs.Object { o := &FsObjectS3{ s3: f, remote: remote, } if info != nil { // Set info but not meta - var err error - o.lastModified, err = time.Parse(time.RFC3339, info.LastModified) - if err != nil { - fs.Log(o, "Failed to read last modified: %s", err) + if info.LastModified == nil { + fs.Log(o, "Failed to read last modified") o.lastModified = time.Now() + } else { + o.lastModified = *info.LastModified } - o.etag = info.ETag - o.bytes = info.Size + o.etag = aws.StringValue(info.ETag) + o.bytes = aws.Int64Value(info.Size) } else { err := o.readMetaData() // reads info and meta, returning an error if err != nil { @@ -263,50 +293,66 @@ func (f *FsS3) NewFsObject(remote string) fs.Object { // list the objects into the function supplied // // If directories is set it only sends directories -func (f *FsS3) list(directories bool, fn func(string, *s3.Key)) { +func (f *FsS3) list(directories bool, fn func(string, *s3.Object)) { + maxKeys := int64(listChunkSize) delimiter := "" if directories { delimiter = "/" } - marker := "" + var marker *string for { - objects, err := f.b.List(f.root, delimiter, marker, listChunkSize) + // FIXME need to implement ALL loop + req := s3.ListObjectsInput{ + Bucket: &f.bucket, + Delimiter: &delimiter, + Prefix: &f.root, + MaxKeys: &maxKeys, + Marker: marker, + } + resp, err := f.c.ListObjects(&req) if err != nil { fs.Stats.Error() fs.ErrorLog(f, "Couldn't read bucket %q: %s", f.bucket, err) + break } else { rootLength := len(f.root) if directories { - for _, remote := range objects.CommonPrefixes { + for _, commonPrefix := range resp.CommonPrefixes { + if commonPrefix.Prefix == nil { + fs.Log(f, "Nil common prefix received") + continue + } + remote := *commonPrefix.Prefix if !strings.HasPrefix(remote, f.root) { fs.Log(f, "Odd name received %q", remote) continue } - remote := remote[rootLength:] + remote = remote[rootLength:] if strings.HasSuffix(remote, "/") { remote = remote[:len(remote)-1] } - fn(remote, &s3.Key{Key: remote}) + fn(remote, &s3.Object{Key: &remote}) } } else { - for i := range objects.Contents { - object := &objects.Contents[i] - if !strings.HasPrefix(object.Key, f.root) { - fs.Log(f, "Odd name received %q", object.Key) + for _, object := range resp.Contents { + key := aws.StringValue(object.Key) + if !strings.HasPrefix(key, f.root) { + fs.Log(f, "Odd name received %q", key) continue } - remote := object.Key[rootLength:] + remote := key[rootLength:] fn(remote, object) } } - } - if !objects.IsTruncated { - break - } - // Use NextMarker if set, otherwise use last Key - marker = objects.NextMarker - if marker == "" { - marker = objects.Contents[len(objects.Contents)-1].Key + if !aws.BoolValue(resp.IsTruncated) { + break + } + // Use NextMarker if set, otherwise use last Key + if resp.NextMarker == nil || *resp.NextMarker == "" { + marker = resp.Contents[len(resp.Contents)-1].Key + } else { + marker = resp.NextMarker + } } } } @@ -322,7 +368,7 @@ func (f *FsS3) List() fs.ObjectsChan { } else { go func() { defer close(out) - f.list(false, func(remote string, object *s3.Key) { + f.list(false, func(remote string, object *s3.Object) { if fs := f.newFsObjectWithInfo(remote, object); fs != nil { out <- fs } @@ -339,15 +385,16 @@ func (f *FsS3) ListDir() fs.DirChan { // List the buckets go func() { defer close(out) - buckets, err := f.c.ListBuckets() + req := s3.ListBucketsInput{} + resp, err := f.c.ListBuckets(&req) if err != nil { fs.Stats.Error() fs.ErrorLog(f, "Couldn't list buckets: %s", err) } else { - for _, bucket := range buckets { + for _, bucket := range resp.Buckets { out <- &fs.Dir{ - Name: bucket.Name, - When: bucket.CreationDate, + Name: aws.StringValue(bucket.Name), + When: aws.TimeValue(bucket.CreationDate), Bytes: -1, Count: -1, } @@ -358,10 +405,14 @@ func (f *FsS3) ListDir() fs.DirChan { // List the directories in the path in the bucket go func() { defer close(out) - f.list(true, func(remote string, object *s3.Key) { + f.list(true, func(remote string, object *s3.Object) { + size := int64(0) + if object.Size != nil { + size = *object.Size + } out <- &fs.Dir{ Name: remote, - Bytes: object.Size, + Bytes: size, Count: 0, } }) @@ -379,9 +430,18 @@ func (f *FsS3) Put(in io.Reader, remote string, modTime time.Time, size int64) ( // Mkdir creates the bucket if it doesn't exist func (f *FsS3) Mkdir() error { - err := f.b.PutBucket(f.perm) - if err, ok := err.(*s3.Error); ok { - if err.Code == "BucketAlreadyOwnedByYou" { + req := s3.CreateBucketInput{ + Bucket: &f.bucket, + ACL: &f.perm, + } + if f.locationConstraint != "" { + req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{ + LocationConstraint: &f.locationConstraint, + } + } + _, err := f.c.CreateBucket(&req) + if err, ok := err.(awserr.Error); ok { + if err.Code() == "BucketAlreadyOwnedByYou" { return nil } } @@ -392,7 +452,11 @@ func (f *FsS3) Mkdir() error { // // Returns an error if it isn't empty func (f *FsS3) Rmdir() error { - return f.b.DelBucket() + req := s3.DeleteBucketInput{ + Bucket: &f.bucket, + } + _, err := f.c.DeleteBucket(&req) + return err } // Return the precision @@ -440,28 +504,17 @@ func (o *FsObjectS3) Size() int64 { // readMetaData gets the metadata if it hasn't already been fetched // -// if we get a 404 error then we retry a few times for eventual -// consistency reasons -// // it also sets the info func (o *FsObjectS3) readMetaData() (err error) { if o.meta != nil { return nil } - var headers s3.Headers - - // Try reading the metadata a few times (with exponential - // backoff) to get around eventual consistency on 404 error - for tries := uint(0); tries < 10; tries++ { - headers, err = o.s3.b.Head(o.s3.root+o.remote, nil) - if s3Err, ok := err.(*s3.Error); ok { - if s3Err.StatusCode == http.StatusNotFound { - time.Sleep(5 * time.Millisecond << tries) - continue - } - } - break + key := o.s3.root + o.remote + req := s3.HeadObjectInput{ + Bucket: &o.s3.bucket, + Key: &key, } + resp, err := o.s3.c.HeadObject(&req) if err != nil { fs.Debug(o, "Failed to read info: %s", err) return err @@ -469,19 +522,17 @@ func (o *FsObjectS3) readMetaData() (err error) { var size int64 // Ignore missing Content-Length assuming it is 0 // Some versions of ceph do this due their apache proxies - if contentLength, ok := headers["Content-Length"]; ok { - size, err = strconv.ParseInt(contentLength, 10, 64) - if err != nil { - fs.Debug(o, "Failed to read size from: %q", headers) - return err - } + if resp.ContentLength != nil { + size = *resp.ContentLength } - o.etag = headers["Etag"] + o.etag = aws.StringValue(resp.ETag) o.bytes = size - o.meta = headers - if o.lastModified, err = time.Parse(http.TimeFormat, headers["Last-Modified"]); err != nil { + o.meta = resp.Metadata + if resp.LastModified == nil { fs.Log(o, "Failed to read last modified from HEAD: %s", err) o.lastModified = time.Now() + } else { + o.lastModified = *resp.LastModified } return nil } @@ -498,11 +549,11 @@ func (o *FsObjectS3) ModTime() time.Time { } // read mtime out of metadata if available d, ok := o.meta[metaMtime] - if !ok { + if !ok || d == nil { // fs.Debug(o, "No metadata") return o.lastModified } - modTime, err := swift.FloatStringToTime(d) + modTime, err := swift.FloatStringToTime(*d) if err != nil { fs.Log(o, "Failed to read mtime from object: %s", err) return o.lastModified @@ -518,8 +569,21 @@ func (o *FsObjectS3) SetModTime(modTime time.Time) { fs.ErrorLog(o, "Failed to read metadata: %s", err) return } - o.meta[metaMtime] = swift.TimeToFloatString(modTime) - _, err = o.s3.b.Update(o.s3.root+o.remote, o.s3.perm, o.meta) + o.meta[metaMtime] = aws.String(swift.TimeToFloatString(modTime)) + + // Copy the object to itself to update the metadata + key := o.s3.root + o.remote + sourceKey := o.s3.bucket + "/" + key + directive := s3.MetadataDirectiveReplace // replace metadata with that passed in + req := s3.CopyObjectInput{ + Bucket: &o.s3.bucket, + ACL: &o.s3.perm, + Key: &key, + CopySource: &sourceKey, + Metadata: o.meta, + MetadataDirective: &directive, + } + _, err = o.s3.c.CopyObject(&req) if err != nil { fs.Stats.Error() fs.ErrorLog(o, "Failed to update remote mtime: %s", err) @@ -533,21 +597,51 @@ func (o *FsObjectS3) Storable() bool { // Open an object for read func (o *FsObjectS3) Open() (in io.ReadCloser, err error) { - in, err = o.s3.b.GetReader(o.s3.root + o.remote) - return + key := o.s3.root + o.remote + req := s3.GetObjectInput{ + Bucket: &o.s3.bucket, + Key: &key, + } + resp, err := o.s3.c.GetObject(&req) + if err != nil { + return nil, err + } + return resp.Body, nil } // Update the Object from in with modTime and size func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error { - // Set the mtime in the headers - headers := s3.Headers{ - metaMtime: swift.TimeToFloatString(modTime), + opts := s3manager.UploadOptions{ + // PartSize: 64 * 1024 * 1024, use default + Concurrency: 2, // limit concurrency + LeavePartsOnError: false, + S3: o.s3.c, + } + uploader := s3manager.NewUploader(&opts) + + // Set the mtime in the meta data + metadata := map[string]*string{ + metaMtime: aws.String(swift.TimeToFloatString(modTime)), } - _, err := o.s3.b.PutReaderHeaders(o.s3.root+o.remote, in, size, fs.MimeType(o), o.s3.perm, headers) + // Guess the content type + contentType := fs.MimeType(o) + + key := o.s3.root + o.remote + req := s3manager.UploadInput{ + Bucket: &o.s3.bucket, + ACL: &o.s3.perm, + Key: &key, + Body: in, + ContentType: &contentType, + Metadata: metadata, + //ContentLength: &size, + } + _, err := uploader.Upload(&req) if err != nil { return err } + // Read the metadata from the newly created object o.meta = nil // wipe old metadata err = o.readMetaData() @@ -556,7 +650,13 @@ func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error { // Remove an object func (o *FsObjectS3) Remove() error { - return o.s3.b.Del(o.s3.root + o.remote) + key := o.s3.root + o.remote + req := s3.DeleteObjectInput{ + Bucket: &o.s3.bucket, + Key: &key, + } + _, err := o.s3.c.DeleteObject(&req) + return err } // Check the interfaces are satisfied