s3: fix SetModTime on GLACIER/ARCHIVE objects and implement set/get tier
- Read the storage class for each object
- Implement SetTier/GetTier
- Check the storage class on the **object** before using SetModTime
This updates the fix in 1a2fb52
so that SetModTime works when you are
using objects which have been migrated to GLACIER but you aren't using
GLACIER as a storage class.
Fixes #3522
This commit is contained in:
parent
23dc313fa5
commit
25786cafd3
2 changed files with 68 additions and 47 deletions
110
backend/s3/s3.go
110
backend/s3/s3.go
|
@ -818,6 +818,7 @@ type Object struct {
|
||||||
lastModified time.Time // Last modified
|
lastModified time.Time // Last modified
|
||||||
meta map[string]*string // The object metadata if known - may be nil
|
meta map[string]*string // The object metadata if known - may be nil
|
||||||
mimeType string // MimeType of object - may be ""
|
mimeType string // MimeType of object - may be ""
|
||||||
|
storageClass string // eg GLACIER
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
@ -1089,6 +1090,8 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
WriteMimeType: true,
|
WriteMimeType: true,
|
||||||
BucketBased: true,
|
BucketBased: true,
|
||||||
BucketBasedRootOK: true,
|
BucketBasedRootOK: true,
|
||||||
|
SetTier: true,
|
||||||
|
GetTier: true,
|
||||||
}).Fill(f)
|
}).Fill(f)
|
||||||
if f.rootBucket != "" && f.rootDirectory != "" {
|
if f.rootBucket != "" && f.rootDirectory != "" {
|
||||||
// Check to see if the object exists
|
// Check to see if the object exists
|
||||||
|
@ -1132,6 +1135,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Obje
|
||||||
}
|
}
|
||||||
o.etag = aws.StringValue(info.ETag)
|
o.etag = aws.StringValue(info.ETag)
|
||||||
o.bytes = aws.Int64Value(info.Size)
|
o.bytes = aws.Int64Value(info.Size)
|
||||||
|
o.storageClass = aws.StringValue(info.StorageClass)
|
||||||
} else {
|
} else {
|
||||||
err := o.readMetaData(ctx) // reads info and meta, returning an error
|
err := o.readMetaData(ctx) // reads info and meta, returning an error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1550,6 +1554,31 @@ func pathEscape(s string) string {
|
||||||
return strings.Replace(rest.URLPathEscape(s), "+", "%2B", -1)
|
return strings.Replace(rest.URLPathEscape(s), "+", "%2B", -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copy does a server side copy
|
||||||
|
//
|
||||||
|
// It adds the boiler plate to the req passed in and calls the s3
|
||||||
|
// method
|
||||||
|
func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPath, srcBucket, srcPath string) error {
|
||||||
|
req.Bucket = &dstBucket
|
||||||
|
req.ACL = &f.opt.ACL
|
||||||
|
req.Key = &dstPath
|
||||||
|
source := pathEscape(path.Join(srcBucket, srcPath))
|
||||||
|
req.CopySource = &source
|
||||||
|
if f.opt.ServerSideEncryption != "" {
|
||||||
|
req.ServerSideEncryption = &f.opt.ServerSideEncryption
|
||||||
|
}
|
||||||
|
if f.opt.SSEKMSKeyID != "" {
|
||||||
|
req.SSEKMSKeyId = &f.opt.SSEKMSKeyID
|
||||||
|
}
|
||||||
|
if req.StorageClass == nil && f.opt.StorageClass != "" {
|
||||||
|
req.StorageClass = &f.opt.StorageClass
|
||||||
|
}
|
||||||
|
return f.pacer.Call(func() (bool, error) {
|
||||||
|
_, err := f.c.CopyObjectWithContext(ctx, req)
|
||||||
|
return f.shouldRetry(err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Copy src to this remote using server side copy operations.
|
// Copy src to this remote using server side copy operations.
|
||||||
//
|
//
|
||||||
// This is stored with the remote path given
|
// This is stored with the remote path given
|
||||||
|
@ -1571,27 +1600,10 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||||
return nil, fs.ErrorCantCopy
|
return nil, fs.ErrorCantCopy
|
||||||
}
|
}
|
||||||
srcBucket, srcPath := srcObj.split()
|
srcBucket, srcPath := srcObj.split()
|
||||||
source := pathEscape(path.Join(srcBucket, srcPath))
|
|
||||||
req := s3.CopyObjectInput{
|
req := s3.CopyObjectInput{
|
||||||
Bucket: &dstBucket,
|
|
||||||
ACL: &f.opt.ACL,
|
|
||||||
Key: &dstPath,
|
|
||||||
CopySource: &source,
|
|
||||||
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
|
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
|
||||||
}
|
}
|
||||||
if f.opt.ServerSideEncryption != "" {
|
err = f.copy(ctx, &req, dstBucket, dstPath, srcBucket, srcPath)
|
||||||
req.ServerSideEncryption = &f.opt.ServerSideEncryption
|
|
||||||
}
|
|
||||||
if f.opt.SSEKMSKeyID != "" {
|
|
||||||
req.SSEKMSKeyId = &f.opt.SSEKMSKeyID
|
|
||||||
}
|
|
||||||
if f.opt.StorageClass != "" {
|
|
||||||
req.StorageClass = &f.opt.StorageClass
|
|
||||||
}
|
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
|
||||||
_, err = f.c.CopyObjectWithContext(ctx, &req)
|
|
||||||
return f.shouldRetry(err)
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1691,6 +1703,7 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
|
||||||
o.etag = aws.StringValue(resp.ETag)
|
o.etag = aws.StringValue(resp.ETag)
|
||||||
o.bytes = size
|
o.bytes = size
|
||||||
o.meta = resp.Metadata
|
o.meta = resp.Metadata
|
||||||
|
o.storageClass = aws.StringValue(resp.StorageClass)
|
||||||
if resp.LastModified == nil {
|
if resp.LastModified == nil {
|
||||||
fs.Logf(o, "Failed to read last modified from HEAD: %v", err)
|
fs.Logf(o, "Failed to read last modified from HEAD: %v", err)
|
||||||
o.lastModified = time.Now()
|
o.lastModified = time.Now()
|
||||||
|
@ -1741,39 +1754,19 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Guess the content type
|
// Can't update metadata here, so return this error to force a recopy
|
||||||
mimeType := fs.MimeType(ctx, o)
|
if o.storageClass == "GLACIER" || o.storageClass == "DEEP_ARCHIVE" {
|
||||||
|
return fs.ErrorCantSetModTime
|
||||||
|
}
|
||||||
|
|
||||||
// Copy the object to itself to update the metadata
|
// Copy the object to itself to update the metadata
|
||||||
bucket, bucketPath := o.split()
|
bucket, bucketPath := o.split()
|
||||||
sourceKey := path.Join(bucket, bucketPath)
|
|
||||||
directive := s3.MetadataDirectiveReplace // replace metadata with that passed in
|
|
||||||
req := s3.CopyObjectInput{
|
req := s3.CopyObjectInput{
|
||||||
Bucket: &bucket,
|
ContentType: aws.String(fs.MimeType(ctx, o)), // Guess the content type
|
||||||
ACL: &o.fs.opt.ACL,
|
|
||||||
Key: &bucketPath,
|
|
||||||
ContentType: &mimeType,
|
|
||||||
CopySource: aws.String(pathEscape(sourceKey)),
|
|
||||||
Metadata: o.meta,
|
Metadata: o.meta,
|
||||||
MetadataDirective: &directive,
|
MetadataDirective: aws.String(s3.MetadataDirectiveReplace), // replace metadata with that passed in
|
||||||
}
|
}
|
||||||
if o.fs.opt.ServerSideEncryption != "" {
|
return o.fs.copy(ctx, &req, bucket, bucketPath, bucket, bucketPath)
|
||||||
req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
|
|
||||||
}
|
|
||||||
if o.fs.opt.SSEKMSKeyID != "" {
|
|
||||||
req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
|
|
||||||
}
|
|
||||||
if o.fs.opt.StorageClass == "GLACIER" || o.fs.opt.StorageClass == "DEEP_ARCHIVE" {
|
|
||||||
return fs.ErrorCantSetModTime
|
|
||||||
}
|
|
||||||
if o.fs.opt.StorageClass != "" {
|
|
||||||
req.StorageClass = &o.fs.opt.StorageClass
|
|
||||||
}
|
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
|
||||||
_, err := o.fs.c.CopyObjectWithContext(ctx, &req)
|
|
||||||
return o.fs.shouldRetry(err)
|
|
||||||
})
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Storable raturns a boolean indicating if this object is storable
|
// Storable raturns a boolean indicating if this object is storable
|
||||||
|
@ -1998,6 +1991,31 @@ func (o *Object) MimeType(ctx context.Context) string {
|
||||||
return o.mimeType
|
return o.mimeType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetTier performs changing storage class
|
||||||
|
func (o *Object) SetTier(tier string) (err error) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
tier = strings.ToUpper(tier)
|
||||||
|
bucket, bucketPath := o.split()
|
||||||
|
req := s3.CopyObjectInput{
|
||||||
|
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
|
||||||
|
StorageClass: aws.String(tier),
|
||||||
|
}
|
||||||
|
err = o.fs.copy(ctx, &req, bucket, bucketPath, bucket, bucketPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
o.storageClass = tier
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTier returns storage class as string
|
||||||
|
func (o *Object) GetTier() string {
|
||||||
|
if o.storageClass == "" {
|
||||||
|
return "STANDARD"
|
||||||
|
}
|
||||||
|
return o.storageClass
|
||||||
|
}
|
||||||
|
|
||||||
// Check the interfaces are satisfied
|
// Check the interfaces are satisfied
|
||||||
var (
|
var (
|
||||||
_ fs.Fs = &Fs{}
|
_ fs.Fs = &Fs{}
|
||||||
|
@ -2006,4 +2024,6 @@ var (
|
||||||
_ fs.ListRer = &Fs{}
|
_ fs.ListRer = &Fs{}
|
||||||
_ fs.Object = &Object{}
|
_ fs.Object = &Object{}
|
||||||
_ fs.MimeTyper = &Object{}
|
_ fs.MimeTyper = &Object{}
|
||||||
|
_ fs.GetTierer = &Object{}
|
||||||
|
_ fs.SetTierer = &Object{}
|
||||||
)
|
)
|
||||||
|
|
|
@ -11,8 +11,9 @@ import (
|
||||||
// TestIntegration runs integration tests against the remote
|
// TestIntegration runs integration tests against the remote
|
||||||
func TestIntegration(t *testing.T) {
|
func TestIntegration(t *testing.T) {
|
||||||
fstests.Run(t, &fstests.Opt{
|
fstests.Run(t, &fstests.Opt{
|
||||||
RemoteName: "TestS3:",
|
RemoteName: "TestS3:",
|
||||||
NilObject: (*Object)(nil),
|
NilObject: (*Object)(nil),
|
||||||
|
TiersToTest: []string{"STANDARD", "STANDARD_IA"},
|
||||||
ChunkedUpload: fstests.ChunkedUploadConfig{
|
ChunkedUpload: fstests.ChunkedUploadConfig{
|
||||||
MinChunkSize: minChunkSize,
|
MinChunkSize: minChunkSize,
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in a new issue