forked from TrueCloudLab/rclone
s3: implement --s3-versions flag - See #1776
This commit is contained in:
parent
a59fa2977d
commit
0ae171416f
3 changed files with 402 additions and 52 deletions
276
backend/s3/s3.go
276
backend/s3/s3.go
|
@ -54,6 +54,7 @@ import (
|
|||
"github.com/rclone/rclone/lib/readers"
|
||||
"github.com/rclone/rclone/lib/rest"
|
||||
"github.com/rclone/rclone/lib/structs"
|
||||
"github.com/rclone/rclone/lib/version"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
|
@ -1982,6 +1983,11 @@ circumstances or for testing.
|
|||
`,
|
||||
Default: false,
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "versions",
|
||||
Help: "Include old versions in directory listings.",
|
||||
Default: false,
|
||||
Advanced: true,
|
||||
},
|
||||
}})
|
||||
}
|
||||
|
@ -2099,6 +2105,7 @@ type Options struct {
|
|||
DownloadURL string `config:"download_url"`
|
||||
UseMultipartEtag fs.Tristate `config:"use_multipart_etag"`
|
||||
UsePresignedRequest bool `config:"use_presigned_request"`
|
||||
Versions bool `config:"versions"`
|
||||
}
|
||||
|
||||
// Fs represents a remote s3 server
|
||||
|
@ -2136,6 +2143,7 @@ type Object struct {
|
|||
lastModified time.Time // Last modified
|
||||
meta map[string]string // The object metadata if known - may be nil - with lower case keys
|
||||
mimeType string // MimeType of object - may be ""
|
||||
versionID *string // If present this points to an object version
|
||||
|
||||
// Metadata as pointers to strings as they often won't be present
|
||||
storageClass *string // e.g. GLACIER
|
||||
|
@ -2237,7 +2245,17 @@ func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) {
|
|||
|
||||
// split returns bucket and bucketPath from the object
|
||||
func (o *Object) split() (bucket, bucketPath string) {
|
||||
return o.fs.split(o.remote)
|
||||
bucket, bucketPath = o.fs.split(o.remote)
|
||||
// If there is an object version, then the path may have a
|
||||
// version suffix, if so remove it.
|
||||
//
|
||||
// If we are unlucky enough to have a file name with a valid
|
||||
// version path where this wasn't required (eg using
|
||||
// --s3-version-at) then this will go wrong.
|
||||
if o.versionID != nil {
|
||||
_, bucketPath = version.Remove(bucketPath)
|
||||
}
|
||||
return bucket, bucketPath
|
||||
}
|
||||
|
||||
// getClient makes an http client according to the options
|
||||
|
@ -2671,14 +2689,58 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
return f, nil
|
||||
}
|
||||
|
||||
// getMetaDataListing gets the metadata from the object unconditionally from the listing
|
||||
//
|
||||
// This is needed to find versioned objects from their paths.
|
||||
//
|
||||
// It may return info == nil and err == nil if a HEAD would be more appropriate
|
||||
func (f *Fs) getMetaDataListing(ctx context.Context, wantRemote string) (info *s3.Object, versionID *string, err error) {
|
||||
bucket, bucketPath := f.split(wantRemote)
|
||||
timestamp, bucketPath := version.Remove(bucketPath)
|
||||
|
||||
// If the path had no version string return no info, to force caller to look it up
|
||||
if timestamp.IsZero() {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
err = f.list(ctx, bucket, bucketPath, "", false, true, f.opt.Versions, false, true, func(gotRemote string, object *s3.Object, objectVersionID *string, isDirectory bool) error {
|
||||
if isDirectory {
|
||||
return nil
|
||||
}
|
||||
if wantRemote != gotRemote {
|
||||
return nil
|
||||
}
|
||||
info = object
|
||||
versionID = objectVersionID
|
||||
return errEndList // read only 1 item
|
||||
})
|
||||
if err != nil {
|
||||
if err == fs.ErrorDirNotFound {
|
||||
return nil, nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return nil, nil, err
|
||||
}
|
||||
if info == nil {
|
||||
return nil, nil, fs.ErrorObjectNotFound
|
||||
}
|
||||
return info, versionID, nil
|
||||
}
|
||||
|
||||
// Return an Object from a path
|
||||
//
|
||||
// If it can't be found it returns the error ErrorObjectNotFound.
|
||||
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Object) (fs.Object, error) {
|
||||
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Object, versionID *string) (obj fs.Object, err error) {
|
||||
o := &Object{
|
||||
fs: f,
|
||||
remote: remote,
|
||||
}
|
||||
if info == nil && f.opt.Versions && version.Match(remote) {
|
||||
// If versions, have to read the listing to find the version ID
|
||||
info, versionID, err = f.getMetaDataListing(ctx, remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if info != nil {
|
||||
// Set info but not meta
|
||||
if info.LastModified == nil {
|
||||
|
@ -2690,6 +2752,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Obje
|
|||
o.setMD5FromEtag(aws.StringValue(info.ETag))
|
||||
o.bytes = aws.Int64Value(info.Size)
|
||||
o.storageClass = info.StorageClass
|
||||
o.versionID = versionID
|
||||
} else if !o.fs.opt.NoHeadObject {
|
||||
err := o.readMetaData(ctx) // reads info and meta, returning an error
|
||||
if err != nil {
|
||||
|
@ -2702,7 +2765,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Obje
|
|||
// NewObject finds the Object at remote. If it can't be found
|
||||
// it returns the error fs.ErrorObjectNotFound.
|
||||
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||
return f.newObjectWithInfo(ctx, remote, nil)
|
||||
return f.newObjectWithInfo(ctx, remote, nil, nil)
|
||||
}
|
||||
|
||||
// Gets the bucket location
|
||||
|
@ -2752,7 +2815,7 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error {
|
|||
|
||||
// Common interface for bucket listers
|
||||
type bucketLister interface {
|
||||
List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error)
|
||||
List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error)
|
||||
URLEncodeListings(bool)
|
||||
}
|
||||
|
||||
|
@ -2773,7 +2836,7 @@ func (f *Fs) newV1List(req *s3.ListObjectsV2Input) bucketLister {
|
|||
}
|
||||
|
||||
// List a bucket with V1 listing
|
||||
func (ls *v1List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
|
||||
func (ls *v1List) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
|
||||
respv1, err := ls.f.c.ListObjectsWithContext(ctx, &ls.req)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -2829,7 +2892,7 @@ func (f *Fs) newV2List(req *s3.ListObjectsV2Input) bucketLister {
|
|||
}
|
||||
|
||||
// Do a V2 listing
|
||||
func (ls *v2List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
|
||||
func (ls *v2List) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
|
||||
resp, err = ls.f.c.ListObjectsV2WithContext(ctx, &ls.req)
|
||||
ls.req.ContinuationToken = resp.NextContinuationToken
|
||||
return resp, nil, err
|
||||
|
@ -2844,8 +2907,104 @@ func (ls *v2List) URLEncodeListings(encode bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// Versions bucket lister
|
||||
type versionsList struct {
|
||||
f *Fs
|
||||
req s3.ListObjectVersionsInput
|
||||
}
|
||||
|
||||
// Create a new Versions bucket lister
|
||||
func (f *Fs) newVersionsList(req *s3.ListObjectsV2Input) bucketLister {
|
||||
l := &versionsList{
|
||||
f: f,
|
||||
}
|
||||
// Convert v2 req into withVersions req
|
||||
structs.SetFrom(&l.req, req)
|
||||
return l
|
||||
}
|
||||
|
||||
// Any s3.Objects with this as their size are delete markers
|
||||
var isDeleteMarker = new(int64)
|
||||
|
||||
// List a bucket with versions
|
||||
func (ls *versionsList) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
|
||||
respVersions, err := ls.f.c.ListObjectVersionsWithContext(ctx, &ls.req)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Set up the request for next time
|
||||
ls.req.KeyMarker = respVersions.NextKeyMarker
|
||||
ls.req.VersionIdMarker = respVersions.NextVersionIdMarker
|
||||
|
||||
// If we are URL encoding then must decode the marker
|
||||
if ls.req.KeyMarker != nil && ls.req.EncodingType != nil {
|
||||
*ls.req.KeyMarker, err = url.QueryUnescape(*ls.req.KeyMarker)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to URL decode KeyMarker %q: %w", *ls.req.KeyMarker, err)
|
||||
}
|
||||
}
|
||||
|
||||
// convert Versions resp into v2 resp
|
||||
resp = new(s3.ListObjectsV2Output)
|
||||
structs.SetFrom(resp, respVersions)
|
||||
|
||||
// Convert the Versions and the DeleteMarkers into an array of s3.Object
|
||||
//
|
||||
// These are returned in the order that they are stored with the most recent first.
|
||||
// With the annoyance that the Versions and DeleteMarkers are split into two
|
||||
objs := make([]*s3.Object, 0, len(respVersions.Versions))
|
||||
for _, objVersion := range respVersions.Versions {
|
||||
var obj = new(s3.Object)
|
||||
structs.SetFrom(obj, objVersion)
|
||||
// Adjust the file names
|
||||
if !aws.BoolValue(objVersion.IsLatest) {
|
||||
if obj.Key != nil && objVersion.LastModified != nil {
|
||||
*obj.Key = version.Add(*obj.Key, *objVersion.LastModified)
|
||||
}
|
||||
}
|
||||
objs = append(objs, obj)
|
||||
versionIDs = append(versionIDs, objVersion.VersionId)
|
||||
}
|
||||
|
||||
// If hidden is set, put the delete markers in too, but set
|
||||
// their sizes to a sentinel delete marker size
|
||||
if hidden {
|
||||
for _, deleteMarker := range respVersions.DeleteMarkers {
|
||||
var obj = new(s3.Object)
|
||||
structs.SetFrom(obj, deleteMarker)
|
||||
obj.Size = isDeleteMarker
|
||||
// Adjust the file names
|
||||
if !aws.BoolValue(deleteMarker.IsLatest) {
|
||||
if obj.Key != nil && deleteMarker.LastModified != nil {
|
||||
*obj.Key = version.Add(*obj.Key, *deleteMarker.LastModified)
|
||||
}
|
||||
}
|
||||
objs = append(objs, obj)
|
||||
versionIDs = append(versionIDs, deleteMarker.VersionId)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
resp.Contents = objs
|
||||
return resp, versionIDs, nil
|
||||
}
|
||||
|
||||
// URL Encode the listings
|
||||
func (ls *versionsList) URLEncodeListings(encode bool) {
|
||||
if encode {
|
||||
ls.req.EncodingType = aws.String(s3.EncodingTypeUrl)
|
||||
} else {
|
||||
ls.req.EncodingType = nil
|
||||
}
|
||||
}
|
||||
|
||||
// listFn is called from list to handle an object.
|
||||
type listFn func(remote string, object *s3.Object, isDirectory bool) error
|
||||
type listFn func(remote string, object *s3.Object, versionID *string, isDirectory bool) error
|
||||
|
||||
// errEndList is a sentinel used to end the list iteration now.
|
||||
// listFn should return it to end the iteration with no errors.
|
||||
var errEndList = errors.New("end list")
|
||||
|
||||
// list lists the objects into the function supplied from
|
||||
// the bucket and directory supplied. The remote has prefix
|
||||
|
@ -2853,13 +3012,17 @@ type listFn func(remote string, object *s3.Object, isDirectory bool) error
|
|||
// bucket to the start.
|
||||
//
|
||||
// Set recurse to read sub directories
|
||||
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error {
|
||||
//
|
||||
// if findFile is set it will look for files called (bucket, directory)
|
||||
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, withVersions bool, hidden bool, findFile bool, fn listFn) error {
|
||||
if !findFile {
|
||||
if prefix != "" {
|
||||
prefix += "/"
|
||||
}
|
||||
if directory != "" {
|
||||
directory += "/"
|
||||
}
|
||||
}
|
||||
delimiter := ""
|
||||
if !recurse {
|
||||
delimiter = "/"
|
||||
|
@ -2891,6 +3054,8 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
}
|
||||
var listBucket bucketLister
|
||||
switch {
|
||||
case withVersions:
|
||||
listBucket = f.newVersionsList(&req)
|
||||
case f.opt.ListVersion == 1:
|
||||
listBucket = f.newV1List(&req)
|
||||
default:
|
||||
|
@ -2899,9 +3064,10 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
for {
|
||||
var resp *s3.ListObjectsV2Output
|
||||
var err error
|
||||
var versionIDs []*string
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
listBucket.URLEncodeListings(urlEncodeListings)
|
||||
resp, _, err = listBucket.List(ctx)
|
||||
resp, versionIDs, err = listBucket.List(ctx, hidden)
|
||||
if err != nil && !urlEncodeListings {
|
||||
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
||||
if origErr := awsErr.OrigErr(); origErr != nil {
|
||||
|
@ -2959,13 +3125,16 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
remote = path.Join(bucket, remote)
|
||||
}
|
||||
remote = strings.TrimSuffix(remote, "/")
|
||||
err = fn(remote, &s3.Object{Key: &remote}, true)
|
||||
err = fn(remote, &s3.Object{Key: &remote}, nil, true)
|
||||
if err != nil {
|
||||
if err == errEndList {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, object := range resp.Contents {
|
||||
for i, object := range resp.Contents {
|
||||
remote := aws.StringValue(object.Key)
|
||||
if urlEncodeListings {
|
||||
remote, err = url.QueryUnescape(remote)
|
||||
|
@ -2988,8 +3157,15 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
if isDirectory && object.Size != nil && *object.Size == 0 {
|
||||
continue // skip directory marker
|
||||
}
|
||||
err = fn(remote, object, false)
|
||||
if versionIDs != nil {
|
||||
err = fn(remote, object, versionIDs[i], false)
|
||||
} else {
|
||||
err = fn(remote, object, nil, false)
|
||||
}
|
||||
if err != nil {
|
||||
if err == errEndList {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -3001,7 +3177,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
}
|
||||
|
||||
// Convert a list item into a DirEntry
|
||||
func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Object, isDirectory bool) (fs.DirEntry, error) {
|
||||
func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Object, versionID *string, isDirectory bool) (fs.DirEntry, error) {
|
||||
if isDirectory {
|
||||
size := int64(0)
|
||||
if object.Size != nil {
|
||||
|
@ -3010,7 +3186,7 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec
|
|||
d := fs.NewDir(remote, time.Time{}).SetSize(size)
|
||||
return d, nil
|
||||
}
|
||||
o, err := f.newObjectWithInfo(ctx, remote, object)
|
||||
o, err := f.newObjectWithInfo(ctx, remote, object, versionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -3020,8 +3196,8 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec
|
|||
// listDir lists files and directories to out
|
||||
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
|
||||
// List the objects and directories
|
||||
err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error {
|
||||
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||
err = f.list(ctx, bucket, directory, prefix, addBucket, false, f.opt.Versions, false, false, func(remote string, object *s3.Object, versionID *string, isDirectory bool) error {
|
||||
entry, err := f.itemToDirEntry(ctx, remote, object, versionID, isDirectory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3098,8 +3274,8 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||
bucket, directory := f.split(dir)
|
||||
list := walk.NewListRHelper(callback)
|
||||
listR := func(bucket, directory, prefix string, addBucket bool) error {
|
||||
return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *s3.Object, isDirectory bool) error {
|
||||
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||
return f.list(ctx, bucket, directory, prefix, addBucket, true, f.opt.Versions, false, false, func(remote string, object *s3.Object, versionID *string, isDirectory bool) error {
|
||||
entry, err := f.itemToDirEntry(ctx, remote, object, versionID, isDirectory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3254,6 +3430,9 @@ func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPa
|
|||
req.ACL = &f.opt.ACL
|
||||
req.Key = &dstPath
|
||||
source := pathEscape(path.Join(srcBucket, srcPath))
|
||||
if src.versionID != nil {
|
||||
source += fmt.Sprintf("?versionId=%s", *src.versionID)
|
||||
}
|
||||
req.CopySource = &source
|
||||
if f.opt.RequesterPays {
|
||||
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
||||
|
@ -3448,17 +3627,20 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration,
|
|||
if strings.HasSuffix(remote, "/") {
|
||||
return "", fs.ErrorCantShareDirectories
|
||||
}
|
||||
if _, err := f.NewObject(ctx, remote); err != nil {
|
||||
obj, err := f.NewObject(ctx, remote)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
o := obj.(*Object)
|
||||
if expire > maxExpireDuration {
|
||||
fs.Logf(f, "Public Link: Reducing expiry to %v as %v is greater than the max time allowed", maxExpireDuration, expire)
|
||||
expire = maxExpireDuration
|
||||
}
|
||||
bucket, bucketPath := f.split(remote)
|
||||
bucket, bucketPath := o.split()
|
||||
httpReq, _ := f.c.GetObjectRequest(&s3.GetObjectInput{
|
||||
Bucket: &bucket,
|
||||
Key: &bucketPath,
|
||||
VersionId: o.versionID,
|
||||
})
|
||||
|
||||
return httpReq.Presign(time.Duration(expire))
|
||||
|
@ -3637,6 +3819,7 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str
|
|||
reqCopy := req
|
||||
reqCopy.Bucket = &bucket
|
||||
reqCopy.Key = &bucketPath
|
||||
reqCopy.VersionId = o.versionID
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, err = f.c.RestoreObject(&reqCopy)
|
||||
return f.shouldRetry(ctx, err)
|
||||
|
@ -3912,6 +4095,7 @@ func (o *Object) headObject(ctx context.Context) (resp *s3.HeadObjectOutput, err
|
|||
req := s3.HeadObjectInput{
|
||||
Bucket: &bucket,
|
||||
Key: &bucketPath,
|
||||
VersionId: o.versionID,
|
||||
}
|
||||
if o.fs.opt.RequesterPays {
|
||||
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
||||
|
@ -4153,6 +4337,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||
req := s3.GetObjectInput{
|
||||
Bucket: &bucket,
|
||||
Key: &bucketPath,
|
||||
VersionId: o.versionID,
|
||||
}
|
||||
if o.fs.opt.RequesterPays {
|
||||
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
||||
|
@ -4222,7 +4407,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||
|
||||
var warnStreamUpload sync.Once
|
||||
|
||||
func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, err error) {
|
||||
func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, versionID *string, err error) {
|
||||
f := o.fs
|
||||
|
||||
// make concurrency machinery
|
||||
|
@ -4265,7 +4450,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return etag, fmt.Errorf("multipart upload failed to initialise: %w", err)
|
||||
return etag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
|
||||
}
|
||||
uid := cout.UploadId
|
||||
|
||||
|
@ -4338,7 +4523,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
finished = true
|
||||
} else if err != nil {
|
||||
free()
|
||||
return etag, fmt.Errorf("multipart upload failed to read source: %w", err)
|
||||
return etag, nil, fmt.Errorf("multipart upload failed to read source: %w", err)
|
||||
}
|
||||
buf = buf[:n]
|
||||
|
||||
|
@ -4393,7 +4578,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
}
|
||||
err = g.Wait()
|
||||
if err != nil {
|
||||
return etag, err
|
||||
return etag, nil, err
|
||||
}
|
||||
|
||||
// sort the completed parts by part number
|
||||
|
@ -4401,8 +4586,9 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
return *parts[i].PartNumber < *parts[j].PartNumber
|
||||
})
|
||||
|
||||
var resp *s3.CompleteMultipartUploadOutput
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
_, err := f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
|
||||
resp, err = f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
|
||||
Bucket: req.Bucket,
|
||||
Key: req.Key,
|
||||
MultipartUpload: &s3.CompletedMultipartUpload{
|
||||
|
@ -4414,11 +4600,14 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return etag, fmt.Errorf("multipart upload failed to finalise: %w", err)
|
||||
return etag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err)
|
||||
}
|
||||
hashOfHashes := md5.Sum(md5s)
|
||||
etag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(parts))
|
||||
return etag, nil
|
||||
if resp != nil {
|
||||
versionID = resp.VersionId
|
||||
}
|
||||
return etag, versionID, nil
|
||||
}
|
||||
|
||||
// unWrapAwsError unwraps AWS errors, looking for a non AWS error
|
||||
|
@ -4445,7 +4634,7 @@ func unWrapAwsError(err error) (found bool, outErr error) {
|
|||
}
|
||||
|
||||
// Upload a single part using PutObject
|
||||
func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) {
|
||||
func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, versionID *string, err error) {
|
||||
r, resp := o.fs.c.PutObjectRequest(req)
|
||||
if req.ContentLength != nil && *req.ContentLength == 0 {
|
||||
// Can't upload zero length files like this for some reason
|
||||
|
@ -4472,15 +4661,18 @@ func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjec
|
|||
err = newErr
|
||||
}
|
||||
}
|
||||
return etag, lastModified, err
|
||||
return etag, lastModified, nil, err
|
||||
}
|
||||
lastModified = time.Now()
|
||||
if resp != nil {
|
||||
etag = aws.StringValue(resp.ETag)
|
||||
return etag, lastModified, nil
|
||||
versionID = resp.VersionId
|
||||
}
|
||||
return etag, lastModified, versionID, nil
|
||||
}
|
||||
|
||||
// Upload a single part using a presigned request
|
||||
func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) {
|
||||
func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, versionID *string, err error) {
|
||||
// Create the request
|
||||
putObj, _ := o.fs.c.PutObjectRequest(req)
|
||||
|
||||
|
@ -4490,7 +4682,7 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P
|
|||
// PutObject so we used this work-around.
|
||||
url, headers, err := putObj.PresignRequest(15 * time.Minute)
|
||||
if err != nil {
|
||||
return etag, lastModified, fmt.Errorf("s3 upload: sign request: %w", err)
|
||||
return etag, lastModified, nil, fmt.Errorf("s3 upload: sign request: %w", err)
|
||||
}
|
||||
|
||||
if o.fs.opt.V2Auth && headers == nil {
|
||||
|
@ -4505,7 +4697,7 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P
|
|||
// create the vanilla http request
|
||||
httpReq, err := http.NewRequestWithContext(ctx, "PUT", url, in)
|
||||
if err != nil {
|
||||
return etag, lastModified, fmt.Errorf("s3 upload: new request: %w", err)
|
||||
return etag, lastModified, nil, fmt.Errorf("s3 upload: new request: %w", err)
|
||||
}
|
||||
|
||||
// set the headers we signed and the length
|
||||
|
@ -4530,15 +4722,19 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P
|
|||
return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
|
||||
})
|
||||
if err != nil {
|
||||
return etag, lastModified, err
|
||||
return etag, lastModified, nil, err
|
||||
}
|
||||
if resp != nil {
|
||||
if date, err := http.ParseTime(resp.Header.Get("Date")); err != nil {
|
||||
lastModified = date
|
||||
}
|
||||
etag = resp.Header.Get("Etag")
|
||||
vID := resp.Header.Get("x-amz-version-id")
|
||||
if vID != "" {
|
||||
versionID = &vID
|
||||
}
|
||||
return etag, lastModified, nil
|
||||
}
|
||||
return etag, lastModified, versionID, nil
|
||||
}
|
||||
|
||||
// Update the Object from in with modTime and size
|
||||
|
@ -4692,18 +4888,20 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
var wantETag string // Multipart upload Etag to check
|
||||
var gotEtag string // Etag we got from the upload
|
||||
var lastModified time.Time // Time we got from the upload
|
||||
var versionID *string // versionID we got from the upload
|
||||
if multipart {
|
||||
wantETag, err = o.uploadMultipart(ctx, &req, size, in)
|
||||
wantETag, versionID, err = o.uploadMultipart(ctx, &req, size, in)
|
||||
} else {
|
||||
if o.fs.opt.UsePresignedRequest {
|
||||
gotEtag, lastModified, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, in)
|
||||
gotEtag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, in)
|
||||
} else {
|
||||
gotEtag, lastModified, err = o.uploadSinglepartPutObject(ctx, &req, size, in)
|
||||
gotEtag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, &req, size, in)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.versionID = versionID
|
||||
|
||||
// User requested we don't HEAD the object after uploading it
|
||||
// so make up the object as best we can assuming it got
|
||||
|
@ -4721,6 +4919,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
lastModified = time.Now()
|
||||
}
|
||||
head.LastModified = &lastModified
|
||||
head.VersionId = versionID
|
||||
o.setMetaData(&head)
|
||||
return nil
|
||||
}
|
||||
|
@ -4748,6 +4947,7 @@ func (o *Object) Remove(ctx context.Context) error {
|
|||
req := s3.DeleteObjectInput{
|
||||
Bucket: &bucket,
|
||||
Key: &bucketPath,
|
||||
VersionId: o.versionID,
|
||||
}
|
||||
if o.fs.opt.RequesterPays {
|
||||
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/fstest/fstests"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
"github.com/rclone/rclone/lib/version"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -84,9 +85,85 @@ func (f *Fs) InternalTestNoHead(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func (f *Fs) InternalTestVersions(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Enable versioning for this bucket during this test
|
||||
_, err := f.setGetVersioning(ctx, "Enabled")
|
||||
if err != nil {
|
||||
t.Skipf("Couldn't enable versioning: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
// Disable versioning for this bucket
|
||||
_, err := f.setGetVersioning(ctx, "Suspended")
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
// Create an object
|
||||
const fileName = "test-versions.txt"
|
||||
contents := random.String(100)
|
||||
item := fstest.NewItem(fileName, contents, fstest.Time("2001-05-06T04:05:06.499999999Z"))
|
||||
obj := fstests.PutTestContents(ctx, t, f, &item, contents, true)
|
||||
defer func() {
|
||||
assert.NoError(t, obj.Remove(ctx))
|
||||
}()
|
||||
|
||||
// Remove it
|
||||
assert.NoError(t, obj.Remove(ctx))
|
||||
|
||||
// And create it with different size and contents
|
||||
newContents := random.String(101)
|
||||
newItem := fstest.NewItem(fileName, newContents, fstest.Time("2002-05-06T04:05:06.499999999Z"))
|
||||
_ = fstests.PutTestContents(ctx, t, f, &newItem, newContents, true)
|
||||
|
||||
// Add the expected version suffix to the old version
|
||||
item.Path = version.Add(item.Path, obj.(*Object).lastModified)
|
||||
|
||||
t.Run("S3Version", func(t *testing.T) {
|
||||
// Set --s3-versions for this test
|
||||
f.opt.Versions = true
|
||||
defer func() {
|
||||
f.opt.Versions = false
|
||||
}()
|
||||
|
||||
// Check listing
|
||||
items := append([]fstest.Item{item, newItem}, fstests.InternalTestFiles...)
|
||||
fstest.CheckListing(t, f, items)
|
||||
|
||||
// Read the contents
|
||||
entries, err := f.List(ctx, "")
|
||||
require.NoError(t, err)
|
||||
tests := 0
|
||||
for _, entry := range entries {
|
||||
switch entry.Remote() {
|
||||
case newItem.Path:
|
||||
t.Run("ReadCurrent", func(t *testing.T) {
|
||||
assert.Equal(t, newContents, fstests.ReadObject(ctx, t, entry.(fs.Object), -1))
|
||||
})
|
||||
tests++
|
||||
case item.Path:
|
||||
t.Run("ReadVersion", func(t *testing.T) {
|
||||
assert.Equal(t, contents, fstests.ReadObject(ctx, t, entry.(fs.Object), -1))
|
||||
})
|
||||
tests++
|
||||
}
|
||||
}
|
||||
assert.Equal(t, 2, tests)
|
||||
|
||||
// Check we can read the object with a version suffix
|
||||
t.Run("NewObject", func(t *testing.T) {
|
||||
o, err := f.NewObject(ctx, item.Path)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, o)
|
||||
assert.Equal(t, int64(100), o.Size(), o.Remote())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (f *Fs) InternalTest(t *testing.T) {
|
||||
t.Run("Metadata", f.InternalTestMetadata)
|
||||
t.Run("NoHead", f.InternalTestNoHead)
|
||||
t.Run("Versions", f.InternalTestVersions)
|
||||
}
|
||||
|
||||
var _ fstests.InternalTester = (*Fs)(nil)
|
||||
|
|
|
@ -384,6 +384,68 @@ This will mean that these objects do not have an MD5 checksum.
|
|||
Note that reading this from the object takes an additional `HEAD`
|
||||
request as the metadata isn't returned in object listings.
|
||||
|
||||
### Versions
|
||||
|
||||
When bucket versioning is enabled (this can be done with rclone with
|
||||
the [`rclone backend versioning`](#versioning) command) when rclone
|
||||
uploads a new version of a file it creates a
|
||||
[new version of it](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Versioning.html)
|
||||
Likewise when you delete a file, the old version will be marked hidden
|
||||
and still be available.
|
||||
|
||||
Old versions of files, where available, are visible using the
|
||||
`--s3-versions` flag.
|
||||
|
||||
If you wish to remove all the old versions then you can use the
|
||||
[`rclone backend cleanup-hidden remote:bucket`](#cleanup-hidden)
|
||||
command which will delete all the old hidden versions of files,
|
||||
leaving the current ones intact. You can also supply a path and only
|
||||
old versions under that path will be deleted, e.g.
|
||||
`rclone backend cleanup-hidden remote:bucket/path/to/stuff`.
|
||||
|
||||
When you `purge` a bucket, the current and the old versions will be
|
||||
deleted then the bucket will be deleted.
|
||||
|
||||
However `delete` will cause the current versions of the files to
|
||||
become hidden old versions.
|
||||
|
||||
Here is a session showing the listing and retrieval of an old
|
||||
version followed by a `cleanup` of the old versions.
|
||||
|
||||
Show current version and all the versions with `--s3-versions` flag.
|
||||
|
||||
```
|
||||
$ rclone -q ls s3:cleanup-test
|
||||
9 one.txt
|
||||
|
||||
$ rclone -q --s3-versions ls s3:cleanup-test
|
||||
9 one.txt
|
||||
8 one-v2016-07-04-141032-000.txt
|
||||
16 one-v2016-07-04-141003-000.txt
|
||||
15 one-v2016-07-02-155621-000.txt
|
||||
```
|
||||
|
||||
Retrieve an old version
|
||||
|
||||
```
|
||||
$ rclone -q --s3-versions copy s3:cleanup-test/one-v2016-07-04-141003-000.txt /tmp
|
||||
|
||||
$ ls -l /tmp/one-v2016-07-04-141003-000.txt
|
||||
-rw-rw-r-- 1 ncw ncw 16 Jul 2 17:46 /tmp/one-v2016-07-04-141003-000.txt
|
||||
```
|
||||
|
||||
Clean up all the old versions and show that they've gone.
|
||||
|
||||
```
|
||||
$ rclone -q backend cleanup-hidden s3:cleanup-test
|
||||
|
||||
$ rclone -q ls s3:cleanup-test
|
||||
9 one.txt
|
||||
|
||||
$ rclone -q --s3-versions ls s3:cleanup-test
|
||||
9 one.txt
|
||||
```
|
||||
|
||||
### Cleanup
|
||||
|
||||
If you run `rclone cleanup s3:bucket` then it will remove all pending
|
||||
|
@ -2562,6 +2624,17 @@ Properties:
|
|||
- Type: bool
|
||||
- Default: false
|
||||
|
||||
#### --s3-versions
|
||||
|
||||
Include old versions in directory listings.
|
||||
|
||||
Properties:
|
||||
|
||||
- Config: versions
|
||||
- Env Var: RCLONE_S3_VERSIONS
|
||||
- Type: bool
|
||||
- Default: false
|
||||
|
||||
### Metadata
|
||||
|
||||
User metadata is stored as x-amz-meta- keys. S3 metadata keys are case insensitive and are always returned in lower case.
|
||||
|
|
Loading…
Reference in a new issue