Be more consistent with naming in remotes
* External objects are called Fs and Object
* Object.fs always points to the Fs
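The convention in one minimal sketch, using a hypothetical example backend (fields trimmed, not real rclone code; in the real backends Object.Fs returns the fs.Fs interface rather than the concrete type):

    // Hypothetical example backend, for illustration of the naming convention only.
    package example

    // Fs represents the remote itself (previously it would have been called FsExample).
    type Fs struct {
        name string // name of the remote as configured
        root string // root path within the remote
    }

    // Object describes a single object on the remote (previously FsObjectExample).
    type Object struct {
        fs     *Fs    // the Fs this object belongs to - always called fs
        remote string // path of the object relative to the Fs root
    }

    // Fs returns the parent Fs; with the field rename this reads the same in every remote.
    func (o *Object) Fs() *Fs {
        return o.fs
    }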
parent 365b4babae
commit b257de4aba

17 changed files with 506 additions and 475 deletions
s3/s3.go (129 changes)
@@ -128,8 +128,8 @@ const (
     maxRetries = 10 // number of retries to make of operations
 )
 
-// FsS3 represents a remote s3 server
-type FsS3 struct {
+// Fs represents a remote s3 server
+type Fs struct {
     name string // the name of the remote
     c *s3.S3 // the connection to the s3 server
     ses *session.Session // the s3 session
@@ -139,13 +139,13 @@ type FsS3 struct {
     locationConstraint string // location constraint of new buckets
 }
 
-// FsObjectS3 describes a s3 object
-type FsObjectS3 struct {
+// Object describes a s3 object
+type Object struct {
     // Will definitely have everything but meta which may be nil
     //
     // List will read everything but meta - to fill that in need to call
     // readMetaData
-    s3 *FsS3 // what this object is part of
+    fs *Fs // what this object is part of
     remote string // The remote path
     etag string // md5sum of the object
     bytes int64 // size of the object
@@ -156,20 +156,20 @@ type FsObjectS3 struct {
 // ------------------------------------------------------------
 
 // Name of the remote (as passed into NewFs)
-func (f *FsS3) Name() string {
+func (f *Fs) Name() string {
     return f.name
 }
 
 // Root of the remote (as passed into NewFs)
-func (f *FsS3) Root() string {
+func (f *Fs) Root() string {
     if f.root == "" {
         return f.bucket
     }
     return f.bucket + "/" + f.root
 }
 
-// String converts this FsS3 to a string
-func (f *FsS3) String() string {
+// String converts this Fs to a string
+func (f *Fs) String() string {
     if f.root == "" {
         return fmt.Sprintf("S3 bucket %s", f.bucket)
     }
@@ -247,7 +247,7 @@ func s3Connection(name string) (*s3.S3, *session.Session, error) {
     return c, ses, nil
 }
 
-// NewFs contstructs an FsS3 from the path, bucket:path
+// NewFs contstructs an Fs from the path, bucket:path
 func NewFs(name, root string) (fs.Fs, error) {
     bucket, directory, err := s3ParsePath(root)
     if err != nil {
@@ -257,7 +257,7 @@ func NewFs(name, root string) (fs.Fs, error) {
     if err != nil {
         return nil, err
     }
-    f := &FsS3{
+    f := &Fs{
         name: name,
         c: c,
         bucket: bucket,
@@ -294,9 +294,9 @@ func NewFs(name, root string) (fs.Fs, error) {
 // Return an FsObject from a path
 //
 // May return nil if an error occurred
-func (f *FsS3) newFsObjectWithInfo(remote string, info *s3.Object) fs.Object {
-    o := &FsObjectS3{
-        s3: f,
+func (f *Fs) newFsObjectWithInfo(remote string, info *s3.Object) fs.Object {
+    o := &Object{
+        fs: f,
         remote: remote,
     }
     if info != nil {
@@ -322,14 +322,14 @@ func (f *FsS3) newFsObjectWithInfo(remote string, info *s3.Object) fs.Object {
 // NewFsObject returns an FsObject from a path
 //
 // May return nil if an error occurred
-func (f *FsS3) NewFsObject(remote string) fs.Object {
+func (f *Fs) NewFsObject(remote string) fs.Object {
     return f.newFsObjectWithInfo(remote, nil)
 }
 
 // list the objects into the function supplied
 //
 // If directories is set it only sends directories
-func (f *FsS3) list(directories bool, fn func(string, *s3.Object)) {
+func (f *Fs) list(directories bool, fn func(string, *s3.Object)) {
     maxKeys := int64(listChunkSize)
     delimiter := ""
     if directories {
@@ -394,7 +394,7 @@ func (f *FsS3) list(directories bool, fn func(string, *s3.Object)) {
 }
 
 // List walks the path returning a channel of FsObjects
-func (f *FsS3) List() fs.ObjectsChan {
+func (f *Fs) List() fs.ObjectsChan {
     out := make(fs.ObjectsChan, fs.Config.Checkers)
     if f.bucket == "" {
         // Return no objects at top level list
@@ -415,7 +415,7 @@ func (f *FsS3) List() fs.ObjectsChan {
 }
 
 // ListDir lists the buckets
-func (f *FsS3) ListDir() fs.DirChan {
+func (f *Fs) ListDir() fs.DirChan {
     out := make(fs.DirChan, fs.Config.Checkers)
     if f.bucket == "" {
         // List the buckets
@@ -458,14 +458,17 @@ func (f *FsS3) ListDir() fs.DirChan {
 }
 
 // Put the FsObject into the bucket
-func (f *FsS3) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) {
-    // Temporary FsObject under construction
-    fs := &FsObjectS3{s3: f, remote: remote}
+func (f *Fs) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) {
+    // Temporary Object under construction
+    fs := &Object{
+        fs: f,
+        remote: remote,
+    }
     return fs, fs.Update(in, modTime, size)
 }
 
 // Mkdir creates the bucket if it doesn't exist
-func (f *FsS3) Mkdir() error {
+func (f *Fs) Mkdir() error {
     req := s3.CreateBucketInput{
         Bucket: &f.bucket,
         ACL: &f.perm,
@@ -487,7 +490,7 @@ func (f *FsS3) Mkdir() error {
 // Rmdir deletes the bucket
 //
 // Returns an error if it isn't empty
-func (f *FsS3) Rmdir() error {
+func (f *Fs) Rmdir() error {
     req := s3.DeleteBucketInput{
         Bucket: &f.bucket,
     }
@@ -496,7 +499,7 @@ func (f *FsS3) Rmdir() error {
 }
 
 // Precision of the remote
-func (f *FsS3) Precision() time.Duration {
+func (f *Fs) Precision() time.Duration {
     return time.Nanosecond
 }
 
@@ -509,13 +512,13 @@ func (f *FsS3) Precision() time.Duration {
 // Will only be called if src.Fs().Name() == f.Name()
 //
 // If it isn't possible then return fs.ErrorCantCopy
-func (f *FsS3) Copy(src fs.Object, remote string) (fs.Object, error) {
-    srcObj, ok := src.(*FsObjectS3)
+func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
+    srcObj, ok := src.(*Object)
     if !ok {
         fs.Debug(src, "Can't copy - not same remote type")
         return nil, fs.ErrorCantCopy
     }
-    srcFs := srcObj.s3
+    srcFs := srcObj.fs
     key := f.root + remote
     source := srcFs.bucket + "/" + srcFs.root + srcObj.remote
     req := s3.CopyObjectInput{
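Copy in the hunk above only attempts a server-side copy when the source object comes from the same kind of remote, which it detects with a type assertion on the concrete *Object type and otherwise falls back with fs.ErrorCantCopy. A self-contained sketch of that pattern, using toy stand-ins for fs.Object and fs.ErrorCantCopy rather than the real rclone interfaces:

    package main

    import (
        "errors"
        "fmt"
    )

    // Source stands in for the generic fs.Object interface.
    type Source interface {
        Remote() string
    }

    // errCantCopy stands in for fs.ErrorCantCopy.
    var errCantCopy = errors.New("can't copy - not same remote type")

    // Object is this remote's own concrete object type.
    type Object struct {
        remote string
    }

    func (o *Object) Remote() string { return o.remote }

    // OtherObject belongs to some other remote type.
    type OtherObject struct{}

    func (o *OtherObject) Remote() string { return "elsewhere" }

    // Copy only attempts a server-side copy when src is one of ours.
    func Copy(src Source, remote string) error {
        srcObj, ok := src.(*Object) // same-remote check via type assertion
        if !ok {
            return errCantCopy
        }
        fmt.Printf("server-side copy %q -> %q\n", srcObj.Remote(), remote)
        return nil
    }

    func main() {
        fmt.Println(Copy(&Object{remote: "a.txt"}, "b.txt")) // <nil>
        fmt.Println(Copy(&OtherObject{}, "b.txt"))           // can't copy - not same remote type
    }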
@@ -534,12 +537,12 @@ func (f *FsS3) Copy(src fs.Object, remote string) (fs.Object, error) {
 // ------------------------------------------------------------
 
 // Fs returns the parent Fs
-func (o *FsObjectS3) Fs() fs.Fs {
-    return o.s3
+func (o *Object) Fs() fs.Fs {
+    return o.fs
 }
 
 // Return a string version
-func (o *FsObjectS3) String() string {
+func (o *Object) String() string {
     if o == nil {
         return "<nil>"
     }
@@ -547,14 +550,14 @@ func (o *FsObjectS3) String() string {
 }
 
 // Remote returns the remote path
-func (o *FsObjectS3) Remote() string {
+func (o *Object) Remote() string {
     return o.remote
 }
 
 var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // Md5sum returns the Md5sum of an object returning a lowercase hex string
-func (o *FsObjectS3) Md5sum() (string, error) {
+func (o *Object) Md5sum() (string, error) {
     etag := strings.Trim(strings.ToLower(o.etag), `"`)
     // Check the etag is a valid md5sum
     if !matchMd5.MatchString(etag) {
@@ -565,23 +568,23 @@ func (o *FsObjectS3) Md5sum() (string, error) {
 }
 
 // Size returns the size of an object in bytes
-func (o *FsObjectS3) Size() int64 {
+func (o *Object) Size() int64 {
     return o.bytes
 }
 
 // readMetaData gets the metadata if it hasn't already been fetched
 //
 // it also sets the info
-func (o *FsObjectS3) readMetaData() (err error) {
+func (o *Object) readMetaData() (err error) {
     if o.meta != nil {
         return nil
     }
-    key := o.s3.root + o.remote
+    key := o.fs.root + o.remote
     req := s3.HeadObjectInput{
-        Bucket: &o.s3.bucket,
+        Bucket: &o.fs.bucket,
         Key: &key,
     }
-    resp, err := o.s3.c.HeadObject(&req)
+    resp, err := o.fs.c.HeadObject(&req)
     if err != nil {
         fs.Debug(o, "Failed to read info: %s", err)
         return err
@@ -608,7 +611,7 @@ func (o *FsObjectS3) readMetaData() (err error) {
 //
 // It attempts to read the objects mtime and if that isn't present the
 // LastModified returned in the http headers
-func (o *FsObjectS3) ModTime() time.Time {
+func (o *Object) ModTime() time.Time {
     err := o.readMetaData()
     if err != nil {
         fs.Log(o, "Failed to read metadata: %s", err)
@@ -629,7 +632,7 @@ func (o *FsObjectS3) ModTime() time.Time {
 }
 
 // SetModTime sets the modification time of the local fs object
-func (o *FsObjectS3) SetModTime(modTime time.Time) {
+func (o *Object) SetModTime(modTime time.Time) {
     err := o.readMetaData()
     if err != nil {
         fs.Stats.Error()
@@ -639,18 +642,18 @@ func (o *FsObjectS3) SetModTime(modTime time.Time) {
     o.meta[metaMtime] = aws.String(swift.TimeToFloatString(modTime))
 
     // Copy the object to itself to update the metadata
-    key := o.s3.root + o.remote
-    sourceKey := o.s3.bucket + "/" + key
+    key := o.fs.root + o.remote
+    sourceKey := o.fs.bucket + "/" + key
     directive := s3.MetadataDirectiveReplace // replace metadata with that passed in
     req := s3.CopyObjectInput{
-        Bucket: &o.s3.bucket,
-        ACL: &o.s3.perm,
+        Bucket: &o.fs.bucket,
+        ACL: &o.fs.perm,
         Key: &key,
         CopySource: &sourceKey,
         Metadata: o.meta,
         MetadataDirective: &directive,
     }
-    _, err = o.s3.c.CopyObject(&req)
+    _, err = o.fs.c.CopyObject(&req)
     if err != nil {
         fs.Stats.Error()
         fs.ErrorLog(o, "Failed to update remote mtime: %s", err)
@@ -658,18 +661,18 @@ func (o *FsObjectS3) SetModTime(modTime time.Time) {
 }
 
 // Storable raturns a boolean indicating if this object is storable
-func (o *FsObjectS3) Storable() bool {
+func (o *Object) Storable() bool {
     return true
 }
 
 // Open an object for read
-func (o *FsObjectS3) Open() (in io.ReadCloser, err error) {
-    key := o.s3.root + o.remote
+func (o *Object) Open() (in io.ReadCloser, err error) {
+    key := o.fs.root + o.remote
     req := s3.GetObjectInput{
-        Bucket: &o.s3.bucket,
+        Bucket: &o.fs.bucket,
         Key: &key,
     }
-    resp, err := o.s3.c.GetObject(&req)
+    resp, err := o.fs.c.GetObject(&req)
     if err != nil {
         return nil, err
     }
@@ -677,11 +680,11 @@ func (o *FsObjectS3) Open() (in io.ReadCloser, err error) {
 }
 
 // Update the Object from in with modTime and size
-func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error {
-    uploader := s3manager.NewUploader(o.s3.ses, func(u *s3manager.Uploader) {
+func (o *Object) Update(in io.Reader, modTime time.Time, size int64) error {
+    uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
         u.Concurrency = 2
         u.LeavePartsOnError = false
-        u.S3 = o.s3.c
+        u.S3 = o.fs.c
     })
 
     // Set the mtime in the meta data
@@ -692,10 +695,10 @@ func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error {
     // Guess the content type
     contentType := fs.MimeType(o)
 
-    key := o.s3.root + o.remote
+    key := o.fs.root + o.remote
     req := s3manager.UploadInput{
-        Bucket: &o.s3.bucket,
-        ACL: &o.s3.perm,
+        Bucket: &o.fs.bucket,
+        ACL: &o.fs.perm,
         Key: &key,
         Body: in,
         ContentType: &contentType,
@@ -714,17 +717,19 @@ func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error {
 }
 
 // Remove an object
-func (o *FsObjectS3) Remove() error {
-    key := o.s3.root + o.remote
+func (o *Object) Remove() error {
+    key := o.fs.root + o.remote
     req := s3.DeleteObjectInput{
-        Bucket: &o.s3.bucket,
+        Bucket: &o.fs.bucket,
         Key: &key,
     }
-    _, err := o.s3.c.DeleteObject(&req)
+    _, err := o.fs.c.DeleteObject(&req)
     return err
 }
 
 // Check the interfaces are satisfied
-var _ fs.Fs = &FsS3{}
-var _ fs.Copier = &FsS3{}
-var _ fs.Object = &FsObjectS3{}
+var (
+    _ fs.Fs = &Fs{}
+    _ fs.Copier = &Fs{}
+    _ fs.Object = &Object{}
+)
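The final hunk only regroups the compile-time interface checks into a single var block; the idiom itself is unchanged. Assigning a value to the blank identifier at package scope makes the build fail if the type ever stops satisfying the interface, rather than surfacing the problem at runtime. A self-contained sketch of the idiom with a toy interface (not rclone's real fs.Fs):

    package main

    import "fmt"

    // Lister is a toy stand-in for an interface such as fs.Fs.
    type Lister interface {
        List() []string
    }

    // Fs is the concrete type we want to keep in step with Lister.
    type Fs struct{}

    func (f *Fs) List() []string { return []string{"a", "b"} }

    // Compile-time check: if *Fs ever stops implementing Lister,
    // this line breaks the build instead of failing at runtime.
    var _ Lister = &Fs{}

    func main() {
        fmt.Println((&Fs{}).List())
    }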