diff --git a/fs.go b/fs.go index 8415f4795..cdc33fb8e 100644 --- a/fs.go +++ b/fs.go @@ -3,12 +3,15 @@ package main import ( + "fmt" "io" + "log" "time" ) // A Filesystem, describes the local filesystem and the remote object store type Fs interface { + String() string List() FsObjectsChan NewFsObject(remote string) FsObject Put(src FsObject) @@ -22,7 +25,6 @@ type Fs interface { // local file/directory type FsObject interface { Remote() string - Debugf(string, ...interface{}) Md5sum() (string, error) ModTime() (time.Time, error) SetModTime(time.Time) @@ -47,6 +49,22 @@ func NewFs(path string) (Fs, error) { return NewFsLocal(path) } +// Write debugging output for this FsObject +func FsDebug(fs FsObject, text string, args ...interface{}) { + if *verbose { + out := fmt.Sprintf(text, args...) + log.Printf("%s: %s", fs.Remote(), out) + } +} + +// Write log output for this FsObject +func FsLog(fs FsObject, text string, args ...interface{}) { + if !*quiet { + out := fmt.Sprintf(text, args...) + log.Printf("%s: %s", fs.Remote(), out) + } +} + // checkClose is a utility function used to check the return from // Close in a defer statement. func checkClose(c io.Closer, err *error) { @@ -74,22 +92,22 @@ func checkClose(c io.Closer, err *error) { // were errors reading info. 
func Equal(src, dst FsObject) bool { if src.Size() != dst.Size() { - src.Debugf("Sizes differ") + FsDebug(src, "Sizes differ") return false } // Size the same so check the mtime srcModTime, err := src.ModTime() if err != nil { - src.Debugf("Failed to read src mtime: %s", err) + FsDebug(src, "Failed to read src mtime: %s", err) } else { dstModTime, err := dst.ModTime() if err != nil { - dst.Debugf("Failed to read dst mtime: %s", err) + FsDebug(dst, "Failed to read dst mtime: %s", err) } else if !dstModTime.Equal(srcModTime) { - src.Debugf("Modification times differ") + FsDebug(src, "Modification times differ") } else { - src.Debugf("Size and modification time the same") + FsDebug(src, "Size and modification time the same") return true } } @@ -98,18 +116,18 @@ func Equal(src, dst FsObject) bool { // check the MD5SUM srcMd5, err := src.Md5sum() if err != nil { - src.Debugf("Failed to calculate src md5: %s", err) + FsDebug(src, "Failed to calculate src md5: %s", err) return false } dstMd5, err := dst.Md5sum() if err != nil { - dst.Debugf("Failed to calculate dst md5: %s", err) + FsDebug(dst, "Failed to calculate dst md5: %s", err) return false } - // fs.Debugf("Src MD5 %s", srcMd5) - // fs.Debugf("Dst MD5 %s", obj.Hash) + // FsDebug("Src MD5 %s", srcMd5) + // FsDebug("Dst MD5 %s", obj.Hash) if srcMd5 != dstMd5 { - src.Debugf("Md5sums differ") + FsDebug(src, "Md5sums differ") return false } @@ -117,6 +135,6 @@ func Equal(src, dst FsObject) bool { // mtime of the dst object here dst.SetModTime(srcModTime) - src.Debugf("Size and MD5SUM of src and dst objects identical") + FsDebug(src, "Size and MD5SUM of src and dst objects identical") return true } diff --git a/fs_local.go b/fs_local.go index 84f0440bd..7d36924e9 100644 --- a/fs_local.go +++ b/fs_local.go @@ -33,6 +33,11 @@ func NewFsLocal(root string) (*FsLocal, error) { return f, nil } +// String converts this FsLocal to a string +func (f *FsLocal) String() string { + return fmt.Sprintf("Local file system at %s", 
f.root) +} + // Return an FsObject from a path // // May return nil if an error occurred @@ -109,13 +114,13 @@ func (f *FsLocal) Put(src FsObject) { dir := path.Dir(dstPath) err := os.MkdirAll(dir, 0770) if err != nil { - fs.Debugf("Couldn't make directory: %s", err) + FsLog(fs, "Couldn't make directory: %s", err) return } out, err := os.Create(dstPath) if err != nil { - fs.Debugf("Failed to open: %s", err) + FsLog(fs, "Failed to open: %s", err) return } @@ -123,31 +128,31 @@ func (f *FsLocal) Put(src FsObject) { defer func() { checkClose(out, &err) if err != nil { - fs.Debugf("Removing failed download") + FsDebug(fs, "Removing failed download") removeErr := os.Remove(dstPath) if removeErr != nil { - fs.Debugf("Failed to remove failed download: %s", err) + FsLog(fs, "Failed to remove failed download: %s", err) } } }() in, err := src.Open() if err != nil { - fs.Debugf("Failed to open: %s", err) + FsLog(fs, "Failed to open: %s", err) return } defer checkClose(in, &err) _, err = io.Copy(out, in) if err != nil { - fs.Debugf("Failed to download: %s", err) + FsLog(fs, "Failed to download: %s", err) return } // Set the mtime modTime, err := src.ModTime() if err != nil { - fs.Debugf("Failed to read mtime from object: %s", err) + FsDebug(fs, "Failed to read mtime from object: %s", err) } else { fs.SetModTime(modTime) } @@ -172,24 +177,18 @@ func (fs *FsObjectLocal) Remote() string { return fs.remote } -// Write debuging output for this FsObject -func (fs *FsObjectLocal) Debugf(text string, args ...interface{}) { - out := fmt.Sprintf(text, args...) 
- log.Printf("%s: %s", fs.remote, out) -} - // Md5sum calculates the Md5sum of a file returning a lowercase hex string func (fs *FsObjectLocal) Md5sum() (string, error) { in, err := os.Open(fs.path) if err != nil { - fs.Debugf("Failed to open: %s", err) + FsLog(fs, "Failed to open: %s", err) return "", err } defer in.Close() // FIXME ignoring error hash := md5.New() _, err = io.Copy(hash, in) if err != nil { - fs.Debugf("Failed to read: %s", err) + FsLog(fs, "Failed to read: %s", err) return "", err } return fmt.Sprintf("%x", hash.Sum(nil)), nil @@ -209,7 +208,7 @@ func (fs *FsObjectLocal) ModTime() (modTime time.Time, err error) { func (fs *FsObjectLocal) SetModTime(modTime time.Time) { err := Chtimes(fs.path, modTime, modTime) if err != nil { - fs.Debugf("Failed to set mtime on file: %s", err) + FsDebug(fs, "Failed to set mtime on file: %s", err) } } @@ -217,11 +216,10 @@ func (fs *FsObjectLocal) SetModTime(modTime time.Time) { func (fs *FsObjectLocal) Storable() bool { mode := fs.info.Mode() if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 { - fs.Debugf("Can't transfer non file/directory") + FsDebug(fs, "Can't transfer non file/directory") return false } else if mode&os.ModeDir != 0 { - // Debug? - fs.Debugf("FIXME Skipping directory") + FsDebug(fs, "FIXME Skipping directory") return false } return true diff --git a/fs_swift.go b/fs_swift.go index 2b8776e65..91883403d 100644 --- a/fs_swift.go +++ b/fs_swift.go @@ -41,6 +41,11 @@ var ( apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). 
Defaults to environment var ST_KEY.") ) +// String converts this FsSwift to a string +func (f *FsSwift) String() string { + return fmt.Sprintf("Swift container %s", f.container) +} + // Pattern to match a swift url var swiftMatch = regexp.MustCompile(`^([^/:]+):(.*)$`) @@ -125,7 +130,7 @@ func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObjec } else { err := fs.readMetaData() // reads info and meta, returning an error if err != nil { - // logged already fs.Debugf("Failed to read info: %s", err) + // logged already FsDebug("Failed to read info: %s", err) return nil } } @@ -171,7 +176,7 @@ func (f *FsSwift) Put(src FsObject) { // FIXME content type in, err := src.Open() if err != nil { - fs.Debugf("Failed to open: %s", err) + FsLog(fs, "Failed to open: %s", err) return } defer in.Close() @@ -180,17 +185,17 @@ func (f *FsSwift) Put(src FsObject) { m := swift.Metadata{} modTime, err := src.ModTime() if err != nil { - fs.Debugf("Failed to read mtime from object: %s", err) + FsDebug(fs, "Failed to read mtime from object: %s", err) } else { m.SetModTime(modTime) } _, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders()) if err != nil { - fs.Debugf("Failed to upload: %s", err) + FsLog(fs, "Failed to upload: %s", err) return } - fs.Debugf("Uploaded") + FsDebug(fs, "Uploaded") } // Mkdir creates the container if it doesn't exist @@ -212,12 +217,6 @@ func (fs *FsObjectSwift) Remote() string { return fs.remote } -// Write debuging output for this FsObject -func (fs *FsObjectSwift) Debugf(text string, args ...interface{}) { - out := fmt.Sprintf(text, args...) 
- log.Printf("%s: %s", fs.remote, out) -} - // Md5sum returns the Md5sum of an object returning a lowercase hex string func (fs *FsObjectSwift) Md5sum() (string, error) { return strings.ToLower(fs.info.Hash), nil @@ -237,7 +236,7 @@ func (fs *FsObjectSwift) readMetaData() (err error) { } info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote) if err != nil { - fs.Debugf("Failed to read info: %s", err) + FsLog(fs, "Failed to read info: %s", err) return err } meta := h.ObjectMetadata() @@ -250,12 +249,12 @@ func (fs *FsObjectSwift) readMetaData() (err error) { func (fs *FsObjectSwift) ModTime() (modTime time.Time, err error) { err = fs.readMetaData() if err != nil { - fs.Debugf("Failed to read metadata: %s", err) + FsLog(fs, "Failed to read metadata: %s", err) return } modTime, err = fs.meta.GetModTime() if err != nil { - fs.Debugf("Failed to read mtime from object: %s", err) + FsLog(fs, "Failed to read mtime from object: %s", err) return } return @@ -265,13 +264,13 @@ func (fs *FsObjectSwift) ModTime() (modTime time.Time, err error) { func (fs *FsObjectSwift) SetModTime(modTime time.Time) { err := fs.readMetaData() if err != nil { - fs.Debugf("Failed to read metadata: %s", err) + FsLog(fs, "Failed to read metadata: %s", err) return } fs.meta.SetModTime(modTime) err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders()) if err != nil { - fs.Debugf("Failed to update remote mtime: %s", err) + FsLog(fs, "Failed to update remote mtime: %s", err) } } diff --git a/notes.txt b/notes.txt index d3c1f3b2d..c93cd0adc 100644 --- a/notes.txt +++ b/notes.txt @@ -1,17 +1,18 @@ Todo - * Add sync command (like rsync with delete) * Check logging in various parts - * Make logging controllable with flags + * Make logging controllable with flags (mostly done) * progress meter would be nice! 
Do this by wrapping the Reader with a progress bar * Do bandwidth limit by wrapping the Reader too * Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple uploads or downloads. * code.google.com/p/mxk/go1/flowcontrol - only does one flow at once * Or maybe put into swift library. - * Make swift timeouts be settable with command line parameters + * -timeout: Make all timeouts be settable with command line parameters * Check the locking in swift module! * Windows paths? Do we need to translate / and \? * Make a fs.Errorf and count errors and log them at a different level + * add -modify-window flag - fs should keep knowledge of resolution + * add check command to compare local MD5SUMs with remote Ideas * optimise remote copy container to another container using remote diff --git a/swiftsync.go b/swiftsync.go index 1daffc4fe..3da76f94e 100644 --- a/swiftsync.go +++ b/swiftsync.go @@ -21,6 +21,7 @@ var ( snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented verbose = flag.Bool("verbose", false, "Print lots more stuff") quiet = flag.Bool("quiet", false, "Print as little stuff as possible") + dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes") checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.") transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.") ) @@ -33,7 +34,7 @@ func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { for src := range in { dst := fdst.NewFsObject(src.Remote()) if dst == nil { - src.Debugf("Couldn't find local file - download") + FsDebug(src, "Couldn't find local file - download") out <- src continue } @@ -44,7 +45,7 @@ func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { } // Check to see if changed or not if Equal(src, dst) { - src.Debugf("Unchanged skipping") + FsDebug(src, "Unchanged skipping") continue } out <- src @@ -88,9 +89,87 @@ func Copy(fdst, fsrc Fs) { copierWg.Wait() 
} -// Copy~s from source to dest -func copy_(fdst, fsrc Fs) { - Copy(fdst, fsrc) +// Delete all the files passed in the channel +func DeleteFiles(to_be_deleted FsObjectsChan) { + var wg sync.WaitGroup + wg.Add(*transfers) + for i := 0; i < *transfers; i++ { + go func() { + defer wg.Done() + for dst := range to_be_deleted { + if *dry_run { + FsDebug(dst, "Not deleting as -dry-run") + } else { + err := dst.Remove() + if err != nil { + FsLog(dst, "Couldn't delete: %s", err) + } else { + FsDebug(dst, "Deleted") + } + } + } + }() + } + + log.Printf("Waiting for deletions to finish") + wg.Wait() +} + +// Syncs fsrc into fdst +func Sync(fdst, fsrc Fs) { + err := fdst.Mkdir() + if err != nil { + log.Fatal("Failed to make destination") + } + + // Read the destination files first + // FIXME could do this in parallel and make it use less memory + delFiles := make(map[string]FsObject) + for dstFile := range fdst.List() { + delFiles[dstFile.Remote()] = dstFile + } + + // Read source files checking them off against dest files + to_be_checked := make(FsObjectsChan, *transfers) + go func() { + for srcFile := range fsrc.List() { + delete(delFiles, srcFile.Remote()) + to_be_checked <- srcFile + } + close(to_be_checked) + }() + + to_be_uploaded := make(FsObjectsChan, *transfers) + + var checkerWg sync.WaitGroup + checkerWg.Add(*checkers) + for i := 0; i < *checkers; i++ { + go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg) + } + + var copierWg sync.WaitGroup + copierWg.Add(*transfers) + for i := 0; i < *transfers; i++ { + go Copier(to_be_uploaded, fdst, &copierWg) + } + + log.Printf("Waiting for checks to finish") + checkerWg.Wait() + close(to_be_uploaded) + log.Printf("Waiting for transfers to finish") + copierWg.Wait() + + // FIXME don't delete if IO errors + + // Delete the spare files + toDelete := make(FsObjectsChan, *transfers) + go func() { + for _, fs := range delFiles { + toDelete <- fs + } + close(toDelete) + }() + DeleteFiles(toDelete) } // List the Fs to stdout 
@@ -129,9 +208,13 @@ func mkdir(fdst, fsrc Fs) { // Removes a container but not if not empty func rmdir(fdst, fsrc Fs) { - err := fdst.Rmdir() - if err != nil { - log.Fatalf("Rmdir failed: %s", err) + if *dry_run { + log.Printf("Not deleting %s as -dry-run", fdst) + } else { + err := fdst.Rmdir() + if err != nil { + log.Fatalf("Rmdir failed: %s", err) + } } } @@ -139,27 +222,7 @@ func rmdir(fdst, fsrc Fs) { // // FIXME doesn't delete local directories func purge(fdst, fsrc Fs) { - to_be_deleted := fdst.List() - - var wg sync.WaitGroup - wg.Add(*transfers) - for i := 0; i < *transfers; i++ { - go func() { - defer wg.Done() - for dst := range to_be_deleted { - err := dst.Remove() - if err != nil { - log.Printf("%s: Couldn't delete: %s\n", dst.Remote(), err) - } else { - log.Printf("%s: Deleted\n", dst.Remote()) - } - } - }() - } - - log.Printf("Waiting for deletions to finish") - wg.Wait() - + DeleteFiles(fdst.List()) log.Printf("Deleting path") rmdir(fdst, fsrc) } @@ -194,7 +257,21 @@ var Commands = []Command{ MD5SUM. Doesn't delete files from the destination. `, - copy_, + Copy, + 2, 2, + }, + { + "sync", + ` + + Sync the source to the destination. Doesn't transfer + unchanged files, testing first by modification time then by + MD5SUM. Deletes any files that exist in destination that don't + exist in source. Since this can cause data loss, test + first with the -dry-run flag. + +`, + Sync, 2, 2, }, {