From 3bf6348f574e59565854646a20c0c12e6af4c2af Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 26 Dec 2012 12:23:58 +0000 Subject: [PATCH] Factor local filesystem and remote swift into Fs and FsObject interfaces This will enable * local -> local and remote -> remote copies * a much more uniform style * could do s3 as well --- fs.go | 112 +++++++++++++ fs_local.go | 244 ++++++++++++++++++++++++++++ fs_swift.go | 215 +++++++++++++++++++++++++ swiftsync.go | 447 +++++++++++---------------------------------------- 4 files changed, 663 insertions(+), 355 deletions(-) create mode 100644 fs.go create mode 100644 fs_local.go create mode 100644 fs_swift.go diff --git a/fs.go b/fs.go new file mode 100644 index 000000000..ab6115124 --- /dev/null +++ b/fs.go @@ -0,0 +1,112 @@ +// File system interface + +package main + +import ( + "io" + "time" +) + +// A Filesystem, describes the local filesystem and the remote object store +type Fs interface { + List() FsObjectsChan + NewFsObject(remote string) FsObject + Put(src FsObject) + Mkdir() error + Rmdir() error +} + +// FIXME make f.Debugf... + +// A filesystem like object which can either be a remote object or a +// local file/directory +type FsObject interface { + Remote() string + Debugf(string, ...interface{}) + Md5sum() (string, error) + ModTime() (time.Time, error) + SetModTime(time.Time) + Size() int64 + Open() (io.ReadCloser, error) + Storable() bool + // Exists() bool + Remove() error +} + +type FsObjectsChan chan FsObject + +type FsObjects []FsObject + +// checkClose is a utility function used to check the return from +// Close in a defer statement. +func checkClose(c io.Closer, err *error) { + cerr := c.Close() + if *err == nil { + *err = cerr + } +} + +// Checks to see if the src and dst objects are equal by looking at +// size, mtime and MD5SUM +// +// If the src and dst size are different then it is considered to be +// not equal. +// +// If the size is the same and the mtime is the same then it is +// considered to be equal. This is the heuristic rsync uses when +// not using --checksum. +// +// If the size is the same and and mtime is different or unreadable +// and the MD5SUM is the same then the file is considered to be equal. +// In this case the mtime on the dst is updated. +// +// Otherwise the file is considered to be not equal including if there +// were errors reading info. +func Equal(src, dst FsObject) bool { + if src.Size() != dst.Size() { + src.Debugf("Sizes differ") + return false + } + + // Size the same so check the mtime + srcModTime, err := src.ModTime() + if err != nil { + src.Debugf("Failed to read src mtime: %s", err) + } else { + dstModTime, err := dst.ModTime() + if err != nil { + dst.Debugf("Failed to read dst mtime: %s", err) + } else if !dstModTime.Equal(srcModTime) { + src.Debugf("Modification times differ") + } else { + src.Debugf("Size and modification time the same") + return true + } + } + + // mtime is unreadable or different but size is the same so + // check the MD5SUM + srcMd5, err := src.Md5sum() + if err != nil { + src.Debugf("Failed to calculate src md5: %s", err) + return false + } + dstMd5, err := dst.Md5sum() + if err != nil { + dst.Debugf("Failed to calculate dst md5: %s", err) + return false + } + // fs.Debugf("Src MD5 %s", srcMd5) + // fs.Debugf("Dst MD5 %s", obj.Hash) + if srcMd5 != dstMd5 { + src.Debugf("Md5sums differ") + return false + } + + // Size and MD5 the same but mtime different so update the + // mtime of the dst object here + dst.SetModTime(srcModTime) + + src.Debugf("Size and MD5SUM of src and dst objects identical") + return true +} diff --git a/fs_local.go b/fs_local.go new file mode 100644 index 000000000..bf6054172 --- /dev/null +++ b/fs_local.go @@ -0,0 +1,244 @@ +// Local filesystem interface +package main + +import ( + "crypto/md5" + "fmt" + "io" + "log" + "os" + "path" + "path/filepath" + "time" +) + +// FsLocal represents a local filesystem rooted at root +type FsLocal struct { + root string +} + +// FsObjectLocal represents a local filesystem object +type FsObjectLocal struct { + remote string // The remote path + path string // The local path + info os.FileInfo // Interface for file info +} + +// ------------------------------------------------------------ + +// Return an FsObject from a path +// +// May return nil if an error occurred +func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) FsObject { + path := filepath.Join(f.root, remote) + fs := &FsObjectLocal{remote: remote, path: path} + if info != nil { + fs.info = info + } else { + err := fs.lstat() + if err != nil { + log.Printf("Failed to stat %s: %s", path, err) + return nil + } + } + return fs +} + +// Return an FsObject from a path +// +// May return nil if an error occurred +func (f *FsLocal) NewFsObject(remote string) FsObject { + return f.NewFsObjectWithInfo(remote, nil) +} + +// Walk the path returning a channel of FsObjects +// +// FIXME ignore symlinks? +// FIXME what about hardlinks / etc +func (f *FsLocal) List() FsObjectsChan { + out := make(FsObjectsChan, *checkers) + go func() { + err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error { + if err != nil { + log.Printf("Failed to open directory: %s: %s", path, err) + } else { + remote, err := filepath.Rel(f.root, path) + if err != nil { + log.Printf("Failed to get relative path %s: %s", path, err) + return nil + } + if remote == "." { + return nil + // remote = "" + } + if fs := f.NewFsObjectWithInfo(remote, fi); fs != nil { + if fs.Storable() { + out <- fs + } + } + } + return nil + }) + if err != nil { + log.Printf("Failed to open directory: %s: %s", f.root, err) + } + close(out) + }() + return out +} + +// FIXME most of this is generic +// could make it into Copy(dst, src FsObject) + +// Puts the FsObject to the local filesystem +// +// FIXME return the object? +func (f *FsLocal) Put(src FsObject) { + dstRemote := src.Remote() + dstPath := filepath.Join(f.root, dstRemote) + log.Printf("Download %s to %s", dstRemote, dstPath) + // Temporary FsObject under construction + fs := &FsObjectLocal{remote: dstRemote, path: dstPath} + + dir := path.Dir(dstPath) + err := os.MkdirAll(dir, 0770) + if err != nil { + fs.Debugf("Couldn't make directory: %s", err) + return + } + + out, err := os.Create(dstPath) + if err != nil { + fs.Debugf("Failed to open: %s", err) + return + } + + // Close and remove file on error at the end + defer func() { + checkClose(out, &err) + if err != nil { + fs.Debugf("Removing failed download") + removeErr := os.Remove(dstPath) + if removeErr != nil { + fs.Debugf("Failed to remove failed download: %s", err) + } + } + }() + + in, err := src.Open() + if err != nil { + fs.Debugf("Failed to open: %s", err) + return + } + defer checkClose(in, &err) + + _, err = io.Copy(out, in) + if err != nil { + fs.Debugf("Failed to download: %s", err) + return + } + + // Set the mtime + modTime, err := src.ModTime() + if err != nil { + fs.Debugf("Failed to read mtime from object: %s", err) + } else { + fs.SetModTime(modTime) + } +} + +// Mkdir creates the directory if it doesn't exist +func (f *FsLocal) Mkdir() error { + return os.MkdirAll(f.root, 0770) +} + +// Rmdir removes the directory +// +// If it isn't empty it will return an error +func (f *FsLocal) Rmdir() error { + return os.Remove(f.root) +} + +// ------------------------------------------------------------ + +// Return the remote path +func (fs *FsObjectLocal) Remote() string { + return fs.remote +} + +// Write debuging output for this FsObject +func (fs *FsObjectLocal) Debugf(text string, args ...interface{}) { + out := fmt.Sprintf(text, args...) + log.Printf("%s: %s", fs.remote, out) +} + +// Md5sum calculates the Md5sum of a file returning a lowercase hex string +func (fs *FsObjectLocal) Md5sum() (string, error) { + in, err := os.Open(fs.path) + if err != nil { + fs.Debugf("Failed to open: %s", err) + return "", err + } + defer in.Close() // FIXME ignoring error + hash := md5.New() + _, err = io.Copy(hash, in) + if err != nil { + fs.Debugf("Failed to read: %s", err) + return "", err + } + return fmt.Sprintf("%x", hash.Sum(nil)), nil +} + +// Size returns the size of an object in bytes +func (fs *FsObjectLocal) Size() int64 { + return fs.info.Size() +} + +// ModTime returns the modification time of the object +func (fs *FsObjectLocal) ModTime() (modTime time.Time, err error) { + return fs.info.ModTime(), nil +} + +// Sets the modification time of the local fs object +func (fs *FsObjectLocal) SetModTime(modTime time.Time) { + err := Chtimes(fs.path, modTime, modTime) + if err != nil { + fs.Debugf("Failed to set mtime on file: %s", err) + } +} + +// Is this object storable +func (fs *FsObjectLocal) Storable() bool { + mode := fs.info.Mode() + if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 { + fs.Debugf("Can't transfer non file/directory") + return false + } else if mode&os.ModeDir != 0 { + // Debug? + fs.Debugf("FIXME Skipping directory") + return false + } + return true +} + +// Open an object for read +func (fs *FsObjectLocal) Open() (in io.ReadCloser, err error) { + in, err = os.Open(fs.path) + return +} + +// Stat a FsObject into info +func (fs *FsObjectLocal) lstat() error { + info, err := os.Lstat(fs.path) + fs.info = info + return err +} + +// Remove an object +func (fs *FsObjectLocal) Remove() error { + return os.Remove(fs.path) +} + +// Check the interfaces are satisfied +var _ Fs = &FsLocal{} +var _ FsObject = &FsObjectLocal{} diff --git a/fs_swift.go b/fs_swift.go new file mode 100644 index 000000000..47bfa14ec --- /dev/null +++ b/fs_swift.go @@ -0,0 +1,215 @@ +// Swift interface +package main + +import ( + "fmt" + "github.com/ncw/swift" + "io" + "log" + "strings" + "time" +) + +// FsSwift represents a remote swift server +type FsSwift struct { + c swift.Connection // the connection to the swift server + container string // the container we are working on +} + +// FsObjectSwift describes a swift object +// +// Will definitely have info but maybe not meta +type FsObjectSwift struct { + swift *FsSwift // what this object is part of + remote string // The remote path + info swift.Object // Info from the swift object if known + meta *swift.Metadata // The object metadata if known +} + +// ------------------------------------------------------------ + +// Return an FsObject from a path +// +// May return nil if an error occurred +func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObject { + fs := &FsObjectSwift{ + swift: f, + remote: remote, + } + if info != nil { + // Set info but not meta + fs.info = *info + } else { + err := fs.readMetaData() // reads info and meta, returning an error + if err != nil { + // logged already fs.Debugf("Failed to read info: %s", err) + return nil + } + } + return fs +} + +// Return an FsObject from a path +// +// May return nil if an error occurred +func (f *FsSwift) NewFsObject(remote string) FsObject { + return f.NewFsObjectWithInfo(remote, nil) +} + +// Walk the path returning a channel of FsObjects +// +// FIXME ignore symlinks? +// FIXME what about hardlinks / etc +// +// FIXME not returning error if dir not found? +func (f *FsSwift) List() FsObjectsChan { + out := make(FsObjectsChan, *checkers) + go func() { + // FIXME use a smaller limit? + err := f.c.ObjectsAllFn(f.container, nil, func(objects []swift.Object) bool { + for i := range objects { + object := &objects[i] + if fs := f.NewFsObjectWithInfo(object.Name, object); fs != nil { + out <- fs + } + } + return false + }) + if err != nil { + log.Printf("Couldn't read container %q: %s", f.container, err) + } + close(out) + }() + return out +} + +// Put the FsObject into the container +func (f *FsSwift) Put(src FsObject) { + // Temporary FsObject under construction + fs := &FsObjectSwift{swift: f, remote: src.Remote()} + // FIXME content type + in, err := src.Open() + if err != nil { + fs.Debugf("Failed to open: %s", err) + return + } + defer in.Close() + + // Set the mtime + m := swift.Metadata{} + modTime, err := src.ModTime() + if err != nil { + fs.Debugf("Failed to read mtime from object: %s", err) + } else { + m.SetModTime(modTime) + } + + _, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders()) + if err != nil { + fs.Debugf("Failed to upload: %s", err) + return + } + fs.Debugf("Uploaded") +} + +// Mkdir creates the container if it doesn't exist +func (f *FsSwift) Mkdir() error { + return f.c.ContainerCreate(f.container, nil) +} + +// Rmdir deletes the container +// +// Returns an error if it isn't empty +func (f *FsSwift) Rmdir() error { + return f.c.ContainerDelete(f.container) +} + +// ------------------------------------------------------------ + +// Return the remote path +func (fs *FsObjectSwift) Remote() string { + return fs.remote +} + +// Write debuging output for this FsObject +func (fs *FsObjectSwift) Debugf(text string, args ...interface{}) { + out := fmt.Sprintf(text, args...) + log.Printf("%s: %s", fs.remote, out) +} + +// Md5sum returns the Md5sum of an object returning a lowercase hex string +func (fs *FsObjectSwift) Md5sum() (string, error) { + return strings.ToLower(fs.info.Hash), nil +} + +// Size returns the size of an object in bytes +func (fs *FsObjectSwift) Size() int64 { + return fs.info.Bytes +} + +// readMetaData gets the metadata if it hasn't already been fetched +// +// it also sets the info +func (fs *FsObjectSwift) readMetaData() (err error) { + if fs.meta != nil { + return nil + } + info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote) + if err != nil { + fs.Debugf("Failed to read info: %s", err) + return err + } + meta := h.ObjectMetadata() + fs.info = info + fs.meta = &meta + return nil +} + +// ModTime returns the modification time of the object +func (fs *FsObjectSwift) ModTime() (modTime time.Time, err error) { + err = fs.readMetaData() + if err != nil { + fs.Debugf("Failed to read metadata: %s", err) + return + } + modTime, err = fs.meta.GetModTime() + if err != nil { + fs.Debugf("Failed to read mtime from object: %s", err) + return + } + return +} + +// Sets the modification time of the local fs object +func (fs *FsObjectSwift) SetModTime(modTime time.Time) { + err := fs.readMetaData() + if err != nil { + fs.Debugf("Failed to read metadata: %s", err) + return + } + fs.meta.SetModTime(modTime) + err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders()) + if err != nil { + fs.Debugf("Failed to update remote mtime: %s", err) + } +} + +// Is this object storable +func (fs *FsObjectSwift) Storable() bool { + return true +} + +// Open an object for read +func (fs *FsObjectSwift) Open() (in io.ReadCloser, err error) { + in, _, err = fs.swift.c.ObjectOpen(fs.swift.container, fs.info.Name, true, nil) + return +} + +// Remove an object +func (fs *FsObjectSwift) Remove() error { + return fs.swift.c.ObjectDelete(fs.swift.container, fs.remote) +} + +// Check the interfaces are satisfied +var _ Fs = &FsSwift{} +var _ FsObject = &FsObjectSwift{} diff --git a/swiftsync.go b/swiftsync.go index ddefc6417..ae3b2bb55 100644 --- a/swiftsync.go +++ b/swiftsync.go @@ -4,20 +4,15 @@ package main import ( - "crypto/md5" "flag" "fmt" "github.com/ncw/swift" - "io" "log" "os" - "path" - "path/filepath" "runtime" "runtime/pprof" "strings" "sync" - "time" ) // Globals @@ -35,325 +30,77 @@ var ( transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.") ) -// A filesystem like object which can either be a remote object or a -// local file/directory or both -type FsObject struct { - remote string // The remote path - path string // The local path - info os.FileInfo // Interface for file info -} - -type FsObjectsChan chan *FsObject - -type FsObjects []FsObject - -// Write debuging output for this FsObject -func (fs *FsObject) Debugf(text string, args ...interface{}) { - out := fmt.Sprintf(text, args...) - log.Printf("%s: %s", fs.remote, out) -} - -// md5sum calculates the md5sum of a file returning a lowercase hex string -func (fs *FsObject) md5sum() (string, error) { - in, err := os.Open(fs.path) - if err != nil { - fs.Debugf("Failed to open: %s", err) - return "", err - } - defer in.Close() // FIXME ignoring error - hash := md5.New() - _, err = io.Copy(hash, in) - if err != nil { - fs.Debugf("Failed to read: %s", err) - return "", err - } - return fmt.Sprintf("%x", hash.Sum(nil)), nil -} - -// Sets the modification time of the local fs object -func (fs *FsObject) SetModTime(modTime time.Time) { - err := Chtimes(fs.path, modTime, modTime) - if err != nil { - fs.Debugf("Failed to set mtime on file: %s", err) - } -} - -// Checks to see if the remote and local objects are equal by looking -// at size, mtime and MD5SUM -// -// If the remote and local size are different then it is considered to -// be not equal. -// -// If the size is the same and the mtime is the same then it is -// considered to be equal. This is the heuristic rsync uses when -// not using --checksum. -// -// If the size is the same and and mtime is different or unreadable -// and the MD5SUM is the same then the file is considered to be -// equal. In this case the mtime on the object is updated. If -// upload is set then the remote object is changed otherwise the local -// object. -// -// Otherwise the file is considered to be not equal including if there -// were errors reading info. -func (fs *FsObject) Equal(c *swift.Connection, container string, upload bool) bool { - // FIXME could pass in an Object here if we have one which - // will mean we could do the Size and Hash checks without a - // remote call if we wanted - obj, h, err := c.Object(container, fs.remote) - if err != nil { - fs.Debugf("Failed to read info: %s", err) - return false - } - if obj.Bytes != fs.info.Size() { - fs.Debugf("Sizes differ") - return false - } - - // Size the same so check the mtime - m := h.ObjectMetadata() - remoteModTime, err := m.GetModTime() - if err != nil { - fs.Debugf("Failed to read mtime: %s", err) - } else if !remoteModTime.Equal(fs.info.ModTime()) { - fs.Debugf("Modification times differ") - } else { - fs.Debugf("Size and modification time the same") - return true - } - - // mtime is unreadable or different but size is the same so - // check the MD5SUM - localMd5, err := fs.md5sum() - if err != nil { - fs.Debugf("Failed to calculate md5: %s", err) - return false - } - // fs.Debugf("Local MD5 %s", localMd5) - // fs.Debugf("Remote MD5 %s", obj.Hash) - if localMd5 != strings.ToLower(obj.Hash) { - fs.Debugf("Md5sums differ") - return false - } - - // Size and MD5 the same but mtime different so update the - // mtime of the local or remote object here - if upload { - m.SetModTime(fs.info.ModTime()) - err = c.ObjectUpdate(container, fs.remote, m.ObjectHeaders()) - if err != nil { - fs.Debugf("Failed to update remote mtime: %s", err) - } - fs.Debugf("Updated mtime of remote object") - } else { - fmt.Printf("metadata %q, remoteModTime = %s\n", m, remoteModTime) - fs.SetModTime(remoteModTime) - fs.Debugf("Updated mtime of local object") - } - - fs.Debugf("Size and MD5SUM of local and remote objects identical") - return true -} - -// Is this object storable -func (fs *FsObject) storable() bool { - mode := fs.info.Mode() - if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 { - fs.Debugf("Can't transfer non file/directory") - return false - } else if mode&os.ModeDir != 0 { - // Debug? - fs.Debugf("FIXME Skipping directory") - return false - } - return true -} - -// Puts the FsObject into the container -func (fs *FsObject) put(c *swift.Connection, container string) { - // FIXME content type - in, err := os.Open(fs.path) - if err != nil { - fs.Debugf("Failed to open: %s", err) - return - } - defer in.Close() - m := swift.Metadata{} - m.SetModTime(fs.info.ModTime()) - _, err = c.ObjectPut(container, fs.remote, in, true, "", "", m.ObjectHeaders()) - if err != nil { - fs.Debugf("Failed to upload: %s", err) - return - } - fs.Debugf("Uploaded") -} - -// Stat a FsObject into info -func (fs *FsObject) lstat() error { - info, err := os.Lstat(fs.path) - fs.info = info - return err -} - -// Return an FsObject from a path -// -// May return nil if an error occurred -func NewFsObject(root, path string) *FsObject { - remote, err := filepath.Rel(root, path) - if err != nil { - log.Printf("Failed to get relative path %s: %s", path, err) - return nil - } - if remote == "." { - remote = "" - } - fs := &FsObject{remote: remote, path: path} - err = fs.lstat() - if err != nil { - log.Printf("Failed to stat %s: %s", path, err) - return nil - } - return fs -} - -// Walk the path returning a channel of FsObjects -// -// FIXME ignore symlinks? -// FIXME what about hardlinks / etc -func walk(root string) FsObjectsChan { - out := make(FsObjectsChan, *checkers) - go func() { - err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error { - if err != nil { - log.Printf("Failed to open directory: %s: %s", path, err) - } else { - if fs := NewFsObject(root, path); fs != nil { - out <- fs - } - } - return nil - }) - if err != nil { - log.Printf("Failed to open directory: %s: %s", root, err) - } - close(out) - }() - return out -} - -// Read FsObjects on in and write them to out if they need uploading +// Read FsObjects~s on in send to out if they need uploading // // FIXME potentially doing lots of MD5SUMS at once -func checker(c *swift.Connection, container string, in, out FsObjectsChan, upload bool, wg *sync.WaitGroup) { +func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() - for fs := range in { - if !upload { - _ = fs.lstat() - if fs.info == nil { - fs.Debugf("Couldn't find local file - download") - out <- fs - continue - } + for src := range in { + dst := fdst.NewFsObject(src.Remote()) + if dst == nil { + src.Debugf("Couldn't find local file - download") + out <- src + continue } // Check to see if can store this - if !fs.storable() { + if !src.Storable() { continue } // Check to see if changed or not - if fs.Equal(c, container, upload) { - fs.Debugf("Unchanged skipping") + if Equal(src, dst) { + src.Debugf("Unchanged skipping") continue } - out <- fs + out <- src } } -// Read FsObjects on in and upload them -func uploader(c *swift.Connection, container string, in FsObjectsChan, wg *sync.WaitGroup) { +// Read FsObjects on in and copy them +func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() - for fs := range in { - fs.put(c, container) + for src := range in { + fdst.Put(src) } } -// Syncs a directory into a container -func upload(c *swift.Connection, args []string) { - root, container := args[0], args[1] - mkdir(c, []string{container}) - to_be_checked := walk(root) +// Copies fsrc into fdst +func Copy(fsrc, fdst Fs) { + err := fdst.Mkdir() + if err != nil { + log.Fatal("Failed to make destination") + } + + to_be_checked := fsrc.List() to_be_uploaded := make(FsObjectsChan, *transfers) var checkerWg sync.WaitGroup checkerWg.Add(*checkers) for i := 0; i < *checkers; i++ { - go checker(c, container, to_be_checked, to_be_uploaded, true, &checkerWg) + go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg) } - var uploaderWg sync.WaitGroup - uploaderWg.Add(*transfers) + var copierWg sync.WaitGroup + copierWg.Add(*transfers) for i := 0; i < *transfers; i++ { - go uploader(c, container, to_be_uploaded, &uploaderWg) + go Copier(to_be_uploaded, fdst, &copierWg) } log.Printf("Waiting for checks to finish") checkerWg.Wait() close(to_be_uploaded) - log.Printf("Waiting for uploads to finish") - uploaderWg.Wait() + log.Printf("Waiting for transfers to finish") + copierWg.Wait() } -// Get an object to the filepath making directories if necessary -func (fs *FsObject) get(c *swift.Connection, container string) { - log.Printf("Download %s to %s", fs.remote, fs.path) +// Syncs a directory into a container +func upload(c *swift.Connection, args []string) { + root, container := args[0], args[1] + // FIXME + fsrc := &FsLocal{root: root} + fdst := &FsSwift{c: *c, container: container} - dir := path.Dir(fs.path) - err := os.MkdirAll(dir, 0770) - if err != nil { - fs.Debugf("Couldn't make directory: %s", err) - return - } - - out, err := os.Create(fs.path) - if err != nil { - fs.Debugf("Failed to open: %s", err) - return - } - - h, getErr := c.ObjectGet(container, fs.remote, out, true, nil) - if getErr != nil { - fs.Debugf("Failed to download: %s", getErr) - } - - closeErr := out.Close() - if closeErr != nil { - fs.Debugf("Error closing: %s", closeErr) - } - - if getErr != nil || closeErr != nil { - fs.Debugf("Removing failed download") - err = os.Remove(fs.path) - if err != nil { - fs.Debugf("Failed to remove failed download: %s", err) - } - return - } - - // Set the mtime - modTime, err := h.ObjectMetadata().GetModTime() - if err != nil { - fs.Debugf("Failed to read mtime from object: %s", err) - } else { - fs.SetModTime(modTime) - } -} - -// Read FsObjects on in and download them -func downloader(c *swift.Connection, container string, in FsObjectsChan, wg *sync.WaitGroup) { - defer wg.Done() - for fs := range in { - fs.get(c, container) - } + Copy(fsrc, fdst) } // Syncs a container into a directory @@ -363,45 +110,12 @@ func downloader(c *swift.Connection, container string, in FsObjectsChan, wg *syn // FIXME should download and stat many at once func download(c *swift.Connection, args []string) { container, root := args[0], args[1] - // FIXME this would be nice running into a channel! - objects, err := c.ObjectsAll(container, nil) - if err != nil { - log.Fatalf("Couldn't read container %q: %s", container, err) - } - err = os.MkdirAll(root, 0770) - if err != nil { - log.Fatalf("Couldn't make directory %q: %s", root, err) - } - - to_be_checked := make(FsObjectsChan, *checkers) - to_be_downloaded := make(FsObjectsChan, *transfers) - - var checkerWg sync.WaitGroup - checkerWg.Add(*checkers) - for i := 0; i < *checkers; i++ { - go checker(c, container, to_be_checked, to_be_downloaded, false, &checkerWg) - } - - var downloaderWg sync.WaitGroup - downloaderWg.Add(*transfers) - for i := 0; i < *transfers; i++ { - go downloader(c, container, to_be_downloaded, &downloaderWg) - } - - for i := range objects { - object := &objects[i] - filepath := path.Join(root, object.Name) - to_be_checked <- &FsObject{remote: object.Name, path: filepath} - } - close(to_be_checked) - - log.Printf("Waiting for checks to finish") - checkerWg.Wait() - close(to_be_downloaded) - log.Printf("Waiting for downloads to finish") - downloaderWg.Wait() + // FIXME + fsrc := &FsSwift{c: *c, container: container} + fdst := &FsLocal{root: root} + Copy(fsrc, fdst) } // Lists the containers @@ -415,6 +129,23 @@ func listContainers(c *swift.Connection) { } } +// List the Fs to stdout +func List(f Fs) { + // FIXME error? + in := f.List() + for fs := range in { + // FIXME + // if object.PseudoDirectory { + // fmt.Printf("%9s %19s %s\n", "Directory", "-", fs.Remote()) + // } else { + // FIXME ModTime is expensive? + modTime, _ := fs.ModTime() + fmt.Printf("%9d %19s %s\n", fs.Size(), modTime.Format("2006-01-02 15:04:05"), fs.Remote()) + // fmt.Printf("%9d %19s %s\n", fs.Size(), object.LastModified.Format("2006-01-02 15:04:05"), fs.Remote()) + // } + } +} + // Lists files in a container func list(c *swift.Connection, args []string) { if len(args) == 0 { @@ -422,24 +153,26 @@ func list(c *swift.Connection, args []string) { return } container := args[0] - //objects, err := c.ObjectsAll(container, &swift.ObjectsOpts{Prefix: "", Delimiter: '/'}) - objects, err := c.ObjectsAll(container, nil) - if err != nil { - log.Fatalf("Couldn't read container %q: %s", container, err) - } - for _, object := range objects { - if object.PseudoDirectory { - fmt.Printf("%9s %19s %s\n", "Directory", "-", object.Name) - } else { - fmt.Printf("%9d %19s %s\n", object.Bytes, object.LastModified.Format("2006-01-02 15:04:05"), object.Name) - } - } + // FIXME + f := &FsSwift{c: *c, container: container} + List(f) +} + +// Local lists files +func llist(c *swift.Connection, args []string) { + root := args[0] + // FIXME + f := &FsLocal{root: root} + List(f) } // Makes a container func mkdir(c *swift.Connection, args []string) { container := args[0] - err := c.ContainerCreate(container, nil) + // FIXME + fdst := &FsSwift{c: *c, container: container} + + err := fdst.Mkdir() if err != nil { log.Fatalf("Couldn't create container %q: %s", container, err) } @@ -448,7 +181,10 @@ func mkdir(c *swift.Connection, args []string) { // Removes a container func rmdir(c *swift.Connection, args []string) { container := args[0] - err := c.ContainerDelete(container) + // FIXME + fdst := &FsSwift{c: *c, container: container} + + err := fdst.Rmdir() if err != nil { log.Fatalf("Couldn't delete container %q: %s", container, err) } @@ -459,34 +195,27 @@ func rmdir(c *swift.Connection, args []string) { // FIXME should make FsObjects and use debugging func purge(c *swift.Connection, args []string) { container := args[0] - objects, err := c.ObjectsAll(container, nil) - if err != nil { - log.Fatalf("Couldn't read container %q: %s", container, err) - } + // FIXME + fdst := &FsSwift{c: *c, container: container} - to_be_deleted := make(chan *swift.Object, *transfers) + to_be_deleted := fdst.List() var wg sync.WaitGroup wg.Add(*transfers) for i := 0; i < *transfers; i++ { go func() { defer wg.Done() - for object := range to_be_deleted { - err := c.ObjectDelete(container, object.Name) + for dst := range to_be_deleted { + err := dst.Remove() if err != nil { - log.Printf("%s: Couldn't delete: %s\n", object.Name, err) + log.Printf("%s: Couldn't delete: %s\n", dst.Remote(), err) } else { - log.Printf("%s: Deleted\n", object.Name) + log.Printf("%s: Deleted\n", dst.Remote()) } } }() } - for i := range objects { - to_be_deleted <- &objects[i] - } - close(to_be_deleted) - log.Printf("Waiting for deletions to finish") wg.Wait() @@ -543,6 +272,14 @@ var Commands = []Command{ list, 0, 1, }, + { + "lls", + `[] + List the directory +`, + llist, + 1, 1, + }, { "mkdir", `