fuse: re-use rcat to support uploads for all remotes (fixes #1672)

This commit is contained in:
Stefan Breunig 2017-09-16 22:49:08 +02:00
parent 168b0a0ecb
commit 12405f9f41
5 changed files with 22 additions and 63 deletions

View file

@@ -143,13 +143,6 @@ Assuming only one rclone instance is running, you can reset the cache
like this: like this:
kill -SIGHUP $(pidof rclone) kill -SIGHUP $(pidof rclone)
### Bugs ###
* All the remotes should work for read, but some may not for write
* those which need to know the size in advance won't - eg B2
* maybe should pass in size as -1 to mean work it out
* Or put in an upload cache to cache the files on disk first
`, `,
Run: func(command *cobra.Command, args []string) { Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(2, 2, command, args) cmd.CheckArgs(2, 2, command, args)

View file

@@ -3,9 +3,9 @@ package mountlib
import ( import (
"io" "io"
"sync" "sync"
"time"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/pkg/errors"
) )
// WriteFileHandle is an open for write handle on a File // WriteFileHandle is an open for write handle on a File
@@ -19,41 +19,28 @@ type WriteFileHandle struct {
file *File file *File
writeCalled bool // set the first time Write() is called writeCalled bool // set the first time Write() is called
offset int64 offset int64
hash *fs.MultiHasher
} }
func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) { func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) {
var hash *fs.MultiHasher
if !f.d.fsys.noChecksum {
var err error
hash, err = fs.NewMultiHasherTypes(src.Fs().Hashes())
if err != nil {
fs.Errorf(src.Fs(), "newWriteFileHandle hash error: %v", err)
}
}
fh := &WriteFileHandle{ fh := &WriteFileHandle{
remote: src.Remote(), remote: src.Remote(),
result: make(chan error, 1), result: make(chan error, 1),
file: f, file: f,
hash: hash,
} }
var pipeReader *io.PipeReader var pipeReader *io.PipeReader
pipeReader, fh.pipeWriter = io.Pipe() pipeReader, fh.pipeWriter = io.Pipe()
go func() { go func() {
r := fs.NewAccountSizeName(pipeReader, 0, src.Remote()).WithBuffer() // account the transfer o, err := fs.Rcat(d.f, src.Remote(), pipeReader, time.Now())
o, err := d.f.Put(r, src)
if err != nil { if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.New Put failed: %v", err) fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err)
} }
// Close the Account and thus the pipeReader so the pipeWriter fails with ErrClosedPipe // Close the pipeReader so the pipeWriter fails with ErrClosedPipe
_ = r.Close() _ = pipeReader.Close()
fh.o = o fh.o = o
fh.result <- err fh.result <- err
}() }()
fh.file.addWriters(1) fh.file.addWriters(1)
fh.file.setSize(0) fh.file.setSize(0)
fs.Stats.Transferring(fh.remote)
return fh, nil return fh, nil
} }
@@ -87,7 +74,6 @@ func (fh *WriteFileHandle) Write(data []byte, offset int64) (written int64, err
return 0, EBADF return 0, EBADF
} }
fh.writeCalled = true fh.writeCalled = true
// FIXME should probably check the file isn't being seeked?
n, err := fh.pipeWriter.Write(data) n, err := fh.pipeWriter.Write(data)
written = int64(n) written = int64(n)
fh.offset += written fh.offset += written
@@ -97,13 +83,6 @@ func (fh *WriteFileHandle) Write(data []byte, offset int64) (written int64, err
return 0, err return 0, err
} }
// fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n) // fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
if fh.hash != nil {
_, err = fh.hash.Write(data[:n])
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Write HashError: %v", err)
return written, err
}
}
return written, nil return written, nil
} }
@@ -121,7 +100,6 @@ func (fh *WriteFileHandle) close() error {
return EBADF return EBADF
} }
fh.closed = true fh.closed = true
fs.Stats.DoneTransferring(fh.remote, true)
fh.file.addWriters(-1) fh.file.addWriters(-1)
writeCloseErr := fh.pipeWriter.Close() writeCloseErr := fh.pipeWriter.Close()
err := <-fh.result err := <-fh.result
@@ -129,17 +107,6 @@ func (fh *WriteFileHandle) close() error {
fh.file.setObject(fh.o) fh.file.setObject(fh.o)
err = writeCloseErr err = writeCloseErr
} }
if err == nil && fh.hash != nil {
for hashType, srcSum := range fh.hash.Sums() {
dstSum, err := fh.o.Hash(hashType)
if err != nil {
return err
}
if !fs.HashEquals(srcSum, dstSum) {
return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
}
}
}
return err return err
} }

View file

@@ -50,7 +50,8 @@ a lot of data, you're better off caching locally and then
fdst, dstFileName := cmd.NewFsDstFile(args) fdst, dstFileName := cmd.NewFsDstFile(args)
cmd.Run(false, false, command, func() error { cmd.Run(false, false, command, func() error {
return fs.Rcat(fdst, dstFileName, os.Stdin, time.Now()) _, err := fs.Rcat(fdst, dstFileName, os.Stdin, time.Now())
return err
}) })
}, },
} }

View file

@@ -1561,8 +1561,7 @@ func Cat(f Fs, w io.Writer, offset, count int64) error {
} }
// Rcat reads data from the Reader until EOF and uploads it to a file on remote // Rcat reads data from the Reader until EOF and uploads it to a file on remote
func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (err error) { func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (dst Object, err error) {
Stats.Transferring(dstFileName) Stats.Transferring(dstFileName)
defer func() { defer func() {
Stats.DoneTransferring(dstFileName, err == nil) Stats.DoneTransferring(dstFileName, err == nil)
@@ -1574,7 +1573,7 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er
hashOption := &HashesOption{Hashes: fdst.Hashes()} hashOption := &HashesOption{Hashes: fdst.Hashes()}
hash, err := NewMultiHasherTypes(fdst.Hashes()) hash, err := NewMultiHasherTypes(fdst.Hashes())
if err != nil { if err != nil {
return err return nil, err
} }
readCounter := NewCountingReader(in0) readCounter := NewCountingReader(in0)
trackingIn := io.TeeReader(readCounter, hash) trackingIn := io.TeeReader(readCounter, hash)
@@ -1599,13 +1598,13 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er
objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil) objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil)
if Config.DryRun { if Config.DryRun {
Logf("stdin", "Not uploading as --dry-run") Logf("stdin", "Not uploading as --dry-run")
return nil return nil, nil
} }
dst, err := fdst.Put(in, objInfo, hashOption) dst, err := fdst.Put(in, objInfo, hashOption)
if err != nil { if err != nil {
return err return dst, err
} }
return compare(dst) return dst, compare(dst)
} }
in := ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), trackingIn)) in := ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), trackingIn))
@@ -1615,7 +1614,7 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er
Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file") Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file")
tmpLocalFs, err := temporaryLocalFs() tmpLocalFs, err := temporaryLocalFs()
if err != nil { if err != nil {
return errors.Wrap(err, "Failed to create temporary local FS to spool file") return nil, errors.Wrap(err, "Failed to create temporary local FS to spool file")
} }
defer func() { defer func() {
err := Purge(tmpLocalFs) err := Purge(tmpLocalFs)
@@ -1632,21 +1631,20 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er
Logf("stdin", "Not uploading as --dry-run") Logf("stdin", "Not uploading as --dry-run")
// prevents "broken pipe" errors // prevents "broken pipe" errors
_, err = io.Copy(ioutil.Discard, in) _, err = io.Copy(ioutil.Discard, in)
return err return nil, err
} }
objInfo := NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil) objInfo := NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil)
tmpObj, err := fStreamTo.Features().PutStream(in, objInfo, hashOption) if dst, err = fStreamTo.Features().PutStream(in, objInfo, hashOption); err != nil {
if err != nil { return dst, err
return err
} }
if err = compare(tmpObj); err != nil { if err = compare(dst); err != nil {
return err return dst, err
} }
if !canStream { if !canStream {
return Copy(fdst, nil, dstFileName, tmpObj) return dst, Copy(fdst, nil, dstFileName, dst)
} }
return nil return dst, nil
} }
// Rmdirs removes any empty directories (or directories only // Rmdirs removes any empty directories (or directories only

View file

@@ -754,11 +754,11 @@ func TestRcat(t *testing.T) {
path2 := prefix + "big_file_from_pipe" path2 := prefix + "big_file_from_pipe"
in := ioutil.NopCloser(strings.NewReader(data1)) in := ioutil.NopCloser(strings.NewReader(data1))
err := fs.Rcat(r.fremote, path1, in, t1) _, err := fs.Rcat(r.fremote, path1, in, t1)
require.NoError(t, err) require.NoError(t, err)
in = ioutil.NopCloser(strings.NewReader(data2)) in = ioutil.NopCloser(strings.NewReader(data2))
err = fs.Rcat(r.fremote, path2, in, t2) _, err = fs.Rcat(r.fremote, path2, in, t2)
require.NoError(t, err) require.NoError(t, err)
file1 := fstest.NewItem(path1, data1, t1) file1 := fstest.NewItem(path1, data1, t1)