556 lines
13 KiB
Go
556 lines
13 KiB
Go
// Generic operations on filesystems and objects
|
|
|
|
package fs
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"mime"
|
|
"path"
|
|
"sync"
|
|
)
|
|
|
|
// Work out modify window for fses passed in - sets Config.ModifyWindow
|
|
//
|
|
// This is the largest modify window of all the fses in use, and the
|
|
// user configured value
|
|
func CalculateModifyWindow(fs ...Fs) {
|
|
for _, f := range fs {
|
|
if f != nil {
|
|
precision := f.Precision()
|
|
if precision > Config.ModifyWindow {
|
|
Config.ModifyWindow = precision
|
|
}
|
|
}
|
|
}
|
|
Debug(fs[0], "Modify window is %s\n", Config.ModifyWindow)
|
|
}
|
|
|
|
// Check the two files to see if the MD5sums are the same
|
|
//
|
|
// May return an error which will already have been logged
|
|
//
|
|
// If an error is returned it will return false
|
|
func CheckMd5sums(src, dst Object) (bool, error) {
|
|
srcMd5, err := src.Md5sum()
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to calculate src md5: %s", err)
|
|
return false, err
|
|
}
|
|
dstMd5, err := dst.Md5sum()
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(dst, "Failed to calculate dst md5: %s", err)
|
|
return false, err
|
|
}
|
|
// Debug("Src MD5 %s", srcMd5)
|
|
// Debug("Dst MD5 %s", obj.Hash)
|
|
return srcMd5 == dstMd5, nil
|
|
}
|
|
|
|
// Checks to see if the src and dst objects are equal by looking at
|
|
// size, mtime and MD5SUM
|
|
//
|
|
// If the src and dst size are different then it is considered to be
|
|
// not equal.
|
|
//
|
|
// If the size is the same and the mtime is the same then it is
|
|
// considered to be equal. This is the heuristic rsync uses when
|
|
// not using --checksum.
|
|
//
|
|
// If the size is the same and and mtime is different or unreadable
|
|
// and the MD5SUM is the same then the file is considered to be equal.
|
|
// In this case the mtime on the dst is updated.
|
|
//
|
|
// Otherwise the file is considered to be not equal including if there
|
|
// were errors reading info.
|
|
func Equal(src, dst Object) bool {
|
|
if src.Size() != dst.Size() {
|
|
Debug(src, "Sizes differ")
|
|
return false
|
|
}
|
|
|
|
// Size the same so check the mtime
|
|
srcModTime := src.ModTime()
|
|
dstModTime := dst.ModTime()
|
|
dt := dstModTime.Sub(srcModTime)
|
|
ModifyWindow := Config.ModifyWindow
|
|
if dt >= ModifyWindow || dt <= -ModifyWindow {
|
|
Debug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
|
|
} else {
|
|
Debug(src, "Size and modification time the same (differ by %s, within tolerance %s)", dt, ModifyWindow)
|
|
return true
|
|
}
|
|
|
|
// mtime is unreadable or different but size is the same so
|
|
// check the MD5SUM
|
|
same, _ := CheckMd5sums(src, dst)
|
|
if !same {
|
|
Debug(src, "Md5sums differ")
|
|
return false
|
|
}
|
|
|
|
// Size and MD5 the same but mtime different so update the
|
|
// mtime of the dst object here
|
|
dst.SetModTime(srcModTime)
|
|
|
|
Debug(src, "Size and MD5SUM of src and dst objects identical")
|
|
return true
|
|
}
|
|
|
|
// Returns a guess at the mime type from the extension
|
|
func MimeType(o Object) string {
|
|
mimeType := mime.TypeByExtension(path.Ext(o.Remote()))
|
|
if mimeType == "" {
|
|
mimeType = "application/octet-stream"
|
|
}
|
|
return mimeType
|
|
}
|
|
|
|
// Used to remove a failed copy
|
|
func removeFailedCopy(dst Object) {
|
|
if dst != nil {
|
|
Debug(dst, "Removing failed copy")
|
|
removeErr := dst.Remove()
|
|
if removeErr != nil {
|
|
Debug(dst, "Failed to remove failed copy: %s", removeErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Copy src object to dst or f if nil
|
|
//
|
|
// If dst is nil then the object must not exist already. If you do
|
|
// call Copy() with dst nil on a pre-existing file then some filing
|
|
// systems (eg Drive) may duplicate the file.
|
|
func Copy(f Fs, dst, src Object) {
|
|
const maxTries = 10
|
|
tries := 0
|
|
doUpdate := dst != nil
|
|
tryAgain:
|
|
in0, err := src.Open()
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to open: %s", err)
|
|
return
|
|
}
|
|
in := NewAccount(in0) // account the transfer
|
|
|
|
var actionTaken string
|
|
if doUpdate {
|
|
actionTaken = "Copied (updated existing)"
|
|
err = dst.Update(in, src.ModTime(), src.Size())
|
|
} else {
|
|
actionTaken = "Copied (new)"
|
|
dst, err = f.Put(in, src.Remote(), src.ModTime(), src.Size())
|
|
}
|
|
inErr := in.Close()
|
|
// Retry if err returned a retry error
|
|
if r, ok := err.(Retry); ok && r.Retry() && tries < maxTries {
|
|
tries++
|
|
Log(src, "Received error: %v - retrying %d/%d", err, tries, maxTries)
|
|
removeFailedCopy(dst)
|
|
goto tryAgain
|
|
}
|
|
if err == nil {
|
|
err = inErr
|
|
}
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to copy: %s", err)
|
|
removeFailedCopy(dst)
|
|
return
|
|
}
|
|
|
|
// Verify sizes are the same after transfer
|
|
if src.Size() != dst.Size() {
|
|
Stats.Error()
|
|
err = fmt.Errorf("Corrupted on transfer: sizes differ %d vs %d", src.Size(), dst.Size())
|
|
Log(dst, "%s", err)
|
|
removeFailedCopy(dst)
|
|
return
|
|
}
|
|
|
|
// Verify md5sums are the same after transfer - ignoring blank md5sums
|
|
srcMd5sum, md5sumErr := src.Md5sum()
|
|
if md5sumErr != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to read md5sum: %s", md5sumErr)
|
|
} else if srcMd5sum != "" {
|
|
dstMd5sum, md5sumErr := dst.Md5sum()
|
|
if md5sumErr != nil {
|
|
Stats.Error()
|
|
Log(dst, "Failed to read md5sum: %s", md5sumErr)
|
|
} else if dstMd5sum != "" && srcMd5sum != dstMd5sum {
|
|
Stats.Error()
|
|
err = fmt.Errorf("Corrupted on transfer: md5sums differ %q vs %q", srcMd5sum, dstMd5sum)
|
|
Log(dst, "%s", err)
|
|
removeFailedCopy(dst)
|
|
return
|
|
}
|
|
}
|
|
|
|
Debug(src, actionTaken)
|
|
}
|
|
|
|
// Check to see if src needs to be copied to dst and if so puts it in out
|
|
func checkOne(pair ObjectPair, out ObjectPairChan) {
|
|
src, dst := pair.src, pair.dst
|
|
if dst == nil {
|
|
Debug(src, "Couldn't find file - need to transfer")
|
|
out <- pair
|
|
return
|
|
}
|
|
// Check to see if can store this
|
|
if !src.Storable() {
|
|
return
|
|
}
|
|
// Check to see if changed or not
|
|
if Equal(src, dst) {
|
|
Debug(src, "Unchanged skipping")
|
|
return
|
|
}
|
|
out <- pair
|
|
}
|
|
|
|
// Read Objects~s on in send to out if they need uploading
|
|
//
|
|
// FIXME potentially doing lots of MD5SUMS at once
|
|
func PairChecker(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
for pair := range in {
|
|
src := pair.src
|
|
Stats.Checking(src)
|
|
checkOne(pair, out)
|
|
Stats.DoneChecking(src)
|
|
}
|
|
}
|
|
|
|
// Read Objects on in and copy them
|
|
func Copier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
for pair := range in {
|
|
src := pair.src
|
|
Stats.Transferring(src)
|
|
if Config.DryRun {
|
|
Debug(src, "Not copying as --dry-run")
|
|
} else {
|
|
Copy(fdst, pair.dst, src)
|
|
}
|
|
Stats.DoneTransferring(src)
|
|
}
|
|
}
|
|
|
|
// Delete all the files passed in the channel
|
|
func DeleteFiles(to_be_deleted ObjectsChan) {
|
|
var wg sync.WaitGroup
|
|
wg.Add(Config.Transfers)
|
|
for i := 0; i < Config.Transfers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for dst := range to_be_deleted {
|
|
if Config.DryRun {
|
|
Debug(dst, "Not deleting as --dry-run")
|
|
} else {
|
|
Stats.Checking(dst)
|
|
err := dst.Remove()
|
|
Stats.DoneChecking(dst)
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(dst, "Couldn't delete: %s", err)
|
|
} else {
|
|
Debug(dst, "Deleted")
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
Log(nil, "Waiting for deletions to finish")
|
|
wg.Wait()
|
|
}
|
|
|
|
// Syncs fsrc into fdst
|
|
//
|
|
// If Delete is true then it deletes any files in fdst that aren't in fsrc
|
|
func Sync(fdst, fsrc Fs, Delete bool) error {
|
|
err := fdst.Mkdir()
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
|
|
Log(fdst, "Building file list")
|
|
|
|
// Read the destination files first
|
|
// FIXME could do this in parallel and make it use less memory
|
|
delFiles := make(map[string]Object)
|
|
for dst := range fdst.List() {
|
|
delFiles[dst.Remote()] = dst
|
|
}
|
|
|
|
// Read source files checking them off against dest files
|
|
to_be_checked := make(ObjectPairChan, Config.Transfers)
|
|
to_be_uploaded := make(ObjectPairChan, Config.Transfers)
|
|
|
|
var checkerWg sync.WaitGroup
|
|
checkerWg.Add(Config.Checkers)
|
|
for i := 0; i < Config.Checkers; i++ {
|
|
go PairChecker(to_be_checked, to_be_uploaded, &checkerWg)
|
|
}
|
|
|
|
var copierWg sync.WaitGroup
|
|
copierWg.Add(Config.Transfers)
|
|
for i := 0; i < Config.Transfers; i++ {
|
|
go Copier(to_be_uploaded, fdst, &copierWg)
|
|
}
|
|
|
|
go func() {
|
|
for src := range fsrc.List() {
|
|
remote := src.Remote()
|
|
dst, found := delFiles[remote]
|
|
if found {
|
|
delete(delFiles, remote)
|
|
to_be_checked <- ObjectPair{src, dst}
|
|
} else {
|
|
// No need to check since doesn't exist
|
|
to_be_uploaded <- ObjectPair{src, nil}
|
|
}
|
|
}
|
|
close(to_be_checked)
|
|
}()
|
|
|
|
Log(fdst, "Waiting for checks to finish")
|
|
checkerWg.Wait()
|
|
close(to_be_uploaded)
|
|
Log(fdst, "Waiting for transfers to finish")
|
|
copierWg.Wait()
|
|
|
|
// Delete files if asked
|
|
if Delete {
|
|
if Stats.Errored() {
|
|
Log(fdst, "Not deleting files as there were IO errors")
|
|
return nil
|
|
}
|
|
|
|
// Delete the spare files
|
|
toDelete := make(ObjectsChan, Config.Transfers)
|
|
go func() {
|
|
for _, fs := range delFiles {
|
|
toDelete <- fs
|
|
}
|
|
close(toDelete)
|
|
}()
|
|
DeleteFiles(toDelete)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Checks the files in fsrc and fdst according to Size and MD5SUM
|
|
func Check(fdst, fsrc Fs) error {
|
|
Log(fdst, "Building file list")
|
|
|
|
// Read the destination files first
|
|
// FIXME could do this in parallel and make it use less memory
|
|
dstFiles := make(map[string]Object)
|
|
for dst := range fdst.List() {
|
|
dstFiles[dst.Remote()] = dst
|
|
}
|
|
|
|
// Read the source files checking them against dstFiles
|
|
// FIXME could do this in parallel and make it use less memory
|
|
srcFiles := make(map[string]Object)
|
|
commonFiles := make(map[string][]Object)
|
|
for src := range fsrc.List() {
|
|
remote := src.Remote()
|
|
if dst, ok := dstFiles[remote]; ok {
|
|
commonFiles[remote] = []Object{dst, src}
|
|
delete(dstFiles, remote)
|
|
} else {
|
|
srcFiles[remote] = src
|
|
}
|
|
}
|
|
|
|
Log(fdst, "%d files not in %v", len(dstFiles), fsrc)
|
|
for _, dst := range dstFiles {
|
|
Stats.Error()
|
|
Log(dst, "File not in %v", fsrc)
|
|
}
|
|
|
|
Log(fsrc, "%d files not in %s", len(srcFiles), fdst)
|
|
for _, src := range srcFiles {
|
|
Stats.Error()
|
|
Log(src, "File not in %v", fdst)
|
|
}
|
|
|
|
checks := make(chan []Object, Config.Transfers)
|
|
go func() {
|
|
for _, check := range commonFiles {
|
|
checks <- check
|
|
}
|
|
close(checks)
|
|
}()
|
|
|
|
var checkerWg sync.WaitGroup
|
|
checkerWg.Add(Config.Checkers)
|
|
for i := 0; i < Config.Checkers; i++ {
|
|
go func() {
|
|
defer checkerWg.Done()
|
|
for check := range checks {
|
|
dst, src := check[0], check[1]
|
|
Stats.Checking(src)
|
|
if src.Size() != dst.Size() {
|
|
Stats.DoneChecking(src)
|
|
Stats.Error()
|
|
Log(src, "Sizes differ")
|
|
continue
|
|
}
|
|
same, err := CheckMd5sums(src, dst)
|
|
Stats.DoneChecking(src)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if !same {
|
|
Stats.Error()
|
|
Log(src, "Md5sums differ")
|
|
}
|
|
Debug(src, "OK")
|
|
}
|
|
}()
|
|
}
|
|
|
|
Log(fdst, "Waiting for checks to finish")
|
|
checkerWg.Wait()
|
|
Log(fdst, "%d differences found", Stats.GetErrors())
|
|
if Stats.GetErrors() > 0 {
|
|
return fmt.Errorf("%d differences found", Stats.GetErrors())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// List the Fs to the supplied function
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func ListFn(f Fs, fn func(Object)) error {
|
|
in := f.List()
|
|
var wg sync.WaitGroup
|
|
wg.Add(Config.Checkers)
|
|
for i := 0; i < Config.Checkers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for o := range in {
|
|
fn(o)
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// outMutex serialises the output written through syncFprintf
var outMutex sync.Mutex

// syncFprintf is a synchronized fmt.Fprintf.
//
// outMutex is held for the duration of the write so output from
// concurrent callers is not interleaved.  The results are those of
// fmt.Fprintf.
func syncFprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
	outMutex.Lock()
	defer outMutex.Unlock()
	n, err = fmt.Fprintf(w, format, a...)
	return n, err
}
|
|
|
|
// List the Fs to stdout
|
|
//
|
|
// Shows size and path
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func List(f Fs, w io.Writer) error {
|
|
return ListFn(f, func(o Object) {
|
|
syncFprintf(w, "%9d %s\n", o.Size(), o.Remote())
|
|
})
|
|
}
|
|
|
|
// List the Fs to stdout
|
|
//
|
|
// Shows size, mod time and path
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func ListLong(f Fs, w io.Writer) error {
|
|
return ListFn(f, func(o Object) {
|
|
Stats.Checking(o)
|
|
modTime := o.ModTime()
|
|
Stats.DoneChecking(o)
|
|
syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.000000000"), o.Remote())
|
|
})
|
|
}
|
|
|
|
// List the Fs to stdout
|
|
//
|
|
// Produces the same output as the md5sum command
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func Md5sum(f Fs, w io.Writer) error {
|
|
return ListFn(f, func(o Object) {
|
|
Stats.Checking(o)
|
|
md5sum, err := o.Md5sum()
|
|
Stats.DoneChecking(o)
|
|
if err != nil {
|
|
Debug(o, "Failed to read MD5: %v", err)
|
|
md5sum = "UNKNOWN"
|
|
}
|
|
syncFprintf(w, "%32s %s\n", md5sum, o.Remote())
|
|
})
|
|
}
|
|
|
|
// List the directories/buckets/containers in the Fs to stdout
|
|
func ListDir(f Fs, w io.Writer) error {
|
|
for dir := range f.ListDir() {
|
|
syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Makes a destination directory or container
|
|
func Mkdir(f Fs) error {
|
|
err := f.Mkdir()
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Removes a container but not if not empty
|
|
func Rmdir(f Fs) error {
|
|
if Config.DryRun {
|
|
Log(f, "Not deleting as dry run is set")
|
|
} else {
|
|
err := f.Rmdir()
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Removes a container and all of its contents
|
|
//
|
|
// FIXME doesn't delete local directories
|
|
func Purge(f Fs) error {
|
|
var err error
|
|
if purger, ok := f.(Purger); ok {
|
|
if Config.DryRun {
|
|
Debug(f, "Not purging as --dry-run set")
|
|
} else {
|
|
err = purger.Purge()
|
|
}
|
|
} else {
|
|
// DeleteFiles and Rmdir observe --dry-run
|
|
DeleteFiles(f.List())
|
|
err = Rmdir(f)
|
|
}
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|