Factor the generic code into fs and add some more intefaces

This commit is contained in:
Nick Craig-Wood 2014-03-28 17:56:04 +00:00
parent bc221fb27e
commit 92ec29fe3f
8 changed files with 630 additions and 504 deletions

View file

@ -760,6 +760,19 @@ func (f *FsDrive) Purge() error {
// ------------------------------------------------------------ // ------------------------------------------------------------
// Return the parent Fs
func (o *FsObjectDrive) Fs() fs.Fs {
return o.drive
}
// Return a string version
func (o *FsObjectDrive) String() string {
if o == nil {
return "<nil>"
}
return o.remote
}
// Return the remote path // Return the remote path
func (o *FsObjectDrive) Remote() string { func (o *FsObjectDrive) Remote() string {
return o.remote return o.remote

View file

@ -1,4 +1,5 @@
// Read and write the config file // Read, write and edit the config file
package fs package fs
import ( import (
@ -38,12 +39,14 @@ var (
checkers = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.") checkers = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.")
transfers = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.") transfers = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.")
configFile = pflag.StringP("config", "", ConfigPath, "Config file.") configFile = pflag.StringP("config", "", ConfigPath, "Config file.")
dryRun = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
) )
// Filesystem config options // Filesystem config options
type ConfigInfo struct { type ConfigInfo struct {
Verbose bool Verbose bool
Quiet bool Quiet bool
DryRun bool
ModifyWindow time.Duration ModifyWindow time.Duration
Checkers int Checkers int
Transfers int Transfers int
@ -67,6 +70,7 @@ func LoadConfig() {
// FIXME read these from the config file too // FIXME read these from the config file too
Config.Verbose = *verbose Config.Verbose = *verbose
Config.Quiet = *quiet Config.Quiet = *quiet
Config.Quiet = *dryRun
Config.ModifyWindow = *modifyWindow Config.ModifyWindow = *modifyWindow
Config.Checkers = *checkers Config.Checkers = *checkers
Config.Transfers = *transfers Config.Transfers = *transfers
@ -230,6 +234,22 @@ func RemoteConfig(name string) {
} }
} }
// Choose an option
func ChooseOption(o *Option) string {
fmt.Println(o.Help)
if len(o.Examples) > 0 {
var values []string
var help []string
for _, example := range o.Examples {
values = append(values, example.Value)
help = append(help, example.Help)
}
return Choose(o.Name, values, help, true)
}
fmt.Printf("%s> ", o.Name)
return ReadLine()
}
// Make a new remote // Make a new remote
func NewRemote(name string) { func NewRemote(name string) {
fmt.Printf("What type of source is it?\n") fmt.Printf("What type of source is it?\n")
@ -244,7 +264,7 @@ func NewRemote(name string) {
log.Fatalf("Failed to find fs: %v", err) log.Fatalf("Failed to find fs: %v", err)
} }
for _, option := range fs.Options { for _, option := range fs.Options {
ConfigFile.SetValue(name, option.Name, option.Choose()) ConfigFile.SetValue(name, option.Name, ChooseOption(&option))
} }
RemoteConfig(name) RemoteConfig(name)
if OkRemote(name) { if OkRemote(name) {

178
fs/fs.go
View file

@ -38,22 +38,6 @@ type OptionExample struct {
Help string Help string
} }
// Choose an option
func (o *Option) Choose() string {
fmt.Println(o.Help)
if len(o.Examples) > 0 {
var values []string
var help []string
for _, example := range o.Examples {
values = append(values, example.Value)
help = append(help, example.Help)
}
return Choose(o.Name, values, help, true)
}
fmt.Printf("%s> ", o.Name)
return ReadLine()
}
// Register a filesystem // Register a filesystem
// //
// Fs modules should use this in an init() function // Fs modules should use this in an init() function
@ -92,11 +76,15 @@ type Fs interface {
Precision() time.Duration Precision() time.Duration
} }
// FIXME make f.Debugf...
// A filesystem like object which can either be a remote object or a // A filesystem like object which can either be a remote object or a
// local file/directory // local file/directory
type Object interface { type Object interface {
// String returns a description of the Object
String() string
// Fs returns the Fs that this object is part of
Fs() Fs
// Remote returns the remote path // Remote returns the remote path
Remote() string Remote() string
@ -137,6 +125,14 @@ type ObjectsChan chan Object
// A slice of Objects // A slice of Objects
type Objects []Object type Objects []Object
// A pair of Objects
type ObjectPair struct {
src, dst Object
}
// A channel of ObjectPair
type ObjectPairChan chan ObjectPair
// A structure of directory/container/bucket lists // A structure of directory/container/bucket lists
type Dir struct { type Dir struct {
Name string // name of the directory Name string // name of the directory
@ -186,19 +182,27 @@ func NewFs(path string) (Fs, error) {
return fs.NewFs(configName, fsPath) return fs.NewFs(configName, fsPath)
} }
// Write debuging output for this Object // Outputs log for object
func Debug(fs Object, text string, args ...interface{}) { func OutputLog(o interface{}, text string, args ...interface{}) {
description := ""
if x, ok := o.(fmt.Stringer); ok {
description = x.String() + ": "
}
out := fmt.Sprintf(text, args...)
log.Print(description + out)
}
// Write debuging output for this Object or Fs
func Debug(o interface{}, text string, args ...interface{}) {
if Config.Verbose { if Config.Verbose {
out := fmt.Sprintf(text, args...) OutputLog(o, text, args...)
log.Printf("%s: %s", fs.Remote(), out)
} }
} }
// Write log output for this Object // Write log output for this Object or Fs
func Log(fs Object, text string, args ...interface{}) { func Log(o interface{}, text string, args ...interface{}) {
if !Config.Quiet { if !Config.Quiet {
out := fmt.Sprintf(text, args...) OutputLog(o, text, args...)
log.Printf("%s: %s", fs.Remote(), out)
} }
} }
@ -210,125 +214,3 @@ func checkClose(c io.Closer, err *error) {
*err = cerr *err = cerr
} }
} }
// Work out modify window for fses passed in - sets Config.ModifyWindow
//
// This is the largest modify window of all the fses in use, and the
// user configured value
func CalculateModifyWindow(fs ...Fs) {
for _, f := range fs {
if f != nil {
precision := f.Precision()
if precision > Config.ModifyWindow {
Config.ModifyWindow = precision
}
}
}
if Config.Verbose {
log.Printf("Modify window is %s\n", Config.ModifyWindow)
}
}
// Check the two files to see if the MD5sums are the same
//
// May return an error which will already have been logged
//
// If an error is returned it will return false
func CheckMd5sums(src, dst Object) (bool, error) {
srcMd5, err := src.Md5sum()
if err != nil {
Stats.Error()
Log(src, "Failed to calculate src md5: %s", err)
return false, err
}
dstMd5, err := dst.Md5sum()
if err != nil {
Stats.Error()
Log(dst, "Failed to calculate dst md5: %s", err)
return false, err
}
// Debug("Src MD5 %s", srcMd5)
// Debug("Dst MD5 %s", obj.Hash)
return srcMd5 == dstMd5, nil
}
// Checks to see if the src and dst objects are equal by looking at
// size, mtime and MD5SUM
//
// If the src and dst size are different then it is considered to be
// not equal.
//
// If the size is the same and the mtime is the same then it is
// considered to be equal. This is the heuristic rsync uses when
// not using --checksum.
//
// If the size is the same and and mtime is different or unreadable
// and the MD5SUM is the same then the file is considered to be equal.
// In this case the mtime on the dst is updated.
//
// Otherwise the file is considered to be not equal including if there
// were errors reading info.
func Equal(src, dst Object) bool {
if src.Size() != dst.Size() {
Debug(src, "Sizes differ")
return false
}
// Size the same so check the mtime
srcModTime := src.ModTime()
dstModTime := dst.ModTime()
dt := dstModTime.Sub(srcModTime)
ModifyWindow := Config.ModifyWindow
if dt >= ModifyWindow || dt <= -ModifyWindow {
Debug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
} else {
Debug(src, "Size and modification time differ by %s (within %s)", dt, ModifyWindow)
return true
}
// mtime is unreadable or different but size is the same so
// check the MD5SUM
same, _ := CheckMd5sums(src, dst)
if !same {
Debug(src, "Md5sums differ")
return false
}
// Size and MD5 the same but mtime different so update the
// mtime of the dst object here
dst.SetModTime(srcModTime)
Debug(src, "Size and MD5SUM of src and dst objects identical")
return true
}
// Copy src object to f
func Copy(f Fs, src Object) {
in0, err := src.Open()
if err != nil {
Stats.Error()
Log(src, "Failed to open: %s", err)
return
}
in := NewAccount(in0) // account the transfer
dst, err := f.Put(in, src.Remote(), src.ModTime(), src.Size())
inErr := in.Close()
if err == nil {
err = inErr
}
if err != nil {
Stats.Error()
Log(src, "Failed to copy: %s", err)
if dst != nil {
Debug(dst, "Removing failed copy")
removeErr := dst.Remove()
if removeErr != nil {
Stats.Error()
Log(dst, "Failed to remove failed copy: %s", removeErr)
}
}
return
}
Debug(src, "Copied")
}

472
fs/operations.go Normal file
View file

@ -0,0 +1,472 @@
// Generic operations on filesystems and objects
package fs
import (
"fmt"
"log"
"sync"
)
// Work out modify window for fses passed in - sets Config.ModifyWindow
//
// This is the largest modify window of all the fses in use, and the
// user configured value
func CalculateModifyWindow(fs ...Fs) {
for _, f := range fs {
if f != nil {
precision := f.Precision()
if precision > Config.ModifyWindow {
Config.ModifyWindow = precision
}
}
}
Debug(fs[0], "Modify window is %s\n", Config.ModifyWindow)
}
// Check the two files to see if the MD5sums are the same
//
// May return an error which will already have been logged
//
// If an error is returned it will return false
func CheckMd5sums(src, dst Object) (bool, error) {
srcMd5, err := src.Md5sum()
if err != nil {
Stats.Error()
Log(src, "Failed to calculate src md5: %s", err)
return false, err
}
dstMd5, err := dst.Md5sum()
if err != nil {
Stats.Error()
Log(dst, "Failed to calculate dst md5: %s", err)
return false, err
}
// Debug("Src MD5 %s", srcMd5)
// Debug("Dst MD5 %s", obj.Hash)
return srcMd5 == dstMd5, nil
}
// Checks to see if the src and dst objects are equal by looking at
// size, mtime and MD5SUM
//
// If the src and dst size are different then it is considered to be
// not equal.
//
// If the size is the same and the mtime is the same then it is
// considered to be equal. This is the heuristic rsync uses when
// not using --checksum.
//
// If the size is the same and and mtime is different or unreadable
// and the MD5SUM is the same then the file is considered to be equal.
// In this case the mtime on the dst is updated.
//
// Otherwise the file is considered to be not equal including if there
// were errors reading info.
func Equal(src, dst Object) bool {
if src.Size() != dst.Size() {
Debug(src, "Sizes differ")
return false
}
// Size the same so check the mtime
srcModTime := src.ModTime()
dstModTime := dst.ModTime()
dt := dstModTime.Sub(srcModTime)
ModifyWindow := Config.ModifyWindow
if dt >= ModifyWindow || dt <= -ModifyWindow {
Debug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
} else {
Debug(src, "Size and modification time differ by %s (within %s)", dt, ModifyWindow)
return true
}
// mtime is unreadable or different but size is the same so
// check the MD5SUM
same, _ := CheckMd5sums(src, dst)
if !same {
Debug(src, "Md5sums differ")
return false
}
// Size and MD5 the same but mtime different so update the
// mtime of the dst object here
dst.SetModTime(srcModTime)
Debug(src, "Size and MD5SUM of src and dst objects identical")
return true
}
// Copy src object to f
func Copy(f Fs, src Object) {
in0, err := src.Open()
if err != nil {
Stats.Error()
Log(src, "Failed to open: %s", err)
return
}
in := NewAccount(in0) // account the transfer
dst, err := f.Put(in, src.Remote(), src.ModTime(), src.Size())
inErr := in.Close()
if err == nil {
err = inErr
}
if err != nil {
Stats.Error()
Log(src, "Failed to copy: %s", err)
if dst != nil {
Debug(dst, "Removing failed copy")
removeErr := dst.Remove()
if removeErr != nil {
Stats.Error()
Log(dst, "Failed to remove failed copy: %s", removeErr)
}
}
return
}
Debug(src, "Copied")
}
// Check to see if src needs to be copied to dst and if so puts it in out
func checkOne(src, dst Object, out ObjectsChan) {
if dst == nil {
Debug(src, "Couldn't find local file - download")
out <- src
return
}
// Check to see if can store this
if !src.Storable() {
return
}
// Check to see if changed or not
if Equal(src, dst) {
Debug(src, "Unchanged skipping")
return
}
out <- src
}
// Read FsObjects~s on in send to out if they need uploading
//
// FIXME potentially doing lots of MD5SUMS at once
func PairChecker(in ObjectPairChan, out ObjectsChan, wg *sync.WaitGroup) {
defer wg.Done()
for pair := range in {
src := pair.src
Stats.Checking(src)
checkOne(src, pair.dst, out)
Stats.DoneChecking(src)
}
}
// Read FsObjects~s on in send to out if they need uploading
//
// FIXME potentially doing lots of MD5SUMS at once
func Checker(in, out ObjectsChan, fdst Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
Stats.Checking(src)
dst := fdst.NewFsObject(src.Remote())
checkOne(src, dst, out)
Stats.DoneChecking(src)
}
}
// Read FsObjects on in and copy them
func Copier(in ObjectsChan, fdst Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
Stats.Transferring(src)
Copy(fdst, src)
Stats.DoneTransferring(src)
}
}
// Copies fsrc into fdst
func CopyFs(fdst, fsrc Fs) error {
err := fdst.Mkdir()
if err != nil {
Stats.Error()
return err
}
to_be_checked := fsrc.List()
to_be_uploaded := make(ObjectsChan, Config.Transfers)
var checkerWg sync.WaitGroup
checkerWg.Add(Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg)
}
var copierWg sync.WaitGroup
copierWg.Add(Config.Transfers)
for i := 0; i < Config.Transfers; i++ {
go Copier(to_be_uploaded, fdst, &copierWg)
}
Log(fdst, "Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
Log(fdst, "Waiting for transfers to finish")
copierWg.Wait()
return nil
}
// Delete all the files passed in the channel
func DeleteFiles(to_be_deleted ObjectsChan) {
var wg sync.WaitGroup
wg.Add(Config.Transfers)
var fs Fs
for i := 0; i < Config.Transfers; i++ {
go func() {
defer wg.Done()
for dst := range to_be_deleted {
fs = dst.Fs()
if Config.DryRun {
Debug(dst, "Not deleting as --dry-run")
} else {
Stats.Checking(dst)
err := dst.Remove()
Stats.DoneChecking(dst)
if err != nil {
Stats.Error()
Log(dst, "Couldn't delete: %s", err)
} else {
Debug(dst, "Deleted")
}
}
}
}()
}
Log(fs, "Waiting for deletions to finish")
wg.Wait()
}
// Syncs fsrc into fdst
func Sync(fdst, fsrc Fs) error {
err := fdst.Mkdir()
if err != nil {
Stats.Error()
return err
}
Log(fdst, "Building file list")
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
delFiles := make(map[string]Object)
for dst := range fdst.List() {
delFiles[dst.Remote()] = dst
}
// Read source files checking them off against dest files
to_be_checked := make(ObjectPairChan, Config.Transfers)
to_be_uploaded := make(ObjectsChan, Config.Transfers)
var checkerWg sync.WaitGroup
checkerWg.Add(Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
go PairChecker(to_be_checked, to_be_uploaded, &checkerWg)
}
var copierWg sync.WaitGroup
copierWg.Add(Config.Transfers)
for i := 0; i < Config.Transfers; i++ {
go Copier(to_be_uploaded, fdst, &copierWg)
}
go func() {
for src := range fsrc.List() {
remote := src.Remote()
dst, found := delFiles[remote]
if found {
delete(delFiles, remote)
to_be_checked <- ObjectPair{src, dst}
} else {
// No need to check doesn't exist
to_be_uploaded <- src
}
}
close(to_be_checked)
}()
Log(fdst, "Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
Log(fdst, "Waiting for transfers to finish")
copierWg.Wait()
if Stats.Errored() {
Log(fdst, "Not deleting files as there were IO errors")
return nil
}
// Delete the spare files
toDelete := make(ObjectsChan, Config.Transfers)
go func() {
for _, fs := range delFiles {
toDelete <- fs
}
close(toDelete)
}()
DeleteFiles(toDelete)
return nil
}
// Checks the files in fsrc and fdst according to Size and MD5SUM
func Check(fdst, fsrc Fs) error {
Log(fdst, "Building file list")
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
dstFiles := make(map[string]Object)
for dst := range fdst.List() {
dstFiles[dst.Remote()] = dst
}
// Read the source files checking them against dstFiles
// FIXME could do this in parallel and make it use less memory
srcFiles := make(map[string]Object)
commonFiles := make(map[string][]Object)
for src := range fsrc.List() {
remote := src.Remote()
if dst, ok := dstFiles[remote]; ok {
commonFiles[remote] = []Object{dst, src}
delete(dstFiles, remote)
} else {
srcFiles[remote] = src
}
}
Log(fdst, "%d files not in %v", len(dstFiles), fsrc)
for _, dst := range dstFiles {
Stats.Error()
Log(dst, "File not in %v", fsrc)
}
Log(fsrc, "%d files not in %s", len(srcFiles), fdst)
for _, src := range srcFiles {
Stats.Error()
Log(src, "File not in %v", fdst)
}
checks := make(chan []Object, Config.Transfers)
go func() {
for _, check := range commonFiles {
checks <- check
}
close(checks)
}()
var checkerWg sync.WaitGroup
checkerWg.Add(Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
go func() {
defer checkerWg.Done()
for check := range checks {
dst, src := check[0], check[1]
Stats.Checking(src)
if src.Size() != dst.Size() {
Stats.DoneChecking(src)
Stats.Error()
Log(src, "Sizes differ")
continue
}
same, err := CheckMd5sums(src, dst)
Stats.DoneChecking(src)
if err != nil {
continue
}
if !same {
Stats.Error()
Log(src, "Md5sums differ")
}
Debug(src, "OK")
}
}()
}
Log(fdst, "Waiting for checks to finish")
checkerWg.Wait()
Log(fdst, "%d differences found", Stats.GetErrors())
if Stats.GetErrors() > 0 {
return fmt.Errorf("%d differences found", Stats.GetErrors())
}
return nil
}
// List the Fs to stdout
//
// Lists in parallel which may get them out of order
func List(f Fs) error {
in := f.List()
var wg sync.WaitGroup
wg.Add(Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
go func() {
defer wg.Done()
for o := range in {
Stats.Checking(o)
modTime := o.ModTime()
Stats.DoneChecking(o)
fmt.Printf("%9d %19s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), o.Remote())
}
}()
}
wg.Wait()
return nil
}
// List the directories/buckets/containers in the Fs to stdout
func ListDir(f Fs) error {
for dir := range f.ListDir() {
fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
}
return nil
}
// Makes a destination directory or container
func Mkdir(f Fs) error {
err := f.Mkdir()
if err != nil {
Stats.Error()
return err
}
return nil
}
// Removes a container but not if not empty
func Rmdir(f Fs) error {
if Config.DryRun {
Log(f, "Not deleting as dry run is set")
} else {
err := f.Rmdir()
if err != nil {
Stats.Error()
return err
}
}
return nil
}
// Removes a container and all of its contents
//
// FIXME doesn't delete local directories
func Purge(f Fs) error {
if purger, ok := f.(Purger); ok {
err := purger.Purge()
if err != nil {
Stats.Error()
return err
}
} else {
DeleteFiles(f.List())
log.Printf("Deleting path")
Rmdir(f)
}
return nil
}

View file

@ -33,6 +33,7 @@ type FsLocal struct {
// FsObjectLocal represents a local filesystem object // FsObjectLocal represents a local filesystem object
type FsObjectLocal struct { type FsObjectLocal struct {
local fs.Fs // The Fs this object is part of
remote string // The remote path remote string // The remote path
path string // The local path path string // The local path
info os.FileInfo // Interface for file info info os.FileInfo // Interface for file info
@ -57,7 +58,7 @@ func (f *FsLocal) String() string {
// May return nil if an error occurred // May return nil if an error occurred
func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) fs.Object { func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) fs.Object {
path := filepath.Join(f.root, remote) path := filepath.Join(f.root, remote)
o := &FsObjectLocal{remote: remote, path: path} o := &FsObjectLocal{local: f, remote: remote, path: path}
if info != nil { if info != nil {
o.info = info o.info = info
} else { } else {
@ -162,7 +163,7 @@ func (f *FsLocal) ListDir() fs.DirChan {
func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) { func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) {
dstPath := filepath.Join(f.root, remote) dstPath := filepath.Join(f.root, remote)
// Temporary FsObject under construction // Temporary FsObject under construction
fs := &FsObjectLocal{remote: remote, path: dstPath} fs := &FsObjectLocal{local: f, remote: remote, path: dstPath}
dir := path.Dir(dstPath) dir := path.Dir(dstPath)
err := os.MkdirAll(dir, 0770) err := os.MkdirAll(dir, 0770)
@ -260,6 +261,19 @@ func (f *FsLocal) readPrecision() (precision time.Duration) {
// ------------------------------------------------------------ // ------------------------------------------------------------
// Return the parent Fs
func (o *FsObjectLocal) Fs() fs.Fs {
return o.local
}
// Return a string version
func (o *FsObjectLocal) String() string {
if o == nil {
return "<nil>"
}
return o.remote
}
// Return the remote path // Return the remote path
func (o *FsObjectLocal) Remote() string { func (o *FsObjectLocal) Remote() string {
return o.remote return o.remote

403
rclone.go
View file

@ -10,7 +10,6 @@ import (
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"strings" "strings"
"sync"
"time" "time"
"github.com/ogier/pflag" "github.com/ogier/pflag"
@ -27,351 +26,9 @@ import (
var ( var (
// Flags // Flags
cpuprofile = pflag.StringP("cpuprofile", "", "", "Write cpu profile to file") cpuprofile = pflag.StringP("cpuprofile", "", "", "Write cpu profile to file")
dry_run = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
statsInterval = pflag.DurationP("stats", "", time.Minute*1, "Interval to print stats") statsInterval = pflag.DurationP("stats", "", time.Minute*1, "Interval to print stats")
) )
// A pair of fs.Objects
type PairFsObjects struct {
src, dst fs.Object
}
type PairFsObjectsChan chan PairFsObjects
// Check to see if src needs to be copied to dst and if so puts it in out
func checkOne(src, dst fs.Object, out fs.ObjectsChan) {
if dst == nil {
fs.Debug(src, "Couldn't find local file - download")
out <- src
return
}
// Check to see if can store this
if !src.Storable() {
return
}
// Check to see if changed or not
if fs.Equal(src, dst) {
fs.Debug(src, "Unchanged skipping")
return
}
out <- src
}
// Read FsObjects~s on in send to out if they need uploading
//
// FIXME potentially doing lots of MD5SUMS at once
func PairChecker(in PairFsObjectsChan, out fs.ObjectsChan, wg *sync.WaitGroup) {
defer wg.Done()
for pair := range in {
src := pair.src
fs.Stats.Checking(src)
checkOne(src, pair.dst, out)
fs.Stats.DoneChecking(src)
}
}
// Read FsObjects~s on in send to out if they need uploading
//
// FIXME potentially doing lots of MD5SUMS at once
func Checker(in, out fs.ObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
fs.Stats.Checking(src)
dst := fdst.NewFsObject(src.Remote())
checkOne(src, dst, out)
fs.Stats.DoneChecking(src)
}
}
// Read FsObjects on in and copy them
func Copier(in fs.ObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
fs.Stats.Transferring(src)
fs.Copy(fdst, src)
fs.Stats.DoneTransferring(src)
}
}
// Copies fsrc into fdst
func CopyFs(fdst, fsrc fs.Fs) {
err := fdst.Mkdir()
if err != nil {
fs.Stats.Error()
log.Fatal("Failed to make destination")
}
to_be_checked := fsrc.List()
to_be_uploaded := make(fs.ObjectsChan, fs.Config.Transfers)
var checkerWg sync.WaitGroup
checkerWg.Add(fs.Config.Checkers)
for i := 0; i < fs.Config.Checkers; i++ {
go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg)
}
var copierWg sync.WaitGroup
copierWg.Add(fs.Config.Transfers)
for i := 0; i < fs.Config.Transfers; i++ {
go Copier(to_be_uploaded, fdst, &copierWg)
}
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
log.Printf("Waiting for transfers to finish")
copierWg.Wait()
}
// Delete all the files passed in the channel
func DeleteFiles(to_be_deleted fs.ObjectsChan) {
var wg sync.WaitGroup
wg.Add(fs.Config.Transfers)
for i := 0; i < fs.Config.Transfers; i++ {
go func() {
defer wg.Done()
for dst := range to_be_deleted {
if *dry_run {
fs.Debug(dst, "Not deleting as --dry-run")
} else {
fs.Stats.Checking(dst)
err := dst.Remove()
fs.Stats.DoneChecking(dst)
if err != nil {
fs.Stats.Error()
fs.Log(dst, "Couldn't delete: %s", err)
} else {
fs.Debug(dst, "Deleted")
}
}
}
}()
}
log.Printf("Waiting for deletions to finish")
wg.Wait()
}
// Syncs fsrc into fdst
func Sync(fdst, fsrc fs.Fs) {
err := fdst.Mkdir()
if err != nil {
fs.Stats.Error()
log.Fatal("Failed to make destination")
}
log.Printf("Building file list")
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
delFiles := make(map[string]fs.Object)
for dst := range fdst.List() {
delFiles[dst.Remote()] = dst
}
// Read source files checking them off against dest files
to_be_checked := make(PairFsObjectsChan, fs.Config.Transfers)
to_be_uploaded := make(fs.ObjectsChan, fs.Config.Transfers)
var checkerWg sync.WaitGroup
checkerWg.Add(fs.Config.Checkers)
for i := 0; i < fs.Config.Checkers; i++ {
go PairChecker(to_be_checked, to_be_uploaded, &checkerWg)
}
var copierWg sync.WaitGroup
copierWg.Add(fs.Config.Transfers)
for i := 0; i < fs.Config.Transfers; i++ {
go Copier(to_be_uploaded, fdst, &copierWg)
}
go func() {
for src := range fsrc.List() {
remote := src.Remote()
dst, found := delFiles[remote]
if found {
delete(delFiles, remote)
to_be_checked <- PairFsObjects{src, dst}
} else {
// No need to check doesn't exist
to_be_uploaded <- src
}
}
close(to_be_checked)
}()
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
log.Printf("Waiting for transfers to finish")
copierWg.Wait()
if fs.Stats.Errored() {
log.Printf("Not deleting files as there were IO errors")
return
}
// Delete the spare files
toDelete := make(fs.ObjectsChan, fs.Config.Transfers)
go func() {
for _, fs := range delFiles {
toDelete <- fs
}
close(toDelete)
}()
DeleteFiles(toDelete)
}
// Checks the files in fsrc and fdst according to Size and MD5SUM
func Check(fdst, fsrc fs.Fs) {
log.Printf("Building file list")
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
dstFiles := make(map[string]fs.Object)
for dst := range fdst.List() {
dstFiles[dst.Remote()] = dst
}
// Read the source files checking them against dstFiles
// FIXME could do this in parallel and make it use less memory
srcFiles := make(map[string]fs.Object)
commonFiles := make(map[string][]fs.Object)
for src := range fsrc.List() {
remote := src.Remote()
if dst, ok := dstFiles[remote]; ok {
commonFiles[remote] = []fs.Object{dst, src}
delete(dstFiles, remote)
} else {
srcFiles[remote] = src
}
}
log.Printf("Files in %s but not in %s", fdst, fsrc)
for remote := range dstFiles {
fs.Stats.Error()
log.Printf(remote)
}
log.Printf("Files in %s but not in %s", fsrc, fdst)
for remote := range srcFiles {
fs.Stats.Error()
log.Printf(remote)
}
checks := make(chan []fs.Object, fs.Config.Transfers)
go func() {
for _, check := range commonFiles {
checks <- check
}
close(checks)
}()
var checkerWg sync.WaitGroup
checkerWg.Add(fs.Config.Checkers)
for i := 0; i < fs.Config.Checkers; i++ {
go func() {
defer checkerWg.Done()
for check := range checks {
dst, src := check[0], check[1]
fs.Stats.Checking(src)
if src.Size() != dst.Size() {
fs.Stats.DoneChecking(src)
fs.Stats.Error()
fs.Log(src, "Sizes differ")
continue
}
same, err := fs.CheckMd5sums(src, dst)
fs.Stats.DoneChecking(src)
if err != nil {
continue
}
if !same {
fs.Stats.Error()
fs.Log(src, "Md5sums differ")
}
fs.Debug(src, "OK")
}
}()
}
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
log.Printf("%d differences found", fs.Stats.GetErrors())
}
// List the Fs to stdout
//
// Lists in parallel which may get them out of order
func List(f, _ fs.Fs) {
in := f.List()
var wg sync.WaitGroup
wg.Add(fs.Config.Checkers)
for i := 0; i < fs.Config.Checkers; i++ {
go func() {
defer wg.Done()
for o := range in {
fs.Stats.Checking(o)
modTime := o.ModTime()
fs.Stats.DoneChecking(o)
fmt.Printf("%9d %19s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), o.Remote())
}
}()
}
wg.Wait()
}
// List the directories/buckets/containers in the Fs to stdout
func ListDir(f, _ fs.Fs) {
for dir := range f.ListDir() {
fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
}
}
// Makes a destination directory or container
func mkdir(fdst, fsrc fs.Fs) {
err := fdst.Mkdir()
if err != nil {
fs.Stats.Error()
log.Fatalf("Mkdir failed: %s", err)
}
}
// Removes a container but not if not empty
func rmdir(fdst, fsrc fs.Fs) {
if *dry_run {
log.Printf("Not deleting %s as --dry-run", fdst)
} else {
err := fdst.Rmdir()
if err != nil {
fs.Stats.Error()
log.Fatalf("Rmdir failed: %s", err)
}
}
}
// Removes a container and all of its contents
//
// FIXME doesn't delete local directories
func purge(fdst, fsrc fs.Fs) {
if f, ok := fdst.(fs.Purger); ok {
err := f.Purge()
if err != nil {
fs.Stats.Error()
log.Fatalf("Purge failed: %s", err)
}
} else {
DeleteFiles(fdst.List())
log.Printf("Deleting path")
rmdir(fdst, fsrc)
}
}
// Edits the config file
func EditConfig(fdst, fsrc fs.Fs) {
fs.EditConfig()
}
type Command struct { type Command struct {
Name string Name string
Help string Help string
@ -403,7 +60,12 @@ var Commands = []Command{
Copy the source to the destination. Doesn't transfer Copy the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by unchanged files, testing first by modification time then by
MD5SUM. Doesn't delete files from the destination.`, MD5SUM. Doesn't delete files from the destination.`,
Run: CopyFs, Run: func(fdst, fsrc fs.Fs) {
err := fs.CopyFs(fdst, fsrc)
if err != nil {
log.Fatalf("Failed to copy: %v", err)
}
},
MinArgs: 2, MinArgs: 2,
MaxArgs: 2, MaxArgs: 2,
}, },
@ -416,7 +78,12 @@ var Commands = []Command{
MD5SUM. Deletes any files that exist in source that don't MD5SUM. Deletes any files that exist in source that don't
exist in destination. Since this can cause data loss, test exist in destination. Since this can cause data loss, test
first with the --dry-run flag.`, first with the --dry-run flag.`,
Run: Sync, Run: func(fdst, fsrc fs.Fs) {
err := fs.Sync(fdst, fsrc)
if err != nil {
log.Fatalf("Failed to sync: %v", err)
}
},
MinArgs: 2, MinArgs: 2,
MaxArgs: 2, MaxArgs: 2,
}, },
@ -425,7 +92,12 @@ var Commands = []Command{
ArgsHelp: "[remote://path]", ArgsHelp: "[remote://path]",
Help: ` Help: `
List all the objects in the the path.`, List all the objects in the the path.`,
Run: List, Run: func(fdst, fsrc fs.Fs) {
err := fs.List(fdst)
if err != nil {
log.Fatalf("Failed to list: %v", err)
}
},
MinArgs: 1, MinArgs: 1,
MaxArgs: 1, MaxArgs: 1,
}, },
@ -434,7 +106,12 @@ var Commands = []Command{
ArgsHelp: "[remote://path]", ArgsHelp: "[remote://path]",
Help: ` Help: `
List all directoryes/objects/buckets in the the path.`, List all directoryes/objects/buckets in the the path.`,
Run: ListDir, Run: func(fdst, fsrc fs.Fs) {
err := fs.ListDir(fdst)
if err != nil {
log.Fatalf("Failed to listdir: %v", err)
}
},
MinArgs: 1, MinArgs: 1,
MaxArgs: 1, MaxArgs: 1,
}, },
@ -443,7 +120,12 @@ var Commands = []Command{
ArgsHelp: "remote://path", ArgsHelp: "remote://path",
Help: ` Help: `
Make the path if it doesn't already exist`, Make the path if it doesn't already exist`,
Run: mkdir, Run: func(fdst, fsrc fs.Fs) {
err := fs.Mkdir(fdst)
if err != nil {
log.Fatalf("Failed to mkdir: %v", err)
}
},
MinArgs: 1, MinArgs: 1,
MaxArgs: 1, MaxArgs: 1,
}, },
@ -453,7 +135,12 @@ var Commands = []Command{
Help: ` Help: `
Remove the path. Note that you can't remove a path with Remove the path. Note that you can't remove a path with
objects in it, use purge for that.`, objects in it, use purge for that.`,
Run: rmdir, Run: func(fdst, fsrc fs.Fs) {
err := fs.Rmdir(fdst)
if err != nil {
log.Fatalf("Failed to rmdir: %v", err)
}
},
MinArgs: 1, MinArgs: 1,
MaxArgs: 1, MaxArgs: 1,
}, },
@ -462,7 +149,12 @@ var Commands = []Command{
ArgsHelp: "remote://path", ArgsHelp: "remote://path",
Help: ` Help: `
Remove the path and all of its contents.`, Remove the path and all of its contents.`,
Run: purge, Run: func(fdst, fsrc fs.Fs) {
err := fs.Purge(fdst)
if err != nil {
log.Fatalf("Failed to purge: %v", err)
}
},
MinArgs: 1, MinArgs: 1,
MaxArgs: 1, MaxArgs: 1,
}, },
@ -473,7 +165,12 @@ var Commands = []Command{
Checks the files in the source and destination match. It Checks the files in the source and destination match. It
compares sizes and MD5SUMs and prints a report of files which compares sizes and MD5SUMs and prints a report of files which
don't match. It doesn't alter the source or destination.`, don't match. It doesn't alter the source or destination.`,
Run: Check, Run: func(fdst, fsrc fs.Fs) {
err := fs.Check(fdst, fsrc)
if err != nil {
log.Fatalf("Failed to check: %v", err)
}
},
MinArgs: 2, MinArgs: 2,
MaxArgs: 2, MaxArgs: 2,
}, },
@ -481,7 +178,9 @@ var Commands = []Command{
Name: "config", Name: "config",
Help: ` Help: `
Enter an interactive configuration session.`, Enter an interactive configuration session.`,
Run: EditConfig, Run: func(fdst, fsrc fs.Fs) {
fs.EditConfig()
},
NoStats: true, NoStats: true,
}, },
{ {

View file

@ -331,6 +331,19 @@ func (f *FsS3) Precision() time.Duration {
// ------------------------------------------------------------ // ------------------------------------------------------------
// Return the parent Fs
func (o *FsObjectS3) Fs() fs.Fs {
return o.s3
}
// Return a string version
func (o *FsObjectS3) String() string {
if o == nil {
return "<nil>"
}
return o.remote
}
// Return the remote path // Return the remote path
func (o *FsObjectS3) Remote() string { func (o *FsObjectS3) Remote() string {
return o.remote return o.remote

View file

@ -245,6 +245,19 @@ func (fs *FsSwift) Precision() time.Duration {
// ------------------------------------------------------------ // ------------------------------------------------------------
// Return the parent Fs
func (o *FsObjectSwift) Fs() fs.Fs {
return o.swift
}
// Return a string version
func (o *FsObjectSwift) String() string {
if o == nil {
return "<nil>"
}
return o.remote
}
// Return the remote path // Return the remote path
func (o *FsObjectSwift) Remote() string { func (o *FsObjectSwift) Remote() string {
return o.remote return o.remote