Detect Fs from path to make operations consistent everywhere

This commit is contained in:
Nick Craig-Wood 2012-12-29 11:35:41 +00:00
parent 4028a4192f
commit c15ae179ee
5 changed files with 196 additions and 171 deletions

10
fs.go
View file

@ -37,6 +37,16 @@ type FsObjectsChan chan FsObject
type FsObjects []FsObject type FsObjects []FsObject
// NewFs makes a new Fs object from the path
//
// FIXME make more generic in future
func NewFs(path string) (Fs, error) {
if swiftMatch.MatchString(path) {
return NewFsSwift(path)
}
return NewFsLocal(path)
}
// checkClose is a utility function used to check the return from // checkClose is a utility function used to check the return from
// Close in a defer statement. // Close in a defer statement.
func checkClose(c io.Closer, err *error) { func checkClose(c io.Closer, err *error) {

View file

@ -26,6 +26,13 @@ type FsObjectLocal struct {
// ------------------------------------------------------------ // ------------------------------------------------------------
// NewFsLocal contstructs an FsLocal from the path
func NewFsLocal(root string) (*FsLocal, error) {
root = path.Clean(root)
f := &FsLocal{root: root}
return f, nil
}
// Return an FsObject from a path // Return an FsObject from a path
// //
// May return nil if an error occurred // May return nil if an error occurred

View file

@ -2,10 +2,14 @@
package main package main
import ( import (
"errors"
"flag"
"fmt" "fmt"
"github.com/ncw/swift" "github.com/ncw/swift"
"io" "io"
"log" "log"
"os"
"regexp"
"strings" "strings"
"time" "time"
) )
@ -28,6 +32,85 @@ type FsObjectSwift struct {
// ------------------------------------------------------------ // ------------------------------------------------------------
// Globals
var (
// Flags
// FIXME make these part of swift so we get a standard set of flags?
authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.")
userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.")
apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.")
)
// Pattern to match a swift url
var swiftMatch = regexp.MustCompile(`^([^/:]+):(.*)$`)
// parseParse parses a swift 'url'
func parsePath(path string) (container, directory string, err error) {
parts := swiftMatch.FindAllStringSubmatch(path, -1)
if len(parts) != 1 || len(parts[0]) != 3 {
err = fmt.Errorf("Couldn't parse swift url %q", path)
} else {
container, directory = parts[0][1], parts[0][2]
directory = strings.Trim(directory, "/")
}
return
}
// swiftConnection makes a connection to swift
func swiftConnection() (*swift.Connection, error) {
if *userName == "" {
return nil, errors.New("Need -user or environmental variable ST_USER")
}
if *apiKey == "" {
return nil, errors.New("Need -key or environmental variable ST_KEY")
}
if *authUrl == "" {
return nil, errors.New("Need -auth or environmental variable ST_AUTH")
}
c := &swift.Connection{
UserName: *userName,
ApiKey: *apiKey,
AuthUrl: *authUrl,
}
err := c.Authenticate()
if err != nil {
return nil, err
}
return c, nil
}
// NewFsSwift contstructs an FsSwift from the path, container:path
func NewFsSwift(path string) (*FsSwift, error) {
container, directory, err := parsePath(path)
if err != nil {
return nil, err
}
if directory != "" {
return nil, fmt.Errorf("Directories not supported yet in %q", path)
}
c, err := swiftConnection()
if err != nil {
return nil, err
}
f := &FsSwift{c: *c, container: container}
return f, nil
}
// Lists the containers
func SwiftContainers() {
c, err := swiftConnection()
if err != nil {
log.Fatalf("Couldn't connect: %s", err)
}
containers, err := c.ContainersAll(nil)
if err != nil {
log.Fatalf("Couldn't list containers: %s", err)
}
for _, container := range containers {
fmt.Printf("%9d %12d %s\n", container.Count, container.Bytes, container.Name)
}
}
// Return an FsObject from a path // Return an FsObject from a path
// //
// May return nil if an error occurred // May return nil if an error occurred

View file

@ -1,40 +1,24 @@
Todo
* Add sync command (like rsync with delete)
* Check logging in various parts
* Make logging controllable with flags
* progress meter would be nice! Do this by wrapping the Reader with a progress bar
* Do bandwidth limit by wrapping the Reader too
* Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple
uploads or downloads.
* code.google.com/p/mxk/go1/flowcontrol - only does one flow at once
* Or maybe put into swift library.
* Make swift timeouts be settable with command line parameters
* Check the locking in swift module!
* Windows paths? Do we need to translate / and \?
* Make a fs.Errorf and count errors and log them at a different level
Ideas Ideas
* remote copy container to another container * optimise remote copy container to another container using remote
* check local is same as remote copy if local is same as remote
* syncup/syncdown (like rsync with delete)
* use container: /path/syntax like rsync?
* Allow subpaths container:/sub/path * Allow subpaths container:/sub/path
* allow local/local too
* progress reports
* stats * stats
* Add bandwidth limit? * look at auth from env in s3 module - add to swift?
make 100% compatible with swift.py?
Make Env vars compatible with st?
Get and put the metadata in the libray (x-object-meta-mtime) when getting and putting a file?
This also puts meta-mtime
https://github.com/gholt/swiftly
As an integer, but it does parse it as a float
subargs.append('x-object-meta-mtime:%d' % getmtime(options.input_))
Need an iterate all objects routine... Could use a channel
- could just be a flag to Objects()
FIXME progress meter would be nice! Do this by wrapping the Reader with a progress bar
Do bandwidth limit by wrapping the Reader too
Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple
uploads or downloads.
* code.google.com/p/mxk/go1/flowcontrol - only does one flow at once
Or maybe put into swift library.
Windows paths? Do we need to translate / and \?
Make swift timeouts be settable with command line parameters
Make a wrapper in connection which Make a wrapper in connection which
* measures bandwidth and reports it * measures bandwidth and reports it
@ -43,10 +27,17 @@ Make a wrapper in connection which
* does timeouts by setting a limit, seeing whether io has happened * does timeouts by setting a limit, seeing whether io has happened
and resetting it if it has and resetting it if it has
Check the locking in swift module!
Need to make directory objects otherwise can't upload an empty directory Need to make directory objects otherwise can't upload an empty directory
* Or could upload empty directories only? * Or could upload empty directories only?
* Can't purge a local filesystem because it leaves the directories behind
Make a fs.Errorf and count errors and log them at a different level s3
--
Can maybe set last modified?
https://forums.aws.amazon.com/message.jspa?messageID=214062
Otherwise can set metadata
Returns etag and last modified in bucket list

View file

@ -6,7 +6,6 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/ncw/swift"
"log" "log"
"os" "os"
"runtime" "runtime"
@ -22,12 +21,8 @@ var (
snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented
verbose = flag.Bool("verbose", false, "Print lots more stuff") verbose = flag.Bool("verbose", false, "Print lots more stuff")
quiet = flag.Bool("quiet", false, "Print as little stuff as possible") quiet = flag.Bool("quiet", false, "Print as little stuff as possible")
// FIXME make these part of swift so we get a standard set of flags? checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.")
authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.") transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")
userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.")
apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.")
checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.")
transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")
) )
// Read FsObjects~s on in send to out if they need uploading // Read FsObjects~s on in send to out if they need uploading
@ -65,7 +60,7 @@ func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
} }
// Copies fsrc into fdst // Copies fsrc into fdst
func Copy(fsrc, fdst Fs) { func Copy(fdst, fsrc Fs) {
err := fdst.Mkdir() err := fdst.Mkdir()
if err != nil { if err != nil {
log.Fatal("Failed to make destination") log.Fatal("Failed to make destination")
@ -93,40 +88,9 @@ func Copy(fsrc, fdst Fs) {
copierWg.Wait() copierWg.Wait()
} }
// Syncs a directory into a container // Copy~s from source to dest
func upload(c *swift.Connection, args []string) { func copy_(fdst, fsrc Fs) {
root, container := args[0], args[1] Copy(fdst, fsrc)
// FIXME
fsrc := &FsLocal{root: root}
fdst := &FsSwift{c: *c, container: container}
Copy(fsrc, fdst)
}
// Syncs a container into a directory
//
// FIXME need optional stat in FsObject and to be able to make FsObjects from ObjectsAll
//
// FIXME should download and stat many at once
func download(c *swift.Connection, args []string) {
container, root := args[0], args[1]
// FIXME
fsrc := &FsSwift{c: *c, container: container}
fdst := &FsLocal{root: root}
Copy(fsrc, fdst)
}
// Lists the containers
func listContainers(c *swift.Connection) {
containers, err := c.ContainersAll(nil)
if err != nil {
log.Fatalf("Couldn't list containers: %s", err)
}
for _, container := range containers {
fmt.Printf("%9d %12d %s\n", container.Count, container.Bytes, container.Name)
}
} }
// List the Fs to stdout // List the Fs to stdout
@ -147,57 +111,34 @@ func List(f Fs) {
} }
// Lists files in a container // Lists files in a container
func list(c *swift.Connection, args []string) { func list(fdst, fsrc Fs) {
if len(args) == 0 { if fdst == nil {
listContainers(c) SwiftContainers()
return return
} }
container := args[0] List(fdst)
// FIXME
f := &FsSwift{c: *c, container: container}
List(f)
} }
// Local lists files // Makes a destination directory or container
func llist(c *swift.Connection, args []string) { func mkdir(fdst, fsrc Fs) {
root := args[0]
// FIXME
f := &FsLocal{root: root}
List(f)
}
// Makes a container
func mkdir(c *swift.Connection, args []string) {
container := args[0]
// FIXME
fdst := &FsSwift{c: *c, container: container}
err := fdst.Mkdir() err := fdst.Mkdir()
if err != nil { if err != nil {
log.Fatalf("Couldn't create container %q: %s", container, err) log.Fatalf("Mkdir failed: %s", err)
} }
} }
// Removes a container // Removes a container but not if not empty
func rmdir(c *swift.Connection, args []string) { func rmdir(fdst, fsrc Fs) {
container := args[0]
// FIXME
fdst := &FsSwift{c: *c, container: container}
err := fdst.Rmdir() err := fdst.Rmdir()
if err != nil { if err != nil {
log.Fatalf("Couldn't delete container %q: %s", container, err) log.Fatalf("Rmdir failed: %s", err)
} }
} }
// Removes a container and all of its contents // Removes a container and all of its contents
// //
// FIXME should make FsObjects and use debugging // FIXME doesn't delete local directories
func purge(c *swift.Connection, args []string) { func purge(fdst, fsrc Fs) {
container := args[0]
// FIXME
fdst := &FsSwift{c: *c, container: container}
to_be_deleted := fdst.List() to_be_deleted := fdst.List()
var wg sync.WaitGroup var wg sync.WaitGroup
@ -219,14 +160,14 @@ func purge(c *swift.Connection, args []string) {
log.Printf("Waiting for deletions to finish") log.Printf("Waiting for deletions to finish")
wg.Wait() wg.Wait()
log.Printf("Deleting container") log.Printf("Deleting path")
rmdir(c, args) rmdir(fdst, fsrc)
} }
type Command struct { type Command struct {
name string name string
help string help string
run func(*swift.Connection, []string) run func(fdst, fsrc Fs)
minArgs, maxArgs int minArgs, maxArgs int
} }
@ -245,62 +186,55 @@ func (cmd *Command) checkArgs(args []string) {
var Commands = []Command{ var Commands = []Command{
{ {
"upload", "copy",
`<directory> <container> `<source> <destination>
Upload the local directory to the remote container. Doesn't
upload unchanged files, testing first by modification time Copy the source to the destination. Doesn't transfer
then by MD5SUM unchanged files, testing first by modification time then by
MD5SUM. Doesn't delete files from the destination.
`, `,
upload, copy_,
2, 2,
},
{
"download",
`<container> <directory>
Download the container to the local directory. Doesn't
download unchanged files
`,
download,
2, 2, 2, 2,
}, },
{ {
"ls", "ls",
`[<container>] `[<path>]
List the containers if no parameter supplied or the contents
of <container> List the path. If no parameter is supplied then it lists the
available swift containers.
`, `,
list, list,
0, 1, 0, 1,
}, },
{
"lls",
`[<directory>]
List the directory
`,
llist,
1, 1,
},
{ {
"mkdir", "mkdir",
`<container> `<path>
Make the container if it doesn't already exist
Make the path if it doesn't already exist
`, `,
mkdir, mkdir,
1, 1, 1, 1,
}, },
{ {
"rmdir", "rmdir",
`<container> `<path>
Remove the container. Note that you can't remove a container with
objects in - use rm for that Remove the path. Note that you can't remove a path with
objects in it, use purge for that
`, `,
rmdir, rmdir,
1, 1, 1, 1,
}, },
{ {
"purge", "purge",
`<container> `<path>
Remove the container and all of the contents.
Remove the path and all of its contents.
`, `,
purge, purge,
1, 1, 1, 1,
@ -314,6 +248,7 @@ func syntaxError() {
Syntax: [options] subcommand <parameters> <parameters...> Syntax: [options] subcommand <parameters> <parameters...>
Subcommands: Subcommands:
`) `)
for i := range Commands { for i := range Commands {
cmd := &Commands[i] cmd := &Commands[i]
@ -350,30 +285,10 @@ func main() {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
fmt.Println(args)
if len(args) < 1 { if len(args) < 1 {
fatal("No command supplied\n") fatal("No command supplied\n")
} }
if *userName == "" {
log.Fatal("Need -user or environmental variable ST_USER")
}
if *apiKey == "" {
log.Fatal("Need -key or environmental variable ST_KEY")
}
if *authUrl == "" {
log.Fatal("Need -auth or environmental variable ST_AUTH")
}
c := &swift.Connection{
UserName: *userName,
ApiKey: *apiKey,
AuthUrl: *authUrl,
}
err := c.Authenticate()
if err != nil {
log.Fatal("Failed to authenticate", err)
}
cmd := strings.ToLower(args[0]) cmd := strings.ToLower(args[0])
args = args[1:] args = args[1:]
@ -396,5 +311,24 @@ func main() {
log.Fatalf("Unknown command %q", cmd) log.Fatalf("Unknown command %q", cmd)
} }
found.checkArgs(args) found.checkArgs(args)
found.run(c, args)
// Make source and destination fs
var fdst, fsrc Fs
var err error
if len(args) >= 1 {
fdst, err = NewFs(args[0])
if err != nil {
log.Fatal("Failed to create file system: ", err)
}
}
if len(args) >= 2 {
fsrc, err = NewFs(args[1])
if err != nil {
log.Fatal("Failed to create destination file system: ", err)
}
fsrc, fdst = fdst, fsrc
}
// Run the actual command
found.run(fdst, fsrc)
} }