s3: implement cleanup and backend command to list & remove multipart uploads

This implements `rclone cleanup` to remove multipart uploads over 24
hours old. It also implements the backend command
`list-multipart-uploads` to see which ones are available and `cleanup`
to delete them with a configurable expiry interval.

See #4302
This commit is contained in:
Nick Craig-Wood 2020-06-25 16:11:05 +01:00
parent 8f42532b6d
commit d5f4c74697
3 changed files with 190 additions and 13 deletions

View file

@ -1,18 +1,6 @@
// Package s3 provides an interface to Amazon S3 object storage
package s3
// FIXME need to prevent anything but ListDir working for s3://
/*
Progress of port to aws-sdk
* Don't really need o.meta at all?
What happens if you CTRL-C a multipart upload
* get an incomplete upload
* disappears when you delete the bucket
*/
import (
"bytes"
"context"
@ -2127,6 +2115,58 @@ if not.
"lifetime": "Lifetime of the active copy in days",
"description": "The optional description for the job.",
},
}, {
Name: "list-multipart-uploads",
Short: "List the unfinished multipart uploads",
Long: `This command lists the unfinished multipart uploads in JSON format.
rclone backend list-multipart-uploads s3:bucket/path/to/object
It returns a dictionary of buckets with values as lists of unfinished
multipart uploads.
You can call it with no bucket in which case it lists all buckets, with
a bucket or with a bucket and path.
{
"rclone": [
{
"Initiated": "2020-06-26T14:20:36Z",
"Initiator": {
"DisplayName": "XXX",
"ID": "arn:aws:iam::XXX:user/XXX"
},
"Key": "KEY",
"Owner": {
"DisplayName": null,
"ID": "XXX"
},
"StorageClass": "STANDARD",
"UploadId": "XXX"
}
],
"rclone-1000files": [],
"rclone-dst": []
}
`,
}, {
Name: "cleanup",
Short: "Remove unfinished multipart uploads.",
Long: `This command removes unfinished multipart uploads of age greater than
max-age which defaults to 24 hours.
Note that you can use -i/--dry-run with this command to see what it
would do.
rclone backend cleanup s3:bucket/path/to/object
rclone backend cleanup -o max-age=7w s3:bucket/path/to/object
Durations are parsed as per the rest of rclone, 2h, 7d, 7w etc.
`,
Opts: map[string]string{
"max-age": "Max age of upload to delete",
},
}}
// Command the backend to run a named command
@ -2201,11 +2241,137 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str
return out, err
}
return out, nil
case "list-multipart-uploads":
return f.listMultipartUploadsAll(ctx)
case "cleanup":
maxAge := 24 * time.Hour
if opt["max-age"] != "" {
maxAge, err = fs.ParseDuration(opt["max-age"])
if err != nil {
return nil, errors.Wrap(err, "bad max-age")
}
}
return nil, f.cleanUp(ctx, maxAge)
default:
return nil, fs.ErrorCommandNotFound
}
}
// listMultipartUploads returns every outstanding multipart upload in
// bucket whose key starts with key.
//
// The key is sent to S3 as a prefix rather than matched exactly, so it
// covers both objects and directories. This could surprise the user:
// asking for "dir" also returns uploads for "dirKey".
//
// Results are fetched a page at a time (f.opt.ListChunk uploads per
// request) until S3 reports the listing is no longer truncated. On
// success the returned slice is never nil, even when there are no
// uploads. On failure nil is returned along with a wrapped error.
func (f *Fs) listMultipartUploads(ctx context.Context, bucket, key string) (uploads []*s3.MultipartUpload, err error) {
	// Pagination markers - both nil for the first request
	var nextKey, nextUploadID *string
	uploads = []*s3.MultipartUpload{}
	for truncated := true; truncated; {
		req := s3.ListMultipartUploadsInput{
			Bucket:         &bucket,
			MaxUploads:     &f.opt.ListChunk,
			KeyMarker:      nextKey,
			UploadIdMarker: nextUploadID,
			Prefix:         &key,
		}
		var page *s3.ListMultipartUploadsOutput
		err = f.pacer.Call(func() (bool, error) {
			page, err = f.c.ListMultipartUploads(&req)
			return f.shouldRetry(err)
		})
		if err != nil {
			return nil, errors.Wrapf(err, "list multipart uploads bucket %q key %q", bucket, key)
		}
		uploads = append(uploads, page.Uploads...)
		// Carry on from where this page stopped if there is more to come
		truncated = aws.BoolValue(page.IsTruncated)
		nextKey, nextUploadID = page.NextKeyMarker, page.NextUploadIdMarker
	}
	return uploads, nil
}
// listMultipartUploadsAll lists the outstanding multipart uploads for
// f as a map of bucket name to uploads.
//
// If f.split("") yields a bucket then only that bucket is listed, using
// the remainder returned by split as the key prefix, and any error is
// returned straight away.
//
// Otherwise every bucket returned by f.listBuckets is listed in full.
// A failure to list one bucket is logged and does not stop the other
// buckets being listed; the last such error is returned together with
// the map.
func (f *Fs) listMultipartUploadsAll(ctx context.Context) (uploadsMap map[string][]*s3.MultipartUpload, err error) {
	uploadsMap = make(map[string][]*s3.MultipartUpload)

	// Single bucket case
	if rootBucket, prefix := f.split(""); rootBucket != "" {
		found, listErr := f.listMultipartUploads(ctx, rootBucket, prefix)
		if listErr != nil {
			return uploadsMap, listErr
		}
		uploadsMap[rootBucket] = found
		return uploadsMap, nil
	}

	// No bucket - go through all of them
	buckets, err := f.listBuckets(ctx)
	if err != nil {
		return uploadsMap, err
	}
	for _, dir := range buckets {
		name := dir.Remote()
		found, listErr := f.listMultipartUploads(ctx, name, "")
		if listErr != nil {
			// Remember the error but keep going with the other buckets
			err = listErr
			fs.Errorf(f, "%v", err)
		}
		uploadsMap[name] = found
	}
	return uploadsMap, err
}
// cleanUpBucket aborts the pending multipart uploads in uploads which
// were initiated more than maxAge ago.
//
// Uploads missing an initiation time, key or upload ID are skipped
// silently, and uploads which are not old enough are only logged at
// debug level. operations.SkipDestructive is consulted before each
// abort and the upload is left alone if it returns true.
//
// A failed abort is logged and does not stop the remaining uploads
// being processed; the last such error is returned.
func (f *Fs) cleanUpBucket(ctx context.Context, bucket string, maxAge time.Duration, uploads []*s3.MultipartUpload) (err error) {
	fs.Infof(f, "cleaning bucket %q of pending multipart uploads older than %v", bucket, maxAge)
	for _, pending := range uploads {
		// Can't do anything with an incompletely described upload
		if pending.Initiated == nil || pending.Key == nil || pending.UploadId == nil {
			continue
		}
		age := time.Since(*pending.Initiated)
		what := fmt.Sprintf("pending multipart upload for bucket %q key %q dated %v (%v ago)", bucket, *pending.Key, pending.Initiated, age)
		if age <= maxAge {
			fs.Debugf(f, "ignoring %s", what)
			continue
		}
		fs.Infof(f, "removing %s", what)
		if operations.SkipDestructive(ctx, what, "remove pending upload") {
			continue
		}
		_, abortErr := f.c.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
			Bucket:   &bucket,
			UploadId: pending.UploadId,
			Key:      pending.Key,
		})
		if abortErr == nil {
			continue
		}
		err = errors.Wrapf(abortErr, "failed to remove %s", what)
		fs.Errorf(f, "%v", err)
	}
	return err
}
// cleanUp removes all pending multipart uploads older than maxAge.
//
// The uploads to consider come from listMultipartUploadsAll; if that
// fails its error is returned and nothing is removed. Each bucket is
// then cleaned with cleanUpBucket. A failure in one bucket is logged
// and does not stop the remaining buckets being cleaned; the last such
// error is returned.
func (f *Fs) cleanUp(ctx context.Context, maxAge time.Duration) (err error) {
	uploadsMap, err := f.listMultipartUploadsAll(ctx)
	if err != nil {
		return err
	}
	for bucket, uploads := range uploadsMap {
		// Test the result of this bucket's clean up rather than err,
		// which is always nil on entry to the loop - otherwise
		// failures would be neither logged nor returned.
		if cleanErr := f.cleanUpBucket(ctx, bucket, maxAge, uploads); cleanErr != nil {
			fs.Errorf(f, "Failed to cleanup bucket %q: %v", bucket, cleanErr)
			err = cleanErr
		}
	}
	return err
}
// CleanUp removes all pending multipart uploads older than 24 hours
//
// It calls cleanUp with a fixed maximum age.
func (f *Fs) CleanUp(ctx context.Context) (err error) {
	// Uploads younger than this are left alone
	const maxAge = 24 * time.Hour
	return f.cleanUp(ctx, maxAge)
}
// ------------------------------------------------------------
// Fs returns the parent Fs
@ -2824,6 +2990,7 @@ var (
_ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.Commander = &Fs{}
_ fs.CleanUpper = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
_ fs.GetTierer = &Object{}

View file

@ -320,7 +320,7 @@ operations more efficient.
| ---------------------------- |:-----:|:----:|:----:|:-------:|:-------:|:-----:|:------------:|:------------:|:-----:| :------: |
| 1Fichier | No | No | No | No | No | No | No | No | No | Yes |
| Amazon Drive | Yes | No | Yes | Yes | No [#575](https://github.com/rclone/rclone/issues/575) | No | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | Yes |
| Amazon S3 | No | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No |
| Amazon S3 | No | Yes | No | No | Yes | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No |
| Backblaze B2 | No | Yes | No | No | Yes | Yes | Yes | Yes | No | No |
| Box | Yes | Yes | Yes | Yes | Yes ‡‡ | No | Yes | Yes | No | Yes |
| Citrix ShareFile | Yes | Yes | Yes | Yes | No | No | Yes | No | No | Yes |

View file

@ -276,6 +276,16 @@ side copy to update the modification if the object can be copied in a single par
In the case the object is larger than 5Gb or is in Glacier or Glacier Deep Archive
storage the object will be uploaded rather than copied.
### Cleanup ###
If you run `rclone cleanup s3:bucket` then it will remove all pending
multipart uploads older than 24 hours. You can use the `-i` flag to
see exactly what it will do. If you want more control over the expiry
date then run `rclone backend cleanup s3:bucket -o max-age=1h` to
expire all uploads older than one hour. You can use `rclone backend
list-multipart-uploads s3:bucket` to see the pending multipart
uploads.
#### Restricted filename characters
S3 allows any valid UTF-8 string as a key.