Factor Put() methods into generic Copy() function
This commit is contained in:
parent
38ce4c3629
commit
555ab5001e
6 changed files with 61 additions and 98 deletions
31
fs.go
31
fs.go
|
@ -14,7 +14,7 @@ type Fs interface {
|
|||
String() string
|
||||
List() FsObjectsChan
|
||||
NewFsObject(remote string) FsObject
|
||||
Put(src FsObject)
|
||||
Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error)
|
||||
Mkdir() error
|
||||
Rmdir() error
|
||||
}
|
||||
|
@ -147,3 +147,32 @@ func Equal(src, dst FsObject) bool {
|
|||
FsDebug(src, "Size and MD5SUM of src and dst objects identical")
|
||||
return true
|
||||
}
|
||||
|
||||
// Copy src object to f
|
||||
func Copy(f Fs, src FsObject) {
|
||||
in0, err := src.Open()
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(src, "Failed to open: %s", err)
|
||||
return
|
||||
}
|
||||
in := NewAccount(in0) // account the transfer
|
||||
|
||||
dst, err := f.Put(in, src.Remote(), src.ModTime(), src.Size())
|
||||
inErr := in.Close()
|
||||
if err == nil {
|
||||
err = inErr
|
||||
}
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(dst, "Failed to copy: %s", err)
|
||||
FsDebug(dst, "Removing failed copy")
|
||||
removeErr := dst.Remove()
|
||||
if removeErr != nil {
|
||||
stats.Error()
|
||||
FsLog(dst, "Failed to remove failed copy: %s", removeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
FsDebug(src, "Copied")
|
||||
}
|
||||
|
|
54
fs_local.go
54
fs_local.go
|
@ -101,65 +101,35 @@ func (f *FsLocal) List() FsObjectsChan {
|
|||
return out
|
||||
}
|
||||
|
||||
// FIXME most of this is generic
|
||||
// could make it into Copy(dst, src FsObject)
|
||||
|
||||
// Puts the FsObject to the local filesystem
|
||||
//
|
||||
// FIXME return the object?
|
||||
func (f *FsLocal) Put(src FsObject) {
|
||||
dstRemote := src.Remote()
|
||||
dstPath := filepath.Join(f.root, dstRemote)
|
||||
func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) {
|
||||
dstPath := filepath.Join(f.root, remote)
|
||||
// Temporary FsObject under construction
|
||||
fs := &FsObjectLocal{remote: dstRemote, path: dstPath}
|
||||
FsDebug(fs, "Download %s to %s", dstRemote, dstPath)
|
||||
fs := &FsObjectLocal{remote: remote, path: dstPath}
|
||||
|
||||
dir := path.Dir(dstPath)
|
||||
err := os.MkdirAll(dir, 0770)
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Couldn't make directory: %s", err)
|
||||
return
|
||||
return fs, err
|
||||
}
|
||||
|
||||
out, err := os.Create(dstPath)
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to open: %s", err)
|
||||
return
|
||||
return fs, err
|
||||
}
|
||||
|
||||
// Close and remove file on error at the end
|
||||
defer func() {
|
||||
checkClose(out, &err)
|
||||
if err != nil {
|
||||
FsDebug(fs, "Removing failed download")
|
||||
removeErr := os.Remove(dstPath)
|
||||
if removeErr != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to remove failed download: %s", removeErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
in0, err := src.Open()
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to open: %s", err)
|
||||
return
|
||||
}
|
||||
in := NewAccount(in0) // account the transfer
|
||||
defer checkClose(in, &err)
|
||||
|
||||
_, err = io.Copy(out, in)
|
||||
outErr := out.Close()
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to download: %s", err)
|
||||
return
|
||||
return fs, err
|
||||
}
|
||||
if outErr != nil {
|
||||
return fs, outErr
|
||||
}
|
||||
|
||||
// Set the mtime
|
||||
fs.SetModTime(src.ModTime())
|
||||
fs.SetModTime(modTime)
|
||||
return fs, err
|
||||
}
|
||||
|
||||
// Mkdir creates the directory if it doesn't exist
|
||||
|
|
32
fs_s3.go
32
fs_s3.go
|
@ -207,43 +207,23 @@ func (f *FsS3) List() FsObjectsChan {
|
|||
}
|
||||
|
||||
// Put the FsObject into the bucket
|
||||
func (f *FsS3) Put(src FsObject) {
|
||||
func (f *FsS3) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) {
|
||||
// Temporary FsObject under construction
|
||||
fs := &FsObjectS3{s3: f, remote: src.Remote()}
|
||||
|
||||
in0, err := src.Open()
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to open: %s", err)
|
||||
return
|
||||
}
|
||||
in := NewAccount(in0) // account the transfer
|
||||
defer in.Close()
|
||||
fs := &FsObjectS3{s3: f, remote: remote}
|
||||
|
||||
// Set the mtime in the headers
|
||||
headers := s3.Headers{
|
||||
metaMtime: swift.TimeToFloatString(src.ModTime()),
|
||||
metaMtime: swift.TimeToFloatString(modTime),
|
||||
}
|
||||
|
||||
// Guess the content type
|
||||
contentType := mime.TypeByExtension(path.Ext(fs.remote))
|
||||
contentType := mime.TypeByExtension(path.Ext(remote))
|
||||
if contentType == "" {
|
||||
contentType = "application/octet-stream"
|
||||
}
|
||||
|
||||
_, err = fs.s3.b.PutReaderHeaders(fs.remote, in, src.Size(), contentType, fs.s3.perm, headers)
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to upload: %s", err)
|
||||
FsDebug(fs, "Removing failed upload")
|
||||
removeErr := fs.Remove()
|
||||
if removeErr != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to remove failed download: %s", removeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
FsDebug(fs, "Uploaded")
|
||||
_, err := fs.s3.b.PutReaderHeaders(remote, in, size, contentType, f.perm, headers)
|
||||
return fs, err
|
||||
}
|
||||
|
||||
// Mkdir creates the bucket if it doesn't exist
|
||||
|
|
35
fs_swift.go
35
fs_swift.go
|
@ -173,36 +173,19 @@ func (f *FsSwift) List() FsObjectsChan {
|
|||
}
|
||||
|
||||
// Put the FsObject into the container
|
||||
func (f *FsSwift) Put(src FsObject) {
|
||||
//
|
||||
// Copy the reader in to the new object which is returned
|
||||
//
|
||||
// The new object may have been created
|
||||
func (f *FsSwift) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) {
|
||||
// Temporary FsObject under construction
|
||||
fs := &FsObjectSwift{swift: f, remote: src.Remote()}
|
||||
// FIXME content type
|
||||
in0, err := src.Open()
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to open: %s", err)
|
||||
return
|
||||
}
|
||||
in := NewAccount(in0) // account the transfer
|
||||
defer in.Close()
|
||||
fs := &FsObjectSwift{swift: f, remote: remote}
|
||||
|
||||
// Set the mtime
|
||||
m := swift.Metadata{}
|
||||
m.SetModTime(src.ModTime())
|
||||
|
||||
_, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders())
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to upload: %s", err)
|
||||
FsDebug(fs, "Removing failed upload")
|
||||
removeErr := fs.Remove()
|
||||
if removeErr != nil {
|
||||
stats.Error()
|
||||
FsLog(fs, "Failed to remove failed download: %s", removeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
FsDebug(fs, "Uploaded")
|
||||
m.SetModTime(modTime)
|
||||
_, err := f.c.ObjectPut(f.container, remote, in, true, "", "", m.ObjectHeaders())
|
||||
return fs, err
|
||||
}
|
||||
|
||||
// Mkdir creates the container if it doesn't exist
|
||||
|
|
|
@ -2,6 +2,7 @@ Todo
|
|||
* Factor fses into own packages
|
||||
* FIXME: ls without an argument for buckets/containers?
|
||||
* FIXME: More -dry-run checks for object transfer
|
||||
* FIXME: don't delete in sync if errors
|
||||
* Might be quicker to check md5sums first? for swift <-> swift certainly, and maybe for small files
|
||||
* Ignoring the pseudo directories
|
||||
* if object.PseudoDirectory {
|
||||
|
|
|
@ -62,13 +62,13 @@ func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
|
|||
defer wg.Done()
|
||||
for src := range in {
|
||||
stats.Transferring(src)
|
||||
fdst.Put(src)
|
||||
Copy(fdst, src)
|
||||
stats.DoneTransferring(src)
|
||||
}
|
||||
}
|
||||
|
||||
// Copies fsrc into fdst
|
||||
func Copy(fdst, fsrc Fs) {
|
||||
func CopyFs(fdst, fsrc Fs) {
|
||||
err := fdst.Mkdir()
|
||||
if err != nil {
|
||||
stats.Error()
|
||||
|
@ -351,7 +351,7 @@ var Commands = []Command{
|
|||
MD5SUM. Doesn't delete files from the destination.
|
||||
|
||||
`,
|
||||
Copy,
|
||||
CopyFs,
|
||||
2, 2,
|
||||
},
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue