Make checking and uploading run in parallel using channels

Runs much faster and uses much less memory (thanks to walking the
filesystem into a channel too).
This commit is contained in:
Nick Craig-Wood 2012-11-29 22:13:58 +00:00
parent ba24b84396
commit 4d23e29274
2 changed files with 126 additions and 59 deletions

View file

@ -16,6 +16,10 @@ Need an iterate all objects routine... Could use a channel
FIXME progress meter would be nice! Do this by wrapping the Reader with a progress bar FIXME progress meter would be nice! Do this by wrapping the Reader with a progress bar
Do bandwidth limit by wrapping the Reader too Do bandwidth limit by wrapping the Reader too
Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple
uploads or downloads.
* code.google.com/p/mxk/go1/flowcontrol - only does one flow at once
Or maybe put into swift library.
Could have an integrity check mode where we check the MD5sums of the local vs the remote Could have an integrity check mode where we check the MD5sums of the local vs the remote
@ -23,4 +27,15 @@ Some stats would be nice!
Windows paths? Do we need to translate / and \? Windows paths? Do we need to translate / and \?
Make swift timeouts be settable Make swift timeouts be settable with command line parameters
Add bandwidth limit?
Make a wrapper in connection which
* measures bandwidth and reports it
* limits bandwidth using Reader and Writer
* for a pool of all individual connectinos
* does timeouts by setting a limit, seeing whether io has happened
and resetting it if it has
Check the locking in swift module!

View file

@ -12,8 +12,10 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"runtime/pprof" "runtime/pprof"
"strings" "strings"
"sync"
) )
// Globals // Globals
@ -32,6 +34,8 @@ var (
authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.") authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.")
userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.") userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.")
apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.") apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.")
checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.")
uploaders = flag.Int("uploaders", 4, "Number of uploaders to run in parallel.")
) )
type FsObject struct { type FsObject struct {
@ -40,6 +44,8 @@ type FsObject struct {
info os.FileInfo info os.FileInfo
} }
type FsObjectsChan chan *FsObject
type FsObjects []FsObject type FsObjects []FsObject
// Write debuging output for this FsObject // Write debuging output for this FsObject
@ -128,20 +134,22 @@ func (fs *FsObject) changed(c *swift.Connection, container string) bool {
return false return false
} }
// Puts the FsObject into the container // Is this object storable
func (fs *FsObject) put(c *swift.Connection, container string) { func (fs *FsObject) storable(c *swift.Connection, container string) bool {
mode := fs.info.Mode() mode := fs.info.Mode()
if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 { if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 {
fs.Debugf("Can't transfer non file/directory") fs.Debugf("Can't transfer non file/directory")
return false
} else if mode&os.ModeDir != 0 { } else if mode&os.ModeDir != 0 {
// Debug? // Debug?
fs.Debugf("FIXME Skipping directory") fs.Debugf("FIXME Skipping directory")
} else { return false
// Check to see if changed or not
if !fs.changed(c, container) {
fs.Debugf("Unchanged skipping")
return
} }
return true
}
// Puts the FsObject into the container
func (fs *FsObject) put(c *swift.Connection, container string) {
// FIXME content type // FIXME content type
in, err := os.Open(fs.path) in, err := os.Open(fs.path)
if err != nil { if err != nil {
@ -157,16 +165,15 @@ func (fs *FsObject) put(c *swift.Connection, container string) {
return return
} }
fs.Debugf("Uploaded") fs.Debugf("Uploaded")
}
} }
// Walk the path // Walk the path returning a channel of FsObjects
// //
// FIXME ignore symlinks? // FIXME ignore symlinks?
// FIXME what about hardlinks / etc // FIXME what about hardlinks / etc
func walk(root string) FsObjects { func walk(root string) FsObjectsChan {
files := make(FsObjects, 0, 1024) out := make(FsObjectsChan, *checkers)
go func() {
err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error { err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
if err != nil { if err != nil {
log.Printf("Failed to open directory: %s: %s", path, err) log.Printf("Failed to open directory: %s: %s", path, err)
@ -184,14 +191,16 @@ func walk(root string) FsObjects {
if rel == "." { if rel == "." {
rel = "" rel = ""
} }
files = append(files, FsObject{rel: rel, path: path, info: info}) out <- &FsObject{rel: rel, path: path, info: info}
} }
return nil return nil
}) })
if err != nil { if err != nil {
log.Printf("Failed to open directory: %s: %s", root, err) log.Printf("Failed to open directory: %s: %s", root, err)
} }
return files close(out)
}()
return out
} }
// syntaxError prints the syntax // syntaxError prints the syntax
@ -221,14 +230,57 @@ func checkArgs(args []string, n int, message string) {
} }
} }
// uploads a file into a container // Read FsObjects on in and write them to out if they need uploading
func upload(c *swift.Connection, root, container string) { //
files := walk(root) // FIXME potentially doing lots of MD5SUMS at once
for _, fs := range files { func checker(c *swift.Connection, container string, in, out FsObjectsChan, wg *sync.WaitGroup) {
defer wg.Done()
for fs := range in {
// Check to see if can store this
if !fs.storable(c, container) {
continue
}
// Check to see if changed or not
if !fs.changed(c, container) {
fs.Debugf("Unchanged skipping")
continue
}
out <- fs
}
}
// Read FsObjects on in and upload them
func uploader(c *swift.Connection, container string, in FsObjectsChan, wg *sync.WaitGroup) {
defer wg.Done()
for fs := range in {
fs.put(c, container) fs.put(c, container)
} }
} }
// Syncs a directory into a container
func upload(c *swift.Connection, root, container string) {
to_be_checked := walk(root)
to_be_uploaded := make(FsObjectsChan, *uploaders)
var checkerWg sync.WaitGroup
checkerWg.Add(*checkers)
for i := 0; i < *checkers; i++ {
go checker(c, container, to_be_checked, to_be_uploaded, &checkerWg)
}
var uploaderWg sync.WaitGroup
uploaderWg.Add(*uploaders)
for i := 0; i < *uploaders; i++ {
go uploader(c, container, to_be_uploaded, &uploaderWg)
}
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
log.Printf("Waiting for uploads to finish")
uploaderWg.Wait()
}
// Lists the containers // Lists the containers
func listContainers(c *swift.Connection) { func listContainers(c *swift.Connection) {
containers, err := c.ContainersAll(nil) containers, err := c.ContainersAll(nil)
@ -276,7 +328,7 @@ func main() {
flag.Usage = syntaxError flag.Usage = syntaxError
flag.Parse() flag.Parse()
args := flag.Args() args := flag.Args()
//runtime.GOMAXPROCS(3) runtime.GOMAXPROCS(runtime.NumCPU())
// Setup profiling if desired // Setup profiling if desired
if *cpuprofile != "" { if *cpuprofile != "" {