rclone/s3/s3.go
Nick Craig-Wood 255d3e925d s3: fix crash if a bad listing is received - fixes #1927
Caringo Swarm is returning a listing with IsTruncated set but no
NextMarker and no Keys.  Rclone doesn't know how to continue the
listing at this point, so it returns an error rather than truncating
the listing or risking a loop.
2017-12-20 16:51:07 +00:00

1073 lines
30 KiB
Go

// Package s3 provides an interface to Amazon S3 object storage
package s3
// FIXME need to prevent anything but ListDir working for s3://
/*
Progress of port to aws-sdk
* Don't really need o.meta at all?
What happens if you CTRL-C a multipart upload
* get an incomplete upload
* disappears when you delete the bucket
*/
import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"regexp"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/corehandlers"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/ncw/rclone/fs"
"github.com/ncw/swift"
"github.com/pkg/errors"
)
// init registers the "s3" backend with the fs package.
//
// The registration names NewFs as the constructor and declares the
// options for the backend (credentials, region, endpoint, location
// constraint, ACL, server side encryption and storage class) together
// with their help text and example values.
func init() {
fs.Register(&fs.RegInfo{
Name: "s3",
Description: "Amazon S3 (also Dreamhost, Ceph, Minio)",
NewFs: NewFs,
// AWS endpoints: http://docs.amazonwebservices.com/general/latest/gr/rande.html#s3_region
Options: []fs.Option{{
// Whether to pick credentials up from the runtime environment
Name: "env_auth",
Help: "Get AWS credentials from runtime (environment variables or EC2/ECS meta data if no env vars). Only applies if access_key_id and secret_access_key is blank.",
Examples: []fs.OptionExample{
{
Value: "false",
Help: "Enter AWS credentials in the next step",
}, {
Value: "true",
Help: "Get AWS credentials from the environment (env vars or IAM)",
},
},
}, {
// Static credentials
Name: "access_key_id",
Help: "AWS Access Key ID - leave blank for anonymous access or runtime credentials.",
}, {
Name: "secret_access_key",
Help: "AWS Secret Access Key (password) - leave blank for anonymous access or runtime credentials.",
}, {
// Region - the two "other-*" values select the signature
// version for S3 clones (see s3Connection)
Name: "region",
Help: "Region to connect to.",
Examples: []fs.OptionExample{{
Value: "us-east-1",
Help: "The default endpoint - a good choice if you are unsure.\nUS Region, Northern Virginia or Pacific Northwest.\nLeave location constraint empty.",
}, {
Value: "us-east-2",
Help: "US East (Ohio) Region\nNeeds location constraint us-east-2.",
}, {
Value: "us-west-2",
Help: "US West (Oregon) Region\nNeeds location constraint us-west-2.",
}, {
Value: "us-west-1",
Help: "US West (Northern California) Region\nNeeds location constraint us-west-1.",
}, {
Value: "ca-central-1",
Help: "Canada (Central) Region\nNeeds location constraint ca-central-1.",
}, {
Value: "eu-west-1",
Help: "EU (Ireland) Region\nNeeds location constraint EU or eu-west-1.",
}, {
Value: "eu-west-2",
Help: "EU (London) Region\nNeeds location constraint eu-west-2.",
}, {
Value: "eu-central-1",
Help: "EU (Frankfurt) Region\nNeeds location constraint eu-central-1.",
}, {
Value: "ap-southeast-1",
Help: "Asia Pacific (Singapore) Region\nNeeds location constraint ap-southeast-1.",
}, {
Value: "ap-southeast-2",
Help: "Asia Pacific (Sydney) Region\nNeeds location constraint ap-southeast-2.",
}, {
Value: "ap-northeast-1",
Help: "Asia Pacific (Tokyo) Region\nNeeds location constraint ap-northeast-1.",
}, {
Value: "ap-northeast-2",
Help: "Asia Pacific (Seoul)\nNeeds location constraint ap-northeast-2.",
}, {
Value: "ap-south-1",
Help: "Asia Pacific (Mumbai)\nNeeds location constraint ap-south-1.",
}, {
Value: "sa-east-1",
Help: "South America (Sao Paulo) Region\nNeeds location constraint sa-east-1.",
}, {
Value: "other-v2-signature",
Help: "If using an S3 clone that only understands v2 signatures\neg Ceph/Dreamhost\nset this and make sure you set the endpoint.",
}, {
Value: "other-v4-signature",
Help: "If using an S3 clone that understands v4 signatures set this\nand make sure you set the endpoint.",
}},
}, {
// Endpoint override for S3 clones
Name: "endpoint",
Help: "Endpoint for S3 API.\nLeave blank if using AWS to use the default endpoint for the region.\nSpecify if using an S3 clone such as Ceph.",
}, {
// Location constraint - sent when creating buckets (see Mkdir)
Name: "location_constraint",
Help: "Location constraint - must be set to match the Region. Used when creating buckets only.",
Examples: []fs.OptionExample{{
Value: "",
Help: "Empty for US Region, Northern Virginia or Pacific Northwest.",
}, {
Value: "us-east-2",
Help: "US East (Ohio) Region.",
}, {
Value: "us-west-2",
Help: "US West (Oregon) Region.",
}, {
Value: "us-west-1",
Help: "US West (Northern California) Region.",
}, {
Value: "ca-central-1",
Help: "Canada (Central) Region.",
}, {
Value: "eu-west-1",
Help: "EU (Ireland) Region.",
}, {
Value: "eu-west-2",
Help: "EU (London) Region.",
}, {
Value: "EU",
Help: "EU Region.",
}, {
Value: "ap-southeast-1",
Help: "Asia Pacific (Singapore) Region.",
}, {
Value: "ap-southeast-2",
Help: "Asia Pacific (Sydney) Region.",
}, {
Value: "ap-northeast-1",
Help: "Asia Pacific (Tokyo) Region.",
}, {
Value: "ap-northeast-2",
Help: "Asia Pacific (Seoul)",
}, {
Value: "ap-south-1",
Help: "Asia Pacific (Mumbai)",
}, {
Value: "sa-east-1",
Help: "South America (Sao Paulo) Region.",
}},
}, {
// Canned ACL - may be overridden by the --s3-acl flag (see NewFs)
Name: "acl",
Help: "Canned ACL used when creating buckets and/or storing objects in S3.\nFor more info visit https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl",
Examples: []fs.OptionExample{{
Value: "private",
Help: "Owner gets FULL_CONTROL. No one else has access rights (default).",
}, {
Value: "public-read",
Help: "Owner gets FULL_CONTROL. The AllUsers group gets READ access.",
}, {
Value: "public-read-write",
Help: "Owner gets FULL_CONTROL. The AllUsers group gets READ and WRITE access.\nGranting this on a bucket is generally not recommended.",
}, {
Value: "authenticated-read",
Help: "Owner gets FULL_CONTROL. The AuthenticatedUsers group gets READ access.",
}, {
Value: "bucket-owner-read",
Help: "Object owner gets FULL_CONTROL. Bucket owner gets READ access.\nIf you specify this canned ACL when creating a bucket, Amazon S3 ignores it.",
}, {
Value: "bucket-owner-full-control",
Help: "Both the object owner and the bucket owner get FULL_CONTROL over the object.\nIf you specify this canned ACL when creating a bucket, Amazon S3 ignores it.",
}},
}, {
// Server side encryption - applied on upload (see Object.Update)
Name: "server_side_encryption",
Help: "The server-side encryption algorithm used when storing this object in S3.",
Examples: []fs.OptionExample{{
Value: "",
Help: "None",
}, {
Value: "AES256",
Help: "AES256",
}},
}, {
// Storage class - may be overridden by the --s3-storage-class flag (see NewFs)
Name: "storage_class",
Help: "The storage class to use when storing objects in S3.",
Examples: []fs.OptionExample{{
Value: "",
Help: "Default",
}, {
Value: "STANDARD",
Help: "Standard storage class",
}, {
Value: "REDUCED_REDUNDANCY",
Help: "Reduced redundancy storage class",
}, {
Value: "STANDARD_IA",
Help: "Standard Infrequent Access storage class",
}},
}},
})
}
// Constants used throughout the backend
const (
metaMtime = "Mtime" // the meta key to store mtime in - eg X-Amz-Meta-Mtime
listChunkSize = 1000 // number of items to read at once (passed as MaxKeys when listing)
maxRetries = 10 // number of retries to make of operations (passed to the AWS config)
maxSizeForCopy = 5 * 1024 * 1024 * 1024 // The maximum size of object we can COPY (5 GiB)
maxFileSize = 5 * 1024 * 1024 * 1024 * 1024 // largest possible upload file size (5 TiB)
)
// Globals
var (
// Command line flags - when non-empty these override the
// corresponding config file values in NewFs
s3ACL = fs.StringP("s3-acl", "", "", "Canned ACL used when creating buckets and/or storing objects in S3")
s3StorageClass = fs.StringP("s3-storage-class", "", "", "Storage class to use when uploading S3 objects (STANDARD|REDUCED_REDUNDANCY|STANDARD_IA)")
)
// Fs represents a remote s3 server
//
// bucketOK and bucketDeleted are read and written by Mkdir and Rmdir
// while holding bucketOKMu.
type Fs struct {
name string // the name of the remote
root string // root of the bucket - ignore all objects above this (has a trailing "/" when non-empty)
features *fs.Features // optional features
c *s3.S3 // the connection to the s3 server
ses *session.Session // the s3 session
bucket string // the bucket we are working on
bucketOKMu sync.Mutex // mutex to protect bucket OK
bucketOK bool // true if we have created the bucket
bucketDeleted bool // true if we have deleted the bucket
acl string // ACL for new buckets / objects
locationConstraint string // location constraint of new buckets
sse string // the type of server-side encryption
storageClass string // storage class
}
// Object describes a s3 object
type Object struct {
// Will definitely have everything but meta which may be nil
//
// List will read everything but meta & mimeType - to fill
// that in you need to call readMetaData
fs *Fs // what this object is part of
remote string // The remote path (relative to the root of fs)
etag string // md5sum of the object (see Hash for when this is usable as an MD5)
bytes int64 // size of the object
lastModified time.Time // Last modified
meta map[string]*string // The object metadata if known - may be nil
mimeType string // MimeType of object - may be ""
}
// ------------------------------------------------------------
// Name returns the name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
}
// Root returns the root of the remote (as passed into NewFs)
//
// This is the bucket name, followed by "/" and the root within the
// bucket when that root is not empty.
func (f *Fs) Root() string {
	if f.root != "" {
		return f.bucket + "/" + f.root
	}
	return f.bucket
}
// String converts this Fs to a string
//
// The path is only mentioned when the root within the bucket is not
// empty.
func (f *Fs) String() string {
	if f.root != "" {
		return fmt.Sprintf("S3 bucket %s path %s", f.bucket, f.root)
	}
	return fmt.Sprintf("S3 bucket %s", f.bucket)
}
// Features returns the optional features of this Fs
//
// These are set up once in NewFs.
func (f *Fs) Features() *fs.Features {
return f.features
}
// matcher splits an s3 path into the bucket (everything before the
// first "/") and the remainder.
var matcher = regexp.MustCompile(`^([^/]*)(.*)$`)

// s3ParsePath parses a s3 'url' of the form bucket/directory
//
// It returns the bucket and the directory with any leading and
// trailing "/" removed, or an error if the path doesn't match.
func s3ParsePath(path string) (bucket, directory string, err error) {
	groups := matcher.FindStringSubmatch(path)
	if groups == nil {
		return "", "", errors.Errorf("couldn't parse bucket out of s3 path %q", path)
	}
	return groups[1], strings.Trim(groups[2], "/"), nil
}
// s3Connection makes a connection to s3
//
// name is the name of the remote, used to look up the config file
// values access_key_id, secret_access_key, session_token, env_auth,
// endpoint and region.
//
// It returns the S3 client and the session it was created from, or an
// error if only one of access_key_id / secret_access_key is set while
// env_auth is false.
func s3Connection(name string) (*s3.S3, *session.Session, error) {
// Make the auth
v := credentials.Value{
AccessKeyID: fs.ConfigFileGet(name, "access_key_id"),
SecretAccessKey: fs.ConfigFileGet(name, "secret_access_key"),
SessionToken: fs.ConfigFileGet(name, "session_token"),
}
lowTimeoutClient := &http.Client{Timeout: 1 * time.Second} // low timeout to ec2 metadata service
def := defaults.Get()
def.Config.HTTPClient = lowTimeoutClient
// first provider to supply a credential set "wins"
providers := []credentials.Provider{
// use static credentials if they're present (checked by provider)
&credentials.StaticProvider{Value: v},
// * Access Key ID: AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY
// * Secret Access Key: AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY
&credentials.EnvProvider{},
// Pick up IAM role if we're in an ECS task
defaults.RemoteCredProvider(*def.Config, def.Handlers),
// Pick up IAM role in case we're on EC2
&ec2rolecreds.EC2RoleProvider{
Client: ec2metadata.New(session.New(), &aws.Config{
HTTPClient: lowTimeoutClient,
}),
ExpiryWindow: 3,
},
}
cred := credentials.NewChainCredentials(providers)
// Decide whether to keep the provider chain, go anonymous or fail.
// The cases are evaluated in order so env_auth takes precedence.
switch {
case fs.ConfigFileGetBool(name, "env_auth", false):
// No need for empty checks if "env_auth" is true
case v.AccessKeyID == "" && v.SecretAccessKey == "":
// if no access key/secret and iam is explicitly disabled then fall back to anon interaction
cred = credentials.AnonymousCredentials
case v.AccessKeyID == "":
return nil, nil, errors.New("access_key_id not found")
case v.SecretAccessKey == "":
return nil, nil, errors.New("secret_access_key not found")
}
endpoint := fs.ConfigFileGet(name, "endpoint")
region := fs.ConfigFileGet(name, "region")
// Default the endpoint only when neither region nor endpoint is configured
if region == "" && endpoint == "" {
endpoint = "https://s3.amazonaws.com/"
}
// Default the region when it isn't configured
if region == "" {
region = "us-east-1"
}
awsConfig := aws.NewConfig().
WithRegion(region).
WithMaxRetries(maxRetries).
WithCredentials(cred).
WithEndpoint(endpoint).
WithHTTPClient(fs.Config.Client()).
WithS3ForcePathStyle(true)
// awsConfig.WithLogLevel(aws.LogDebugWithSigning)
ses := session.New()
c := s3.New(ses, awsConfig)
// The pseudo region "other-v2-signature" swaps the client's signing
// handlers for a custom signer
if region == "other-v2-signature" {
fs.Debugf(name, "Using v2 auth")
signer := func(req *request.Request) {
// Ignore AnonymousCredentials object
if req.Config.Credentials == credentials.AnonymousCredentials {
return
}
// sign is defined elsewhere in the package - presumably it applies
// a v2 signature to the HTTP request using the static key pair
sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest)
}
c.Handlers.Sign.Clear()
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
c.Handlers.Sign.PushBack(signer)
}
return c, ses, nil
}
// NewFs constructs an Fs from the path, bucket:path
//
// name is the name of the remote and root is of the form
// bucket/directory. If root turns out to point to an existing object
// then the returned Fs is rooted at the parent directory and the
// error is fs.ErrorIsFile.
func NewFs(name, root string) (fs.Fs, error) {
	bucket, directory, err := s3ParsePath(root)
	if err != nil {
		return nil, err
	}
	c, ses, err := s3Connection(name)
	if err != nil {
		return nil, err
	}
	f := &Fs{
		name:               name,
		c:                  c,
		bucket:             bucket,
		ses:                ses,
		acl:                fs.ConfigFileGet(name, "acl"),
		root:               directory,
		locationConstraint: fs.ConfigFileGet(name, "location_constraint"),
		sse:                fs.ConfigFileGet(name, "server_side_encryption"),
		storageClass:       fs.ConfigFileGet(name, "storage_class"),
	}
	f.features = (&fs.Features{
		ReadMimeType:  true,
		WriteMimeType: true,
		BucketBased:   true,
	}).Fill(f)
	// Command line flags take precedence over the config file
	if *s3ACL != "" {
		f.acl = *s3ACL
	}
	if *s3StorageClass != "" {
		f.storageClass = *s3StorageClass
	}
	// Nothing more to do when rooted at the bucket itself
	if f.root == "" {
		// f.listMultipartUploads()
		return f, nil
	}
	f.root += "/"
	// Check to see if the object exists
	head := s3.HeadObjectInput{
		Bucket: &f.bucket,
		Key:    &directory,
	}
	if _, err = f.c.HeadObject(&head); err != nil {
		// Any failure of the HEAD means the root is treated as a directory
		// f.listMultipartUploads()
		return f, nil
	}
	// The root is a file so point the Fs at its parent instead
	parent := path.Dir(directory)
	if parent == "." {
		f.root = ""
	} else {
		f.root = parent + "/"
	}
	// return an error with an fs which points to the parent
	return f, fs.ErrorIsFile
}
// newObjectWithInfo returns an Object for remote
//
// If info is supplied then the size, etag and last modified time are
// taken from it and the metadata is left unread. Otherwise the object
// is looked up with readMetaData and any error from that (such as
// fs.ErrorObjectNotFound) is returned.
func (f *Fs) newObjectWithInfo(remote string, info *s3.Object) (fs.Object, error) {
	o := &Object{
		fs:     f,
		remote: remote,
	}
	if info == nil {
		// reads info and meta, returning an error
		if err := o.readMetaData(); err != nil {
			return nil, err
		}
		return o, nil
	}
	// Set info but not meta
	if info.LastModified != nil {
		o.lastModified = *info.LastModified
	} else {
		fs.Logf(o, "Failed to read last modified")
		o.lastModified = time.Now()
	}
	o.etag = aws.StringValue(info.ETag)
	o.bytes = aws.Int64Value(info.Size)
	return o, nil
}
// NewObject finds the Object at remote. If it can't be found
// it returns the error fs.ErrorObjectNotFound.
//
// No listing info is passed on, so the object is looked up through
// newObjectWithInfo's metadata read.
func (f *Fs) NewObject(remote string) (fs.Object, error) {
return f.newObjectWithInfo(remote, nil)
}
// listFn is called from list to handle an object.
//
// remote is the path with the Fs root removed, object is the listing
// entry and isDirectory is true for entries made from common prefixes.
// Returning an error stops the listing.
type listFn func(remote string, object *s3.Object, isDirectory bool) error
// list calls fn for each object (and, when not recursing, each
// directory) found under dir
//
// dir is the starting directory, "" for root
//
// Set recurse to read sub directories
//
// The listing is read in chunks of listChunkSize, continuing from
// NextMarker (or the last key received) while the response is marked
// as truncated. A 404 from the server is returned as
// fs.ErrorDirNotFound.
func (f *Fs) list(dir string, recurse bool, fn listFn) error {
	prefix := f.root
	if dir != "" {
		prefix += dir + "/"
	}
	// Without a delimiter the server returns every key below the prefix
	delimiter := "/"
	if recurse {
		delimiter = ""
	}
	maxKeys := int64(listChunkSize)
	rootLength := len(f.root)
	var marker *string
	for {
		// FIXME need to implement ALL loop
		resp, err := f.c.ListObjects(&s3.ListObjectsInput{
			Bucket:    &f.bucket,
			Delimiter: &delimiter,
			Prefix:    &prefix,
			MaxKeys:   &maxKeys,
			Marker:    marker,
		})
		if err != nil {
			if awsErr, ok := err.(awserr.RequestFailure); ok && awsErr.StatusCode() == http.StatusNotFound {
				return fs.ErrorDirNotFound
			}
			return err
		}
		if !recurse {
			// Common prefixes are the directories at this level
			for _, commonPrefix := range resp.CommonPrefixes {
				if commonPrefix.Prefix == nil {
					fs.Logf(f, "Nil common prefix received")
					continue
				}
				full := *commonPrefix.Prefix
				if !strings.HasPrefix(full, f.root) {
					fs.Logf(f, "Odd name received %q", full)
					continue
				}
				// Strip the root and a single trailing slash
				remote := strings.TrimSuffix(full[rootLength:], "/")
				if err = fn(remote, &s3.Object{Key: &remote}, true); err != nil {
					return err
				}
			}
		}
		for _, object := range resp.Contents {
			key := aws.StringValue(object.Key)
			if !strings.HasPrefix(key, f.root) {
				fs.Logf(f, "Odd name received %q", key)
				continue
			}
			if err = fn(key[rootLength:], object, false); err != nil {
				return err
			}
		}
		if !aws.BoolValue(resp.IsTruncated) {
			return nil
		}
		// Use NextMarker if set, otherwise use last Key
		switch {
		case resp.NextMarker != nil && *resp.NextMarker != "":
			marker = resp.NextMarker
		case len(resp.Contents) > 0:
			marker = resp.Contents[len(resp.Contents)-1].Key
		default:
			// Nothing to continue from - fail rather than loop forever
			return errors.New("s3 protocol error: received listing with IsTruncated set, no NextMarker and no Contents")
		}
	}
}
// itemToDirEntry converts a list item into a DirEntry
//
// Directories become an fs.Dir with a zero modification time and the
// size from the listing entry (0 if unset); anything else becomes an
// Object built from the listing info.
func (f *Fs) itemToDirEntry(remote string, object *s3.Object, isDirectory bool) (fs.DirEntry, error) {
	if !isDirectory {
		o, err := f.newObjectWithInfo(remote, object)
		if err != nil {
			return nil, err
		}
		return o, nil
	}
	// aws.Int64Value yields 0 for a nil Size
	return fs.NewDir(remote, time.Time{}).SetSize(aws.Int64Value(object.Size)), nil
}
// listDir lists the files and directories directly inside dir
//
// On error no entries are returned.
func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) {
	// collect appends each converted item to entries
	collect := func(remote string, object *s3.Object, isDirectory bool) error {
		entry, err := f.itemToDirEntry(remote, object, isDirectory)
		if err != nil {
			return err
		}
		if entry != nil {
			entries = append(entries, entry)
		}
		return nil
	}
	// List the objects and directories
	if err = f.list(dir, false, collect); err != nil {
		return nil, err
	}
	return entries, nil
}
// listBuckets lists the buckets as directories
//
// dir must be "" otherwise fs.ErrorListBucketRequired is returned.
func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
	if dir != "" {
		return nil, fs.ErrorListBucketRequired
	}
	resp, err := f.c.ListBuckets(&s3.ListBucketsInput{})
	if err != nil {
		return nil, err
	}
	for _, bucket := range resp.Buckets {
		// Each bucket is presented as a directory dated at its creation
		entries = append(entries, fs.NewDir(aws.StringValue(bucket.Name), aws.TimeValue(bucket.CreationDate)))
	}
	return entries, nil
}
// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// When the Fs has no bucket set the buckets themselves are listed.
func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
	if f.bucket != "" {
		return f.listDir(dir)
	}
	return f.listBuckets(dir)
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively than doing a directory traversal.
//
// A bucket is required, otherwise fs.ErrorListBucketRequired is
// returned.
func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
	if f.bucket == "" {
		return fs.ErrorListBucketRequired
	}
	helper := fs.NewListRHelper(callback)
	// add converts each listed item and hands it to the helper
	add := func(remote string, object *s3.Object, isDirectory bool) error {
		entry, err := f.itemToDirEntry(remote, object, isDirectory)
		if err != nil {
			return err
		}
		return helper.Add(entry)
	}
	if err = f.list(dir, true, add); err != nil {
		return err
	}
	return helper.Flush()
}
// Put the Object into the bucket
//
// The new Object is returned along with the result of its Update,
// so it is non-nil even when the upload failed.
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	// Temporary Object under construction
	o := &Object{
		fs:     f,
		remote: src.Remote(),
	}
	err := o.Update(in, src, options...)
	return o, err
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// This simply calls Put.
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// dirExists checks if the bucket exists
//
// A 404 response to the HEAD request is reported as (false, nil); any
// other failure is returned as an error.
//
// NB this can return incorrect results if called immediately after bucket deletion
func (f *Fs) dirExists() (bool, error) {
	_, err := f.c.HeadBucket(&s3.HeadBucketInput{
		Bucket: &f.bucket,
	})
	if err == nil {
		return true, nil
	}
	if reqErr, ok := err.(awserr.RequestFailure); ok && reqErr.StatusCode() == http.StatusNotFound {
		return false, nil
	}
	return false, err
}
// Mkdir creates the bucket if it doesn't exist
//
// dir is not used - only the bucket itself is created.
//
// The result is cached in f.bucketOK so that later calls return
// immediately. The existence check is skipped if the bucket has been
// deleted through this Fs, as dirExists can give wrong answers
// straight after a deletion.
//
// A "BucketAlreadyOwnedByYou" response from the server is treated as
// success.
func (f *Fs) Mkdir(dir string) error {
	f.bucketOKMu.Lock()
	defer f.bucketOKMu.Unlock()
	if f.bucketOK {
		return nil
	}
	if !f.bucketDeleted {
		exists, err := f.dirExists()
		if err == nil {
			f.bucketOK = exists
		}
		if err != nil || exists {
			return err
		}
	}
	req := s3.CreateBucketInput{
		Bucket: &f.bucket,
		ACL:    &f.acl,
	}
	if f.locationConstraint != "" {
		req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{
			LocationConstraint: &f.locationConstraint,
		}
	}
	_, err := f.c.CreateBucket(&req)
	// Use a distinct name for the asserted error so that clearing err
	// affects the value returned below rather than a shadowed copy
	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "BucketAlreadyOwnedByYou" {
		err = nil
	}
	if err == nil {
		f.bucketOK = true
		f.bucketDeleted = false
	}
	return err
}
// Rmdir deletes the bucket if the fs is at the root
//
// It does nothing and returns nil if the Fs has a root within the
// bucket or dir is not "".
//
// Returns an error if it isn't empty
func (f *Fs) Rmdir(dir string) error {
	f.bucketOKMu.Lock()
	defer f.bucketOKMu.Unlock()
	if f.root != "" || dir != "" {
		return nil
	}
	_, err := f.c.DeleteBucket(&s3.DeleteBucketInput{
		Bucket: &f.bucket,
	})
	if err != nil {
		return err
	}
	// Remember the deletion so Mkdir knows to recreate the bucket
	f.bucketOK = false
	f.bucketDeleted = true
	return nil
}
// Precision returns the precision of modification times on the remote
//
// This is reported as one nanosecond.
func (f *Fs) Precision() time.Duration {
return time.Nanosecond
}
// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
//
// The destination bucket is created first if necessary and the
// metadata is copied from the source object.
func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
	if err := f.Mkdir(""); err != nil {
		return nil, err
	}
	srcObj, ok := src.(*Object)
	if !ok {
		fs.Debugf(src, "Can't copy - not same remote type")
		return nil, fs.ErrorCantCopy
	}
	dstKey := f.root + remote
	// The copy source is bucket/key, escaped
	srcPath := srcObj.fs.bucket + "/" + srcObj.fs.root + srcObj.remote
	_, err := f.c.CopyObject(&s3.CopyObjectInput{
		Bucket:            &f.bucket,
		Key:               &dstKey,
		CopySource:        aws.String(url.QueryEscape(srcPath)),
		MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
	})
	if err != nil {
		return nil, err
	}
	return f.NewObject(remote)
}
// Hashes returns the supported hash sets.
//
// Only MD5 is reported.
func (f *Fs) Hashes() fs.HashSet {
return fs.HashSet(fs.HashMD5)
}
// ------------------------------------------------------------
// Fs returns the parent Fs that this object is part of
func (o *Object) Fs() fs.Info {
return o.fs
}
// String returns a string version of the object
//
// This is the remote path, or "<nil>" for a nil Object.
func (o *Object) String() string {
	if o != nil {
		return o.remote
	}
	return "<nil>"
}
// Remote returns the remote path of the object
func (o *Object) Remote() string {
return o.remote
}
// matchMd5 matches a lowercase hex MD5 sum (exactly 32 hex digits)
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)

// Hash returns the Md5sum of an object returning a lowercase hex string
//
// Only fs.HashMD5 is supported; any other type returns
// fs.ErrHashUnsupported. If the etag isn't a valid md5sum then ""
// is returned with no error.
func (o *Object) Hash(t fs.HashType) (string, error) {
	if t != fs.HashMD5 {
		return "", fs.ErrHashUnsupported
	}
	// The etag is lower cased and stripped of its surrounding quotes
	candidate := strings.ToLower(o.etag)
	candidate = strings.Trim(candidate, `"`)
	// Check the etag is a valid md5sum
	if matchMd5.MatchString(candidate) {
		return candidate, nil
	}
	// fs.Debugf(o, "Invalid md5sum (probably multipart uploaded) - ignoring: %q", etag)
	return "", nil
}
// Size returns the size of an object in bytes
//
// This is the value recorded from the listing or the last metadata read.
func (o *Object) Size() int64 {
return o.bytes
}
// readMetaData gets the metadata if it hasn't already been fetched
//
// it also sets the info (etag, size, last modified and mime type)
//
// A non-nil o.meta marks the metadata as already read, in which case
// nothing is done. A 404 from the server is returned as
// fs.ErrorObjectNotFound.
func (o *Object) readMetaData() (err error) {
	if o.meta != nil {
		return nil
	}
	key := o.fs.root + o.remote
	req := s3.HeadObjectInput{
		Bucket: &o.fs.bucket,
		Key:    &key,
	}
	resp, err := o.fs.c.HeadObject(&req)
	if err != nil {
		if awsErr, ok := err.(awserr.RequestFailure); ok && awsErr.StatusCode() == http.StatusNotFound {
			return fs.ErrorObjectNotFound
		}
		return err
	}
	o.etag = aws.StringValue(resp.ETag)
	// Ignore missing Content-Length assuming it is 0
	// Some versions of ceph do this due their apache proxies
	o.bytes = aws.Int64Value(resp.ContentLength)
	o.meta = resp.Metadata
	if o.meta == nil {
		// Keep o.meta non-nil once read: nil means "not fetched yet"
		// above, and SetModTime writes into this map
		o.meta = map[string]*string{}
	}
	if resp.LastModified == nil {
		// err is always nil here so there is nothing more to report
		fs.Logf(o, "Failed to read last modified from HEAD")
		o.lastModified = time.Now()
	} else {
		o.lastModified = *resp.LastModified
	}
	o.mimeType = aws.StringValue(resp.ContentType)
	return nil
}
// ModTime returns the modification time of the object
//
// It attempts to read the objects mtime and if that isn't present the
// LastModified returned in the http headers
//
// If the metadata can't be read the current time is returned.
func (o *Object) ModTime() time.Time {
	if err := o.readMetaData(); err != nil {
		fs.Logf(o, "Failed to read metadata: %v", err)
		return time.Now()
	}
	// read mtime out of metadata if available - a missing key reads as nil
	if stored := o.meta[metaMtime]; stored != nil {
		modTime, err := swift.FloatStringToTime(*stored)
		if err == nil {
			return modTime
		}
		fs.Logf(o, "Failed to read mtime from object: %v", err)
	}
	// fs.Debugf(o, "No metadata")
	return o.lastModified
}
// SetModTime sets the modification time of the object
//
// The new time is stored in the object's metadata, which is written
// back by copying the object onto itself with the metadata replaced.
// Objects of maxSizeForCopy bytes or more are not copied: the call
// logs a debug message and returns nil.
func (o *Object) SetModTime(modTime time.Time) error {
	if err := o.readMetaData(); err != nil {
		return err
	}
	o.meta[metaMtime] = aws.String(swift.TimeToFloatString(modTime))
	if o.bytes >= maxSizeForCopy {
		fs.Debugf(o, "SetModTime is unsupported for objects bigger than %v bytes", fs.SizeSuffix(maxSizeForCopy))
		return nil
	}
	// Guess the content type
	contentType := fs.MimeType(o)
	// Copy the object to itself to update the metadata
	objectKey := o.fs.root + o.remote
	_, err := o.fs.c.CopyObject(&s3.CopyObjectInput{
		Bucket:      &o.fs.bucket,
		ACL:         &o.fs.acl,
		Key:         &objectKey,
		ContentType: &contentType,
		CopySource:  aws.String(url.QueryEscape(o.fs.bucket + "/" + objectKey)),
		Metadata:    o.meta,
		// replace metadata with that passed in
		MetadataDirective: aws.String(s3.MetadataDirectiveReplace),
	})
	return err
}
// Storable returns a boolean indicating if this object is storable
//
// This is always true.
func (o *Object) Storable() bool {
return true
}
// Open an object for read
//
// Range and seek options are passed to the server as a Range header.
// Any other option is ignored, with a log message if it is mandatory.
// An "InvalidObjectState" response is reported as the object being
// in GLACIER.
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
	key := o.fs.root + o.remote
	req := s3.GetObjectInput{
		Bucket: &o.fs.bucket,
		Key:    &key,
	}
	for _, option := range options {
		switch option.(type) {
		case *fs.RangeOption, *fs.SeekOption:
			_, rangeValue := option.Header()
			req.Range = &rangeValue
		default:
			if option.Mandatory() {
				fs.Logf(o, "Unsupported mandatory option: %v", option)
			}
		}
	}
	resp, err := o.fs.c.GetObject(&req)
	if err != nil {
		if reqErr, ok := err.(awserr.RequestFailure); ok && reqErr.Code() == "InvalidObjectState" {
			return nil, errors.Errorf("Object in GLACIER, restore first: %v", key)
		}
		return nil, err
	}
	return resp.Body, nil
}
// Update the Object from in with modTime and size
//
// The bucket is created first if necessary. The data is sent with an
// s3manager uploader and the modification time of src is stored in
// the object metadata. After a successful upload the metadata is read
// back from the server.
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
err := o.fs.Mkdir("")
if err != nil {
return err
}
modTime := src.ModTime()
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
u.Concurrency = 2
u.LeavePartsOnError = false
u.S3 = o.fs.c
// Start from the smallest part size and grow it below if needed
u.PartSize = s3manager.MinUploadPartSize
size := src.Size()
// A size of -1 is taken to mean the size is unknown (presumably from PutStream)
if size == -1 {
// Make parts as small as possible while still being able to upload to the
// S3 file size limit. Rounded up to nearest MB.
u.PartSize = (((maxFileSize / s3manager.MaxUploadParts) >> 20) + 1) << 20
return
}
// Adjust PartSize until the number of parts is small enough.
if size/u.PartSize >= s3manager.MaxUploadParts {
// Calculate partition size rounded up to the nearest MB
u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
}
})
// Set the mtime in the meta data
metadata := map[string]*string{
metaMtime: aws.String(swift.TimeToFloatString(modTime)),
}
// Guess the content type
mimeType := fs.MimeType(src)
key := o.fs.root + o.remote
req := s3manager.UploadInput{
Bucket: &o.fs.bucket,
ACL: &o.fs.acl,
Key: &key,
Body: in,
ContentType: &mimeType,
Metadata: metadata,
//ContentLength: &size,
}
// Only send the encryption and storage class when configured
if o.fs.sse != "" {
req.ServerSideEncryption = &o.fs.sse
}
if o.fs.storageClass != "" {
req.StorageClass = &o.fs.storageClass
}
_, err = uploader.Upload(&req)
if err != nil {
return err
}
// Read the metadata from the newly created object
o.meta = nil // wipe old metadata
err = o.readMetaData()
return err
}
// Remove an object
//
// The error from the delete request, if any, is returned unchanged.
func (o *Object) Remove() error {
	key := o.fs.root + o.remote
	_, err := o.fs.c.DeleteObject(&s3.DeleteObjectInput{
		Bucket: &o.fs.bucket,
		Key:    &key,
	})
	return err
}
// MimeType of an Object if known, "" otherwise
//
// This reads the metadata if it hasn't been read already; a failure
// to do so is logged and "" is returned.
func (o *Object) MimeType() string {
	if err := o.readMetaData(); err != nil {
		fs.Logf(o, "Failed to read metadata: %v", err)
		return ""
	}
	return o.mimeType
}
// Check the interfaces are satisfied
//
// These assignments make the build fail if Fs or Object stops
// implementing one of the listed interfaces.
var (
_ fs.Fs = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
)