sftp: Fix performance regression by re-enabling concurrent writes #5197

Betweeen rclone v1.54 and v1.55 there was an approx 3x performance
regression when transferring to distant SFTP servers (in particular
rsync.net).

This turned out to be due to the library github.com/pkg/sftp rclone
uses. Concurrent writes used to be enabled in this library by default
(for v1.12.0 as used in rclone v1.54) but they are no longer enabled
(for v1.13.0 as used in rclone v1.55) for safety reasons and it is
necessary to enable them specifically.

The safety concerns are due to the uncertainty as to whether writes
come in order and whether a half completed file might have holes in
it. This isn't a problem for rclone since a) it doesn't restart
uploads and b) it has a post-transfer checksum test.

This change introduces a new flag `--sftp-disable-concurrent-writes`
to control the feature which defaults to false, meaning that
concurrent writes are enabled as in v1.54.

However this isn't quite enough to fix the problem as the sftp library
needs to be able to sniff the size of the stream from the reader
passed in, so this also adds a `Size` interface to the reader to
enable this. This involved a patch to the library.

The library was reverted to v1.12.0 for v1.55.1 - this patch installs
v1.13.0+master to fix the Size interface problem.

See: https://github.com/pkg/sftp/issues/426
This commit is contained in:
Nick Craig-Wood 2021-04-05 10:32:20 +01:00
parent 4b1d28550a
commit 0537791d14
3 changed files with 60 additions and 29 deletions

View file

@ -224,6 +224,17 @@ have a server which returns
Then you may need to enable this flag.
If concurrent reads are disabled, the use_fstat option is ignored.
`,
Advanced: true,
}, {
Name: "disable_concurrent_writes",
Default: false,
Help: `If set don't use concurrent writes
Normally rclone uses concurrent writes to upload files. This improves
the performance greatly, especially for distant servers.
This option disables concurrent writes should that be necessary.
`,
Advanced: true,
}, {
@ -244,29 +255,30 @@ Set to 0 to keep connections indefinitely.
// Options defines the configuration for this backend
type Options struct {
Host string `config:"host"`
User string `config:"user"`
Port string `config:"port"`
Pass string `config:"pass"`
KeyPem string `config:"key_pem"`
KeyFile string `config:"key_file"`
KeyFilePass string `config:"key_file_pass"`
PubKeyFile string `config:"pubkey_file"`
KnownHostsFile string `config:"known_hosts_file"`
KeyUseAgent bool `config:"key_use_agent"`
UseInsecureCipher bool `config:"use_insecure_cipher"`
DisableHashCheck bool `config:"disable_hashcheck"`
AskPassword bool `config:"ask_password"`
PathOverride string `config:"path_override"`
SetModTime bool `config:"set_modtime"`
Md5sumCommand string `config:"md5sum_command"`
Sha1sumCommand string `config:"sha1sum_command"`
SkipLinks bool `config:"skip_links"`
Subsystem string `config:"subsystem"`
ServerCommand string `config:"server_command"`
UseFstat bool `config:"use_fstat"`
DisableConcurrentReads bool `config:"disable_concurrent_reads"`
IdleTimeout fs.Duration `config:"idle_timeout"`
Host string `config:"host"`
User string `config:"user"`
Port string `config:"port"`
Pass string `config:"pass"`
KeyPem string `config:"key_pem"`
KeyFile string `config:"key_file"`
KeyFilePass string `config:"key_file_pass"`
PubKeyFile string `config:"pubkey_file"`
KnownHostsFile string `config:"known_hosts_file"`
KeyUseAgent bool `config:"key_use_agent"`
UseInsecureCipher bool `config:"use_insecure_cipher"`
DisableHashCheck bool `config:"disable_hashcheck"`
AskPassword bool `config:"ask_password"`
PathOverride string `config:"path_override"`
SetModTime bool `config:"set_modtime"`
Md5sumCommand string `config:"md5sum_command"`
Sha1sumCommand string `config:"sha1sum_command"`
SkipLinks bool `config:"skip_links"`
Subsystem string `config:"subsystem"`
ServerCommand string `config:"server_command"`
UseFstat bool `config:"use_fstat"`
DisableConcurrentReads bool `config:"disable_concurrent_reads"`
DisableConcurrentWrites bool `config:"disable_concurrent_writes"`
IdleTimeout fs.Duration `config:"idle_timeout"`
}
// Fs stores the interface to the remote SFTP files
@ -414,8 +426,8 @@ func (f *Fs) newSftpClient(conn *ssh.Client, opts ...sftp.ClientOption) (*sftp.C
opts = opts[:len(opts):len(opts)] // make sure we don't overwrite the callers opts
opts = append(opts,
sftp.UseFstat(f.opt.UseFstat),
// FIXME disabled after library reversion
// sftp.UseConcurrentReads(!f.opt.DisableConcurrentReads),
sftp.UseConcurrentReads(!f.opt.DisableConcurrentReads),
sftp.UseConcurrentWrites(!f.opt.DisableConcurrentWrites),
)
if f.opt.DisableConcurrentReads { // FIXME
fs.Errorf(f, "Ignoring disable_concurrent_reads after library reversion - see #5197")
@ -1494,6 +1506,19 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
return in, nil
}
type sizeReader struct {
io.Reader
size int64
}
// Size returns the expected size of the stream
//
// It is used in sftpFile.ReadFrom as a hint to work out the
// concurrency needed
func (sr *sizeReader) Size() int64 {
return sr.size
}
// Update a remote sftp file using the data <in> and ModTime from <src>
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
o.fs.addTransfer() // Show transfer in progress
@ -1525,7 +1550,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
fs.Debugf(src, "Removed after failed upload: %v", err)
}
}
_, err = file.ReadFrom(in)
_, err = file.ReadFrom(&sizeReader{Reader: in, size: src.Size()})
if err != nil {
remove()
return errors.Wrap(err, "Update ReadFrom failed")