Implement Precision interface in all filesystems
* drive: add mime type on upload * drive: correct info on upload to fix crash * local: measure time precision by making a local file and using Chtimes * swift: move snet parameter here * core: add -modify-window and compute the optimum one * core: use modify window when checking times
This commit is contained in:
parent
b41367856b
commit
f8246b5a7e
7 changed files with 127 additions and 11 deletions
10
fs.go
10
fs.go
|
@ -32,6 +32,9 @@ type Fs interface {
|
|||
|
||||
// Remove the directory (container, bucket) if empty
|
||||
Rmdir() error
|
||||
|
||||
// Precision of the ModTimes in this Fs
|
||||
Precision() time.Duration
|
||||
}
|
||||
|
||||
// FIXME make f.Debugf...
|
||||
|
@ -168,10 +171,11 @@ func Equal(src, dst FsObject) bool {
|
|||
// Size the same so check the mtime
|
||||
srcModTime := src.ModTime()
|
||||
dstModTime := dst.ModTime()
|
||||
if !dstModTime.Equal(srcModTime) {
|
||||
FsDebug(src, "Modification times differ: %v, %v", srcModTime, dstModTime)
|
||||
dt := dstModTime.Sub(srcModTime)
|
||||
if dt >= *modifyWindow || dt <= -*modifyWindow {
|
||||
FsDebug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
|
||||
} else {
|
||||
FsDebug(src, "Size and modification time the same")
|
||||
FsDebug(src, "Size and modification time differ by %s (within %s)", dt, *modifyWindow)
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
25
fs_drive.go
25
fs_drive.go
|
@ -1,8 +1,15 @@
|
|||
// Drive interface
|
||||
package main
|
||||
|
||||
// FIXME drive code is leaking goroutines somehow
|
||||
|
||||
// FIXME use recursive listing not bound to directory for speed?
|
||||
|
||||
// FIXME list containers equivalent should list directories?
|
||||
|
||||
// FIXME list directory should list to channel for concurrency not
|
||||
// append to array
|
||||
|
||||
// FIXME drive times only accurate to 1 ms (3 decimal places)
|
||||
|
||||
// FIXME perhaps have a drive setup mode where we ask for all the
|
||||
|
@ -24,8 +31,10 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"mime"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -458,12 +467,18 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
|
|||
return nil, fmt.Errorf("Couldn't find or make directory: %s", err)
|
||||
}
|
||||
|
||||
// Guess the mime type
|
||||
mimeType := mime.TypeByExtension(path.Ext(remote))
|
||||
if mimeType == "" {
|
||||
mimeType = "application/octet-stream"
|
||||
}
|
||||
|
||||
// Define the metadata for the file we are going to create.
|
||||
info := &drive.File{
|
||||
Title: leaf,
|
||||
Description: leaf,
|
||||
Parents: []*drive.ParentReference{{Id: directoryId}},
|
||||
// FIXME set mimeType:
|
||||
MimeType: mimeType,
|
||||
}
|
||||
|
||||
// FIXME can't set modified date on initial upload as no
|
||||
|
@ -483,12 +498,13 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("Upload failed: %s", err)
|
||||
}
|
||||
fs.info = info
|
||||
|
||||
// Set modified date
|
||||
info.ModifiedDate = modTime.Format(time.RFC3339Nano)
|
||||
_, err = f.svc.Files.Update(info.Id, info).SetModifiedDate(true).Do()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to set mtime: %s", err)
|
||||
return fs, fmt.Errorf("Failed to set mtime: %s", err)
|
||||
}
|
||||
return fs, nil
|
||||
}
|
||||
|
@ -523,6 +539,11 @@ func (f *FsDrive) Rmdir() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Return the precision
|
||||
func (fs *FsDrive) Precision() time.Duration {
|
||||
return time.Millisecond
|
||||
}
|
||||
|
||||
// Purge deletes all the files and the container
|
||||
//
|
||||
// Returns an error if it isn't empty
|
||||
|
|
63
fs_local.go
63
fs_local.go
|
@ -5,16 +5,20 @@ import (
|
|||
"crypto/md5"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// FsLocal represents a local filesystem rooted at root
|
||||
type FsLocal struct {
|
||||
root string
|
||||
root string // The root directory
|
||||
precisionOk sync.Once // Whether we need to read the precision
|
||||
precision time.Duration // precision of local filesystem
|
||||
}
|
||||
|
||||
// FsObjectLocal represents a local filesystem object
|
||||
|
@ -144,6 +148,63 @@ func (f *FsLocal) Rmdir() error {
|
|||
return os.Remove(f.root)
|
||||
}
|
||||
|
||||
// Return the precision
|
||||
func (f *FsLocal) Precision() (precision time.Duration) {
|
||||
f.precisionOk.Do(func() {
|
||||
f.precision = f.readPrecision()
|
||||
})
|
||||
return f.precision
|
||||
}
|
||||
|
||||
// Read the precision
|
||||
func (f *FsLocal) readPrecision() (precision time.Duration) {
|
||||
// Default precision of 1s
|
||||
precision = time.Second
|
||||
|
||||
// Create temporary file and test it
|
||||
fd, err := ioutil.TempFile("", "swiftsync")
|
||||
if err != nil {
|
||||
// If failed return 1s
|
||||
// fmt.Println("Failed to create temp file", err)
|
||||
return time.Second
|
||||
}
|
||||
path := fd.Name()
|
||||
// fmt.Println("Created temp file", path)
|
||||
fd.Close()
|
||||
|
||||
// Delete it on return
|
||||
defer func() {
|
||||
// fmt.Println("Remove temp file")
|
||||
os.Remove(path)
|
||||
}()
|
||||
|
||||
// Find the minimum duration we can detect
|
||||
for duration := time.Duration(1); duration < time.Second; duration *= 10 {
|
||||
// Current time with delta
|
||||
t := time.Unix(time.Now().Unix(), int64(duration))
|
||||
err := Chtimes(path, t, t)
|
||||
if err != nil {
|
||||
// fmt.Println("Failed to Chtimes", err)
|
||||
break
|
||||
}
|
||||
|
||||
// Read the actual time back
|
||||
fi, err := os.Stat(path)
|
||||
if err != nil {
|
||||
// fmt.Println("Failed to Stat", err)
|
||||
break
|
||||
}
|
||||
|
||||
// If it matches - have found the precision
|
||||
// fmt.Println("compare", fi.ModTime(), t)
|
||||
if fi.ModTime() == t {
|
||||
// fmt.Println("Precision detected as", duration)
|
||||
return duration
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
// Return the remote path
|
||||
|
|
5
fs_s3.go
5
fs_s3.go
|
@ -244,6 +244,11 @@ func (f *FsS3) Rmdir() error {
|
|||
return f.b.DelBucket()
|
||||
}
|
||||
|
||||
// Return the precision
|
||||
func (fs *FsS3) Precision() time.Duration {
|
||||
return time.Nanosecond
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
// Return the remote path
|
||||
|
|
|
@ -39,6 +39,7 @@ var (
|
|||
authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.")
|
||||
userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.")
|
||||
apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.")
|
||||
snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented
|
||||
)
|
||||
|
||||
// String converts this FsSwift to a string
|
||||
|
@ -200,6 +201,11 @@ func (f *FsSwift) Rmdir() error {
|
|||
return f.c.ContainerDelete(f.container)
|
||||
}
|
||||
|
||||
// Return the precision
|
||||
func (fs *FsSwift) Precision() time.Duration {
|
||||
return time.Nanosecond
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
// Return the remote path
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
Todo
|
||||
* Make a test suite which can run on all the given types of fs
|
||||
* Copy should use the sync code as it is more efficient at directory listing
|
||||
* Drive needs a modify window of 1ms
|
||||
* Factor fses into own packages
|
||||
* FIXME: ls without an argument for buckets/containers?
|
||||
* FIXME: More -dry-run checks for object transfer
|
||||
|
@ -19,8 +18,8 @@ Todo
|
|||
* Check the locking in swift module!
|
||||
* Windows paths? Do we need to translate / and \?
|
||||
* Make a fs.Errorf and count errors and log them at a different level
|
||||
* add -modify-window flag - fs should keep knowledge of resolution
|
||||
* Add max object size to fs metadata - 5GB for swift, infinite for local, ? for s3
|
||||
* tie into -max-size flag
|
||||
|
||||
Drive
|
||||
* Do we need the secrets or just the code? If just the code then
|
||||
|
@ -33,11 +32,13 @@ Drive
|
|||
* It receives the token back
|
||||
* It displays the token to the user to paste in to the code
|
||||
* Should be https really
|
||||
|
||||
* Sometimes get: Failed to copy: Upload failed: googleapi: Error 403: Rate Limit Exceeded
|
||||
* quota is 100.0 requests/second/user
|
||||
|
||||
Ideas
|
||||
* could do encryption - put IV into metadata?
|
||||
* optimise remote copy container to another container using remote
|
||||
copy if local is same as remote
|
||||
copy if local is same as remote - use an optional Copier interface
|
||||
* Allow subpaths container:/sub/path
|
||||
* look at auth from env in s3 module - add to swift?
|
||||
* support
|
||||
|
|
20
swiftsync.go
20
swiftsync.go
|
@ -19,13 +19,13 @@ import (
|
|||
var (
|
||||
// Flags
|
||||
cpuprofile = flag.String("cpuprofile", "", "Write cpu profile to file")
|
||||
snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented
|
||||
verbose = flag.Bool("verbose", false, "Print lots more stuff")
|
||||
quiet = flag.Bool("quiet", false, "Print as little stuff as possible")
|
||||
dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes")
|
||||
checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.")
|
||||
transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")
|
||||
statsInterval = flag.Duration("stats", time.Minute*1, "Interval to print stats")
|
||||
modifyWindow = flag.Duration("modify-window", time.Nanosecond, "Max time diff to be considered the same")
|
||||
)
|
||||
|
||||
// A pair of FsObjects
|
||||
|
@ -566,6 +566,23 @@ func main() {
|
|||
fsrc, fdst = fdst, fsrc
|
||||
}
|
||||
|
||||
// Work out modify window
|
||||
if fsrc != nil {
|
||||
precision := fsrc.Precision()
|
||||
log.Printf("Source precision %s\n", precision)
|
||||
if precision > *modifyWindow {
|
||||
*modifyWindow = precision
|
||||
}
|
||||
}
|
||||
if fdst != nil {
|
||||
precision := fdst.Precision()
|
||||
log.Printf("Destination precision %s\n", precision)
|
||||
if precision > *modifyWindow {
|
||||
*modifyWindow = precision
|
||||
}
|
||||
}
|
||||
log.Printf("Modify window is %s\n", *modifyWindow)
|
||||
|
||||
// Print the stats every statsInterval
|
||||
go func() {
|
||||
ch := time.Tick(*statsInterval)
|
||||
|
@ -579,6 +596,7 @@ func main() {
|
|||
if found.run != nil {
|
||||
found.run(fdst, fsrc)
|
||||
fmt.Println(stats)
|
||||
log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine())
|
||||
if stats.errors > 0 {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue