sftp: fix timeout when doing MD5SUM of large file

Before this change we were timing out MD5SUMs after 1 minute because
rclone was closing the SSH session when there were sessions still
aftive.

This change counts sessions active for all SSH sessions now (Upload,
Download, Hashes and running commands).

See: https://forum.rclone.org/t/while-rclone-copying-large-files-md5sum-failed-with-exit-status/26845/
This commit is contained in:
Nick Craig-Wood 2021-10-06 11:50:35 +01:00
parent a98e3ea6f1
commit 93d85015af

View file

@ -300,7 +300,7 @@ type Fs struct {
drain *time.Timer // used to drain the pool when we stop using the connections drain *time.Timer // used to drain the pool when we stop using the connections
pacer *fs.Pacer // pacer for operations pacer *fs.Pacer // pacer for operations
savedpswd string savedpswd string
transfers int32 // count in use references sessions int32 // count in use sessions
} }
// Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading)
@ -363,21 +363,21 @@ func (c *conn) closed() error {
return nil return nil
} }
// Show that we are doing an upload or download // Show that we are using an ssh session
// //
// Call removeTransfer() when done // Call removeSession() when done
func (f *Fs) addTransfer() { func (f *Fs) addSession() {
atomic.AddInt32(&f.transfers, 1) atomic.AddInt32(&f.sessions, 1)
} }
// Show the upload or download done // Show the ssh session is no longer in use
func (f *Fs) removeTransfer() { func (f *Fs) removeSession() {
atomic.AddInt32(&f.transfers, -1) atomic.AddInt32(&f.sessions, -1)
} }
// getTransfers shows whether there are any transfers in progress // getSessions shows whether there are any sessions in use
func (f *Fs) getTransfers() int32 { func (f *Fs) getSessions() int32 {
return atomic.LoadInt32(&f.transfers) return atomic.LoadInt32(&f.sessions)
} }
// Open a new connection to the SFTP server. // Open a new connection to the SFTP server.
@ -506,8 +506,8 @@ func (f *Fs) putSftpConnection(pc **conn, err error) {
func (f *Fs) drainPool(ctx context.Context) (err error) { func (f *Fs) drainPool(ctx context.Context) (err error) {
f.poolMu.Lock() f.poolMu.Lock()
defer f.poolMu.Unlock() defer f.poolMu.Unlock()
if transfers := f.getTransfers(); transfers != 0 { if sessions := f.getSessions(); sessions != 0 {
fs.Debugf(f, "Not closing %d unused connections as %d transfers in progress", len(f.pool), transfers) fs.Debugf(f, "Not closing %d unused connections as %d sessions active", len(f.pool), sessions)
if f.opt.IdleTimeout > 0 { if f.opt.IdleTimeout > 0 {
f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer
} }
@ -1093,6 +1093,9 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
// run runds cmd on the remote end returning standard output // run runds cmd on the remote end returning standard output
func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) { func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) {
f.addSession() // Show session in use
defer f.removeSession()
c, err := f.getSftpConnection(ctx) c, err := f.getSftpConnection(ctx)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "run: get SFTP connection") return nil, errors.Wrap(err, "run: get SFTP connection")
@ -1231,6 +1234,8 @@ func (o *Object) Remote() string {
// Hash returns the selected checksum of the file // Hash returns the selected checksum of the file
// If no checksum is available it returns "" // If no checksum is available it returns ""
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
o.fs.addSession() // Show session in use
defer o.fs.removeSession()
if o.fs.opt.DisableHashCheck { if o.fs.opt.DisableHashCheck {
return "", nil return "", nil
} }
@ -1434,7 +1439,7 @@ func (f *Fs) newObjectReader(sftpFile *sftp.File) *objectReader {
done: make(chan struct{}), done: make(chan struct{}),
} }
// Show connection in use // Show connection in use
f.addTransfer() f.addSession()
go func() { go func() {
// Use sftpFile.WriteTo to pump data so that it gets a // Use sftpFile.WriteTo to pump data so that it gets a
@ -1465,7 +1470,7 @@ func (file *objectReader) Close() (err error) {
// Wait for the background process to finish // Wait for the background process to finish
<-file.done <-file.done
// Show connection no longer in use // Show connection no longer in use
file.f.removeTransfer() file.f.removeSession()
return err return err
} }
@ -1518,8 +1523,8 @@ func (sr *sizeReader) Size() int64 {
// Update a remote sftp file using the data <in> and ModTime from <src> // Update a remote sftp file using the data <in> and ModTime from <src>
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
o.fs.addTransfer() // Show transfer in progress o.fs.addSession() // Show session in use
defer o.fs.removeTransfer() defer o.fs.removeSession()
// Clear the hash cache since we are about to update the object // Clear the hash cache since we are about to update the object
o.md5sum = nil o.md5sum = nil
o.sha1sum = nil o.sha1sum = nil