implement rcat – fixes #230, fixes #1001

This commit is contained in:
Stefan Breunig 2017-08-03 21:42:35 +02:00
parent 3e3a59768e
commit 28a18303f3
32 changed files with 1223 additions and 916 deletions

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -34,6 +34,7 @@ import (
_ "github.com/ncw/rclone/cmd/ncdu" _ "github.com/ncw/rclone/cmd/ncdu"
_ "github.com/ncw/rclone/cmd/obscure" _ "github.com/ncw/rclone/cmd/obscure"
_ "github.com/ncw/rclone/cmd/purge" _ "github.com/ncw/rclone/cmd/purge"
_ "github.com/ncw/rclone/cmd/rcat"
_ "github.com/ncw/rclone/cmd/rmdir" _ "github.com/ncw/rclone/cmd/rmdir"
_ "github.com/ncw/rclone/cmd/rmdirs" _ "github.com/ncw/rclone/cmd/rmdirs"
_ "github.com/ncw/rclone/cmd/sha1sum" _ "github.com/ncw/rclone/cmd/sha1sum"

View file

@ -203,6 +203,20 @@ func NewFsDst(args []string) fs.Fs {
return fdst return fdst
} }
// NewFsDstFile creates a new dst fs with a destination file name from the arguments
func NewFsDstFile(args []string) (fdst fs.Fs, dstFileName string) {
dstRemote, dstFileName := fs.RemoteSplit(args[0])
if dstRemote == "" {
dstRemote = "."
}
if dstFileName == "" {
log.Fatalf("%q is a directory", args[0])
}
fdst = newFsDst(dstRemote)
fs.CalculateModifyWindow(fdst)
return
}
// ShowStats returns true if the user added a `--stats` flag to the command line. // ShowStats returns true if the user added a `--stats` flag to the command line.
// //
// This is called by Run to override the default value of the // This is called by Run to override the default value of the

45
cmd/rcat/rcat.go Normal file
View file

@ -0,0 +1,45 @@
package rcat
import (
"log"
"os"
"time"
"github.com/ncw/rclone/cmd"
"github.com/ncw/rclone/fs"
"github.com/spf13/cobra"
)
func init() {
cmd.Root.AddCommand(commandDefintion)
}
var commandDefintion = &cobra.Command{
Use: "rcat remote:path",
Short: `Copies standard input to file on remote.`,
Long: `
rclone rcat reads from standard input (stdin) and copies it to a
single remote file.
echo "hello world" | rclone rcat remote:path/to/file
Note that since the size is not known in advance, chunking options
will likely be ignored. The upload can also not be retried because
the data is not kept around until the upload succeeds. If you need
to transfer a lot of data, you're better off caching locally and
then ` + "`rclone move`" + ` it to the destination.
`,
Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(1, 1, command, args)
stat, _ := os.Stdin.Stat()
if (stat.Mode() & os.ModeCharDevice) != 0 {
log.Fatalf("nothing to read from standard input (stdin).")
}
fdst, dstFileName := cmd.NewFsDstFile(args)
cmd.Run(false, false, command, func() error {
return fs.Rcat(fdst, dstFileName, os.Stdin, time.Now())
})
},
}

View file

@ -255,6 +255,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return f.newObject(o), nil return f.newObject(o), nil
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// Hashes returns the supported hash sets. // Hashes returns the supported hash sets.
func (f *Fs) Hashes() fs.HashSet { func (f *Fs) Hashes() fs.HashSet {
return fs.HashSet(fs.HashNone) return fs.HashSet(fs.HashNone)
@ -601,6 +606,7 @@ var (
_ fs.Mover = (*Fs)(nil) _ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil)
_ fs.UnWrapper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil)

View file

@ -69,5 +69,6 @@ func TestObjectStorable2(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile2(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile2(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound2(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound2(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove2(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove2(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile2(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge2(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge2(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise2(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise2(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -69,5 +69,6 @@ func TestObjectStorable3(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile3(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile3(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound3(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound3(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove3(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove3(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile3(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge3(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge3(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise3(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise3(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -69,5 +69,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -682,6 +682,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
} }
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// PutUnchecked uploads the object // PutUnchecked uploads the object
// //
// This will create a duplicate if we upload a new file without // This will create a duplicate if we upload a new file without
@ -1363,6 +1368,7 @@ func (o *Object) MimeType() string {
var ( var (
_ fs.Fs = (*Fs)(nil) _ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil) _ fs.Purger = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil) _ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil) _ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil)

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -443,6 +443,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return o, o.Update(in, src, options...) return o, o.Update(in, src, options...)
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(dir string) error { func (f *Fs) Mkdir(dir string) error {
root := path.Join(f.slashRoot, dir) root := path.Join(f.slashRoot, dir)
@ -823,15 +828,32 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// uploadChunked uploads the object in parts // uploadChunked uploads the object in parts
// //
// Call only if size is >= uploadChunkSize // Will work optimally if size is >= uploadChunkSize. If the size is either
// unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
// avoidable request to the Dropbox API that does not carry payload.
// //
// FIXME buffer chunks to improve upload retries // FIXME buffer chunks to improve upload retries
func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) { func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
chunkSize := int64(uploadChunkSize) chunkSize := int64(uploadChunkSize)
chunks := int(size/chunkSize) + 1 chunks := 0
if size != -1 {
chunks = int(size/chunkSize) + 1
}
wc := &writeCounter{}
in := io.TeeReader(in0, wc)
// write the first whole chunk fmtChunk := func(cur int, last bool) {
fs.Debugf(o, "Uploading chunk 1/%d", chunks) if chunks == 0 && last {
fs.Debugf(o, "Streaming chunk %d/%d", cur, cur)
} else if chunks == 0 {
fs.Debugf(o, "Streaming chunk %d/unknown", cur)
} else {
fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks)
}
}
// write the first chunk
fmtChunk(1, false)
var res *files.UploadSessionStartResult var res *files.UploadSessionStartResult
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize}) res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize})
@ -843,7 +865,7 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
cursor := files.UploadSessionCursor{ cursor := files.UploadSessionCursor{
SessionId: res.SessionId, SessionId: res.SessionId,
Offset: uint64(chunkSize), Offset: 0,
} }
appendArg := files.UploadSessionAppendArg{ appendArg := files.UploadSessionAppendArg{
Cursor: &cursor, Cursor: &cursor,
@ -851,8 +873,19 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
} }
// write more whole chunks (if any) // write more whole chunks (if any)
for i := 2; i < chunks; i++ { currentChunk := 2
fs.Debugf(o, "Uploading chunk %d/%d", i, chunks) for {
if chunks > 0 && currentChunk >= chunks {
// if the size is known, only upload full chunks. Remaining bytes are uploaded with
// the UploadSessionFinish request.
break
} else if chunks == 0 && wc.Written-cursor.Offset < uint64(chunkSize) {
// if the size is unknown, upload as long as we can read full chunks from the reader.
// The UploadSessionFinish request will not contain any payload.
break
}
cursor.Offset = wc.Written
fmtChunk(currentChunk, false)
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize}) err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize})
return shouldRetry(err) return shouldRetry(err)
@ -860,15 +893,16 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
if err != nil { if err != nil {
return nil, err return nil, err
} }
cursor.Offset += uint64(chunkSize) currentChunk++
} }
// write the remains // write the remains
cursor.Offset = wc.Written
args := &files.UploadSessionFinishArg{ args := &files.UploadSessionFinishArg{
Cursor: &cursor, Cursor: &cursor,
Commit: commitInfo, Commit: commitInfo,
} }
fs.Debugf(o, "Uploading chunk %d/%d", chunks, chunks) fmtChunk(currentChunk, true)
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
entry, err = o.fs.srv.UploadSessionFinish(args, in) entry, err = o.fs.srv.UploadSessionFinish(args, in)
return shouldRetry(err) return shouldRetry(err)
@ -898,7 +932,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
size := src.Size() size := src.Size()
var err error var err error
var entry *files.FileMetadata var entry *files.FileMetadata
if size > int64(uploadChunkSize) { if size > int64(uploadChunkSize) || size == -1 {
entry, err = o.uploadChunked(in, commitInfo, size) entry, err = o.uploadChunked(in, commitInfo, size)
} else { } else {
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
@ -921,11 +955,23 @@ func (o *Object) Remove() (err error) {
return err return err
} }
type writeCounter struct {
Written uint64
}
// Write implements the io.Writer interface.
func (wc *writeCounter) Write(p []byte) (int, error) {
n := len(p)
wc.Written += uint64(n)
return n, nil
}
// Check the interfaces are satisfied // Check the interfaces are satisfied
var ( var (
_ fs.Fs = (*Fs)(nil) _ fs.Fs = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil) _ fs.Copier = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil) _ fs.Purger = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil) _ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil)
_ fs.Object = (*Object)(nil) _ fs.Object = (*Object)(nil)

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -358,7 +358,7 @@ func NewAccount(in io.ReadCloser, obj Object) *Account {
func (acc *Account) WithBuffer() *Account { func (acc *Account) WithBuffer() *Account {
acc.withBuf = true acc.withBuf = true
var buffers int var buffers int
if acc.size >= int64(Config.BufferSize) { if acc.size >= int64(Config.BufferSize) || acc.size == -1 {
buffers = int(int64(Config.BufferSize) / asyncBufferSize) buffers = int(int64(Config.BufferSize) / asyncBufferSize)
} else { } else {
buffers = int(acc.size / asyncBufferSize) buffers = int(acc.size / asyncBufferSize)

View file

@ -3,8 +3,10 @@ package fs
import ( import (
"io" "io"
"io/ioutil"
"log" "log"
"math" "math"
"os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"sort" "sort"
@ -304,6 +306,13 @@ type Features struct {
// exists. // exists.
PutUnchecked func(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error) PutUnchecked func(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
PutStream func(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
// CleanUp the trash in the Fs // CleanUp the trash in the Fs
// //
// Implement this if you have a way of emptying the trash or // Implement this if you have a way of emptying the trash or
@ -357,6 +366,9 @@ func (ft *Features) Fill(f Fs) *Features {
if do, ok := f.(PutUncheckeder); ok { if do, ok := f.(PutUncheckeder); ok {
ft.PutUnchecked = do.PutUnchecked ft.PutUnchecked = do.PutUnchecked
} }
if do, ok := f.(PutStreamer); ok {
ft.PutStream = do.PutStream
}
if do, ok := f.(CleanUpper); ok { if do, ok := f.(CleanUpper); ok {
ft.CleanUp = do.CleanUp ft.CleanUp = do.CleanUp
} }
@ -402,6 +414,9 @@ func (ft *Features) Mask(f Fs) *Features {
if mask.PutUnchecked == nil { if mask.PutUnchecked == nil {
ft.PutUnchecked = nil ft.PutUnchecked = nil
} }
if mask.PutStream == nil {
ft.PutStream = nil
}
if mask.CleanUp == nil { if mask.CleanUp == nil {
ft.CleanUp = nil ft.CleanUp = nil
} }
@ -508,6 +523,16 @@ type PutUncheckeder interface {
PutUnchecked(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error) PutUnchecked(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
} }
// PutStreamer is an optional interface for Fs
type PutStreamer interface {
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
PutStream(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
}
// CleanUpper is an optional interfaces for Fs // CleanUpper is an optional interfaces for Fs
type CleanUpper interface { type CleanUpper interface {
// CleanUp the trash in the Fs // CleanUp the trash in the Fs
@ -617,6 +642,21 @@ func NewFs(path string) (Fs, error) {
return fsInfo.NewFs(configName, fsPath) return fsInfo.NewFs(configName, fsPath)
} }
// temporaryLocalFs creates a local FS in the OS's temporary directory.
//
// No cleanup is performed, the caller must call Purge on the Fs themselves.
func temporaryLocalFs() (Fs, error) {
path, err := ioutil.TempDir("", "rclone-spool")
if err == nil {
err = os.Remove(path)
}
if err != nil {
return nil, err
}
path = filepath.ToSlash(path)
return NewFs(path)
}
// CheckClose is a utility function used to check the return from // CheckClose is a utility function used to check the return from
// Close in a defer statement. // Close in a defer statement.
func CheckClose(c io.Closer, err *error) { func CheckClose(c io.Closer, err *error) {

View file

@ -6,6 +6,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"mime" "mime"
"path" "path"
@ -13,6 +14,7 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@ -1510,6 +1512,60 @@ func Cat(f Fs, w io.Writer, offset, count int64) error {
}) })
} }
// Rcat reads data from the Reader until EOF and uploads it to a file on remote
func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (err error) {
Stats.Transferring(dstFileName)
defer func() {
Stats.DoneTransferring(dstFileName, err == nil)
}()
fStreamTo := fdst
canStream := fdst.Features().PutStream != nil
if !canStream {
Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file")
tmpLocalFs, err := temporaryLocalFs()
if err != nil {
return errors.Wrap(err, "Failed to create temporary local FS to spool file")
}
defer func() {
err := Purge(tmpLocalFs)
if err != nil {
Infof(tmpLocalFs, "Failed to cleanup temporary FS: %v", err)
}
}()
fStreamTo = tmpLocalFs
}
objInfo := NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil)
// work out which hash to use - limit to 1 hash in common
var common HashSet
hashType := HashNone
if !Config.SizeOnly {
common = fStreamTo.Hashes().Overlap(SupportedHashes)
if common.Count() > 0 {
hashType = common.GetOne()
common = HashSet(hashType)
}
}
hashOption := &HashesOption{Hashes: common}
in := NewAccountSizeName(in0, -1, dstFileName).WithBuffer()
if Config.DryRun {
Logf("stdin", "Not copying as --dry-run")
// prevents "broken pipe" errors
_, err = io.Copy(ioutil.Discard, in)
return err
}
tmpObj, err := fStreamTo.Features().PutStream(in, objInfo, hashOption)
if err == nil && !canStream {
err = Copy(fdst, nil, dstFileName, tmpObj)
}
return err
}
// Rmdirs removes any empty directories (or directories only // Rmdirs removes any empty directories (or directories only
// containing empty directories) under f, including f. // containing empty directories) under f, including f.
func Rmdirs(f Fs, dir string) error { func Rmdirs(f Fs, dir string) error {

View file

@ -697,6 +697,23 @@ func TestCat(t *testing.T) {
} }
} }
func TestRcat(t *testing.T) {
r := NewRun(t)
defer r.Finalise()
fstest.CheckListing(t, r.fremote, []fstest.Item{})
data := "this is some really nice test data"
path := "file_from_pipe"
in := ioutil.NopCloser(strings.NewReader(data))
err := fs.Rcat(r.fremote, path, in, t1)
require.NoError(t, err)
file := fstest.NewItem(path, data, t1)
fstest.CheckItems(t, r.fremote, file)
}
func TestRmdirs(t *testing.T) { func TestRmdirs(t *testing.T) {
r := NewRun(t) r := NewRun(t)
defer r.Finalise() defer r.Finalise()

View file

@ -119,7 +119,7 @@ func (i *Item) CheckHashes(t *testing.T, obj fs.Object) {
// Check checks all the attributes of the object are correct // Check checks all the attributes of the object are correct
func (i *Item) Check(t *testing.T, obj fs.Object, precision time.Duration) { func (i *Item) Check(t *testing.T, obj fs.Object, precision time.Duration) {
i.CheckHashes(t, obj) i.CheckHashes(t, obj)
assert.Equal(t, i.Size, obj.Size(), fmt.Sprintf("%s: size incorrect", i.Path)) assert.Equal(t, i.Size, obj.Size(), fmt.Sprintf("%s: size incorrect file=%d vs obj=%d", i.Path, i.Size, obj.Size()))
i.CheckModTime(t, obj, obj.ModTime(), precision) i.CheckModTime(t, obj, obj.ModTime(), precision)
} }

View file

@ -877,6 +877,47 @@ func TestObjectRemove(t *testing.T) {
fstest.CheckListing(t, remote, []fstest.Item{file2}) fstest.CheckListing(t, remote, []fstest.Item{file2})
} }
// TestFsPutUnknownLengthFile tests uploading files when size is not known in advance
func TestFsPutUnknownLengthFile(t *testing.T) {
skipIfNotOk(t)
file := fstest.Item{
ModTime: fstest.Time("2001-02-03T04:05:06.499999999Z"),
Path: "piped data.txt",
Size: -1, // use unknown size during upload
}
tries := 1
const maxTries = 10
again:
contentSize := 100
contents := fstest.RandomString(contentSize)
buf := bytes.NewBufferString(contents)
hash := fs.NewMultiHasher()
in := io.TeeReader(buf, hash)
file.Size = -1
obji := fs.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
obj, err := remote.Put(in, obji)
if err != nil {
// Retry if err returned a retry error
if fs.IsRetryError(err) && tries < maxTries {
t.Logf("Put error: %v - low level retry %d/%d", err, tries, maxTries)
time.Sleep(2 * time.Second)
tries++
goto again
}
require.NoError(t, err, fmt.Sprintf("Put Unknown Length error: %v", err))
}
file.Hashes = hash.Sums()
file.Size = int64(contentSize) // use correct size when checking
file.Check(t, obj, remote.Precision())
// Re-read the object and check again
obj = findObject(t, file.Path)
file.Check(t, obj, remote.Precision())
}
// TestObjectPurge tests Purge // TestObjectPurge tests Purge
func TestObjectPurge(t *testing.T) { func TestObjectPurge(t *testing.T) {
skipIfNotOk(t) skipIfNotOk(t)

View file

@ -372,6 +372,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return o, err return o, err
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// getInfo reads the FileInfo for a path // getInfo reads the FileInfo for a path
func (f *Fs) getInfo(remote string) (fi *FileInfo, err error) { func (f *Fs) getInfo(remote string) (fi *FileInfo, err error) {
// defer fs.Trace(remote, "")("fi=%v, err=%v", &fi, &err) // defer fs.Trace(remote, "")("fi=%v, err=%v", &fi, &err)
@ -709,5 +714,6 @@ var (
_ fs.Fs = &Fs{} _ fs.Fs = &Fs{}
_ fs.Mover = &Fs{} _ fs.Mover = &Fs{}
_ fs.DirMover = &Fs{} _ fs.DirMover = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Object = &Object{} _ fs.Object = &Object{}
) )

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -327,6 +327,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return o, nil return o, nil
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// Mkdir creates the directory if it doesn't exist // Mkdir creates the directory if it doesn't exist
func (f *Fs) Mkdir(dir string) error { func (f *Fs) Mkdir(dir string) error {
// FIXME: https://github.com/syncthing/syncthing/blob/master/lib/osutil/mkdirall_windows.go // FIXME: https://github.com/syncthing/syncthing/blob/master/lib/osutil/mkdirall_windows.go
@ -895,6 +900,7 @@ func cleanWindowsName(f *Fs, name string) string {
var ( var (
_ fs.Fs = &Fs{} _ fs.Fs = &Fs{}
_ fs.Purger = &Fs{} _ fs.Purger = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Mover = &Fs{} _ fs.Mover = &Fs{}
_ fs.DirMover = &Fs{} _ fs.DirMover = &Fs{}
_ fs.Object = &Object{} _ fs.Object = &Object{}

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -314,6 +314,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return o, nil return o, nil
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// mkParentDir makes the parent of remote if necessary and any // mkParentDir makes the parent of remote if necessary and any
// directories above that // directories above that
func (f *Fs) mkParentDir(remote string) error { func (f *Fs) mkParentDir(remote string) error {
@ -596,6 +601,7 @@ func (o *Object) Remove() error {
// Check the interfaces are satisfied // Check the interfaces are satisfied
var ( var (
_ fs.Fs = &Fs{} _ fs.Fs = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Mover = &Fs{} _ fs.Mover = &Fs{}
_ fs.DirMover = &Fs{} _ fs.DirMover = &Fs{}
_ fs.Object = &Object{} _ fs.Object = &Object{}

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }

View file

@ -68,5 +68,6 @@ func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
func TestFsPutUnknownLengthFile(t *testing.T) { fstests.TestFsPutUnknownLengthFile(t) }
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }