From d5f4c746970c647b8b49e32b7de36d61c98bbd5e Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 25 Jun 2020 16:11:05 +0100 Subject: [PATCH] s3: implement cleanup and backend command to list & remove multipart uploads This implements `rclone cleanup` to remove multipart uploads over 24 hours old. It also implements the backend command `list-multipart-uploads` to see which ones are available and `cleanup` to delete them with a configurable expiry interval. See #4302 --- backend/s3/s3.go | 191 ++++++++++++++++++++++++++++++++++++--- docs/content/overview.md | 2 +- docs/content/s3.md | 10 ++ 3 files changed, 190 insertions(+), 13 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 225d003e8..10b0bc525 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -1,18 +1,6 @@ // Package s3 provides an interface to Amazon S3 oject storage package s3 -// FIXME need to prevent anything but ListDir working for s3:// - -/* -Progress of port to aws-sdk - - * Don't really need o.meta at all? - -What happens if you CTRL-C a multipart upload - * get an incomplete upload - * disappears when you delete the bucket -*/ - import ( "bytes" "context" @@ -2127,6 +2115,58 @@ if not. "lifetime": "Lifetime of the active copy in days", "description": "The optional description for the job.", }, +}, { + Name: "list-multipart-uploads", + Short: "List the unfinished multipart uploads", + Long: `This command lists the unfinished multipart uploads in JSON format. + + rclone backend list-multipart-uploads s3:bucket/path/to/object + +It returns a dictionary of buckets with values as lists of unfinished +multipart uploads. + +You can call it with no bucket in which case it lists all buckets, with +a bucket or with a bucket and path. 
+ + { + "rclone": [ + { + "Initiated": "2020-06-26T14:20:36Z", + "Initiator": { + "DisplayName": "XXX", + "ID": "arn:aws:iam::XXX:user/XXX" + }, + "Key": "KEY", + "Owner": { + "DisplayName": null, + "ID": "XXX" + }, + "StorageClass": "STANDARD", + "UploadId": "XXX" + } + ], + "rclone-1000files": [], + "rclone-dst": [] + } + +`, +}, { + Name: "cleanup", + Short: "Remove unfinished multipart uploads.", + Long: `This command removes unfinished multipart uploads of age greater than +max-age which defaults to 24 hours. + +Note that you can use -i/--dry-run with this command to see what it +would do. + + rclone backend cleanup s3:bucket/path/to/object + rclone backend cleanup -o max-age=7w s3:bucket/path/to/object + +Durations are parsed as per the rest of rclone, 2h, 7d, 7w etc. +`, + Opts: map[string]string{ + "max-age": "Max age of upload to delete", + }, }} // Command the backend to run a named command @@ -2201,11 +2241,137 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str return out, err } return out, nil + case "list-multipart-uploads": + return f.listMultipartUploadsAll(ctx) + case "cleanup": + maxAge := 24 * time.Hour + if opt["max-age"] != "" { + maxAge, err = fs.ParseDuration(opt["max-age"]) + if err != nil { + return nil, errors.Wrap(err, "bad max-age") + } + } + return nil, f.cleanUp(ctx, maxAge) default: return nil, fs.ErrorCommandNotFound } } +// listMultipartUploads lists all outstanding multipart uploads for (bucket, key) +// +// Note that rather lazily we treat key as a prefix so it matches +// directories and objects. 
This could surprise the user if they ask +// for "dir" and it returns "dirKey" +func (f *Fs) listMultipartUploads(ctx context.Context, bucket, key string) (uploads []*s3.MultipartUpload, err error) { + var ( + keyMarker *string + uploadIDMarker *string + ) + uploads = []*s3.MultipartUpload{} + for { + req := s3.ListMultipartUploadsInput{ + Bucket: &bucket, + MaxUploads: &f.opt.ListChunk, + KeyMarker: keyMarker, + UploadIdMarker: uploadIDMarker, + Prefix: &key, + } + var resp *s3.ListMultipartUploadsOutput + err = f.pacer.Call(func() (bool, error) { + resp, err = f.c.ListMultipartUploads(&req) + return f.shouldRetry(err) + }) + if err != nil { + return nil, errors.Wrapf(err, "list multipart uploads bucket %q key %q", bucket, key) + } + uploads = append(uploads, resp.Uploads...) + if !aws.BoolValue(resp.IsTruncated) { + break + } + keyMarker = resp.NextKeyMarker + uploadIDMarker = resp.NextUploadIdMarker + } + return uploads, nil +} + +func (f *Fs) listMultipartUploadsAll(ctx context.Context) (uploadsMap map[string][]*s3.MultipartUpload, err error) { + uploadsMap = make(map[string][]*s3.MultipartUpload) + bucket, directory := f.split("") + if bucket != "" { + uploads, err := f.listMultipartUploads(ctx, bucket, directory) + if err != nil { + return uploadsMap, err + } + uploadsMap[bucket] = uploads + return uploadsMap, nil + } + entries, err := f.listBuckets(ctx) + if err != nil { + return uploadsMap, err + } + for _, entry := range entries { + bucket := entry.Remote() + uploads, listErr := f.listMultipartUploads(ctx, bucket, "") + if listErr != nil { + err = listErr + fs.Errorf(f, "%v", err) + } + uploadsMap[bucket] = uploads + } + return uploadsMap, err +} + +// cleanUpBucket removes all pending multipart uploads for a given bucket over the age of maxAge +func (f *Fs) cleanUpBucket(ctx context.Context, bucket string, maxAge time.Duration, uploads []*s3.MultipartUpload) (err error) { + fs.Infof(f, "cleaning bucket %q of pending multipart uploads older than %v", 
bucket, maxAge) + for _, upload := range uploads { + if upload.Initiated != nil && upload.Key != nil && upload.UploadId != nil { + age := time.Since(*upload.Initiated) + what := fmt.Sprintf("pending multipart upload for bucket %q key %q dated %v (%v ago)", bucket, *upload.Key, upload.Initiated, age) + if age > maxAge { + fs.Infof(f, "removing %s", what) + if operations.SkipDestructive(ctx, what, "remove pending upload") { + continue + } + req := s3.AbortMultipartUploadInput{ + Bucket: &bucket, + UploadId: upload.UploadId, + Key: upload.Key, + } + _, abortErr := f.c.AbortMultipartUpload(&req) + if abortErr != nil { + err = errors.Wrapf(abortErr, "failed to remove %s", what) + fs.Errorf(f, "%v", err) + } + } else { + fs.Debugf(f, "ignoring %s", what) + } + } + } + return err +} + +// cleanUp removes all pending multipart uploads older than maxAge +func (f *Fs) cleanUp(ctx context.Context, maxAge time.Duration) (err error) { + uploadsMap, err := f.listMultipartUploadsAll(ctx) + if err != nil { + return err + } + for bucket, uploads := range uploadsMap { + cleanErr := f.cleanUpBucket(ctx, bucket, maxAge, uploads) + if cleanErr != nil { + fs.Errorf(f, "Failed to cleanup bucket %q: %v", bucket, cleanErr) + err = cleanErr + } + } + return err +} + +// CleanUp removes all pending multipart uploads older than 24 hours +func (f *Fs) CleanUp(ctx context.Context) (err error) { + return f.cleanUp(ctx, 24*time.Hour) +} + // ------------------------------------------------------------ // Fs returns the parent Fs @@ -2824,6 +2990,7 @@ var ( _ fs.PutStreamer = &Fs{} _ fs.ListRer = &Fs{} _ fs.Commander = &Fs{} + _ fs.CleanUpper = &Fs{} _ fs.Object = &Object{} _ fs.MimeTyper = &Object{} _ fs.GetTierer = &Object{} diff --git a/docs/content/overview.md b/docs/content/overview.md index 31ad2c87d..5cc94d156 100644 --- a/docs/content/overview.md +++ b/docs/content/overview.md @@ -320,7 +320,7 @@ operations more efficient. 
| ---------------------------- |:-----:|:----:|:----:|:-------:|:-------:|:-----:|:------------:|:------------:|:-----:| :------: | | 1Fichier | No | No | No | No | No | No | No | No | No | Yes | | Amazon Drive | Yes | No | Yes | Yes | No [#575](https://github.com/rclone/rclone/issues/575) | No | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | Yes | -| Amazon S3 | No | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No | +| Amazon S3 | No | Yes | No | No | Yes | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No | | Backblaze B2 | No | Yes | No | No | Yes | Yes | Yes | Yes | No | No | | Box | Yes | Yes | Yes | Yes | Yes ‡‡ | No | Yes | Yes | No | Yes | | Citrix ShareFile | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes | diff --git a/docs/content/s3.md b/docs/content/s3.md index 6d585264f..4590e0aa6 100644 --- a/docs/content/s3.md +++ b/docs/content/s3.md @@ -276,6 +276,16 @@ side copy to update the modification if the object can be copied in a single par In the case the object is larger than 5Gb or is in Glacier or Glacier Deep Archive storage the object will be uploaded rather than copied. +### Cleanup ### + +If you run `rclone cleanup s3:bucket` then it will remove all pending +multipart uploads older than 24 hours. You can use the `-i` flag to +see exactly what it will do. If you want more control over the expiry +date then run `rclone backend cleanup s3:bucket -o max-age=1h` to +expire all uploads older than one hour. You can use `rclone backend +list-multipart-uploads s3:bucket` to see the pending multipart +uploads. + #### Restricted filename characters S3 allows any valid UTF-8 string as a key.