From 47eab397ba6e5dbf3d6de1f71265334393f279c7 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Mon, 7 Aug 2017 17:19:37 +0100 Subject: [PATCH] sftp: implement connection pooling for multiple ssh connections A connection may be opened for each `--transfers` and `--checkers` now. Connections are checked when putting them in the pool and getting them out the pool so it should recover from network errors much better. This fixes #1561, fixes #1541, fixes #1381, fixes #1158, fixes #1538 --- sftp/sftp.go | 253 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 216 insertions(+), 37 deletions(-) diff --git a/sftp/sftp.go b/sftp/sftp.go index d67948c4e..6e0fd8865 100644 --- a/sftp/sftp.go +++ b/sftp/sftp.go @@ -11,6 +11,7 @@ import ( "path" "regexp" "strings" + "sync" "time" "github.com/ncw/rclone/fs" @@ -60,11 +61,14 @@ type Fs struct { name string root string features *fs.Features // optional features + config *ssh.ClientConfig + host string + port string url string - sshClient *ssh.Client - sftpClient *sftp.Client mkdirLock *stringLock cachedHashes *fs.HashSet + poolMu sync.Mutex + pool []*conn } // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) @@ -100,6 +104,114 @@ func Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) { return ssh.NewClient(c, chans, reqs), nil } +// conn encapsulates an ssh client and corresponding sftp client +type conn struct { + sshClient *ssh.Client + sftpClient *sftp.Client + err chan error +} + +// Wait for connection to close +func (c *conn) wait() { + c.err <- c.sshClient.Conn.Wait() +} + +// Closes the connection +func (c *conn) close() error { + sftpErr := c.sftpClient.Close() + sshErr := c.sshClient.Close() + if sftpErr != nil { + return sftpErr + } + return sshErr +} + +// Returns an error if closed +func (c *conn) closed() error { + select { + case err := <-c.err: + return err + default: + } + return nil +} + +// Open a new connection to the SFTP server. +func (f *Fs) sftpConnection() (c *conn, err error) { + c = &conn{ + err: make(chan error, 1), + } + c.sshClient, err = Dial("tcp", f.host+":"+f.port, f.config) + if err != nil { + return nil, errors.Wrap(err, "couldn't connect SSH") + } + c.sftpClient, err = sftp.NewClient(c.sshClient) + if err != nil { + _ = c.sshClient.Close() + return nil, errors.Wrap(err, "couldn't initialise SFTP") + } + go c.wait() + return c, nil +} + +// Get an SFTP connection from the pool, or open a new one +func (f *Fs) getSftpConnection() (c *conn, err error) { + f.poolMu.Lock() + for len(f.pool) > 0 { + c = f.pool[0] + f.pool = f.pool[1:] + err := c.closed() + if err == nil { + break + } + fs.Errorf(f, "Discarding closed SSH connection: %v", err) + c = nil + } + f.poolMu.Unlock() + if c != nil { + return c, nil + } + return f.sftpConnection() +} + +// Return an SFTP connection to the pool +// +// It nils the pointed to connection out so it can't be reused +// +// if err is not nil then it checks the connection is alive using a +// Getwd request +func (f *Fs) putSftpConnection(pc **conn, err error) { + c := *pc + *pc = nil + if err != nil { + // work out if this is an expected error + underlyingErr := errors.Cause(err) + isRegularError := false + switch underlyingErr { + case os.ErrNotExist: + isRegularError = true + default: + switch underlyingErr.(type) { + case *sftp.StatusError, *os.PathError: + isRegularError = true + } + } + // If not a regular SFTP error code then check the connection + if !isRegularError { + _, nopErr := c.sftpClient.Getwd() + if nopErr != nil { + fs.Debugf(f, "Connection failed, closing: %v", nopErr) + _ = c.close() + return + } + fs.Debugf(f, "Connection OK after error: %v", err) + } + } + f.poolMu.Lock() + f.pool = append(f.pool, c) + f.poolMu.Unlock() +} + // NewFs creates a new Fs object from the name and root. It connects to // the host specified in the config file. func NewFs(name, root string) (fs.Fs, error) { @@ -156,24 +268,22 @@ func NewFs(name, root string) (fs.Fs, error) { config.Auth = append(config.Auth, ssh.Password(clearpass)) } - sshClient, err := Dial("tcp", host+":"+port, config) - if err != nil { - return nil, errors.Wrap(err, "couldn't connect ssh") - } - sftpClient, err := sftp.NewClient(sshClient) - if err != nil { - _ = sshClient.Close() - return nil, errors.Wrap(err, "couldn't initialise SFTP") - } f := &Fs{ - name: name, - root: root, - sshClient: sshClient, - sftpClient: sftpClient, - url: "sftp://" + user + "@" + host + ":" + port + "/" + root, - mkdirLock: newStringLock(), + name: name, + root: root, + config: config, + host: host, + port: port, + url: "sftp://" + user + "@" + host + ":" + port + "/" + root, + mkdirLock: newStringLock(), } f.features = (&fs.Features{}).Fill(f) + // Make a connection and pool it to return errors early + c, err := f.getSftpConnection() + if err != nil { + return nil, errors.Wrap(err, "NewFs") + } + f.putSftpConnection(&c, nil) if root != "" { // Check to see if the root actually an existing file remote := path.Base(root) @@ -193,11 +303,6 @@ func NewFs(name, root string) (fs.Fs, error) { // return an error with an fs which points to the parent return f, fs.ErrorIsFile } - go func() { - // FIXME re-open the connection here... - err := f.sshClient.Conn.Wait() - fs.Errorf(f, "SSH connection closed: %v", err) - }() return f, nil } @@ -245,7 +350,12 @@ func (f *Fs) dirExists(dir string) (bool, error) { if dir == "" { dir = "." } - info, err := f.sftpClient.Stat(dir) + c, err := f.getSftpConnection() + if err != nil { + return false, errors.Wrap(err, "dirExists") + } + info, err := c.sftpClient.Stat(dir) + f.putSftpConnection(&c, err) if err != nil { if os.IsNotExist(err) { return false, nil @@ -280,7 +390,12 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { if sftpDir == "" { sftpDir = "." } - infos, err := f.sftpClient.ReadDir(sftpDir) + c, err := f.getSftpConnection() + if err != nil { + return nil, errors.Wrap(err, "List") + } + infos, err := c.sftpClient.ReadDir(sftpDir) + f.putSftpConnection(&c, err) if err != nil { return nil, errors.Wrapf(err, "error listing %q", dir) } @@ -350,7 +465,12 @@ func (f *Fs) mkdir(dirPath string) error { if err != nil { return err } - err = f.sftpClient.Mkdir(dirPath) + c, err := f.getSftpConnection() + if err != nil { + return errors.Wrap(err, "mkdir") + } + err = c.sftpClient.Mkdir(dirPath) + f.putSftpConnection(&c, err) if err != nil { return errors.Wrapf(err, "mkdir %q failed", dirPath) } @@ -366,7 +486,13 @@ func (f *Fs) Mkdir(dir string) error { // Rmdir removes the root directory of the Fs object func (f *Fs) Rmdir(dir string) error { root := path.Join(f.root, dir) - return f.sftpClient.Remove(root) + c, err := f.getSftpConnection() + if err != nil { + return errors.Wrap(err, "Rmdir") + } + err = c.sftpClient.Remove(root) + f.putSftpConnection(&c, err) + return err } // Move renames a remote sftp file object @@ -380,10 +506,15 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { if err != nil { return nil, errors.Wrap(err, "Move mkParentDir failed") } - err = f.sftpClient.Rename( + c, err := f.getSftpConnection() + if err != nil { + return nil, errors.Wrap(err, "Move") + } + err = c.sftpClient.Rename( srcObj.path(), path.Join(f.root, remote), ) + f.putSftpConnection(&c, err) if err != nil { return nil, errors.Wrap(err, "Move Rename failed") } @@ -427,10 +558,15 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { } // Do the move - err = f.sftpClient.Rename( + c, err := f.getSftpConnection() + if err != nil { + return errors.Wrap(err, "DirMove") + } + err = c.sftpClient.Rename( srcPath, dstPath, ) + f.putSftpConnection(&c, err) if err != nil { return errors.Wrapf(err, "DirMove Rename(%q,%q) failed", srcPath, dstPath) } @@ -443,7 +579,13 @@ func (f *Fs) Hashes() fs.HashSet { return *f.cachedHashes } - session, err := f.sshClient.NewSession() + c, err := f.getSftpConnection() + if err != nil { + fs.Errorf(f, "Couldn't get SSH connection to figure out Hashes: %v", err) + return fs.HashSet(fs.HashNone) + } + defer f.putSftpConnection(&c, err) + session, err := c.sshClient.NewSession() if err != nil { return fs.HashSet(fs.HashNone) } @@ -451,7 +593,7 @@ func (f *Fs) Hashes() fs.HashSet { expectedSha1 := "03cfd743661f07975fa2f1220c5194cbaff48451" _ = session.Close() - session, err = f.sshClient.NewSession() + session, err = c.sshClient.NewSession() if err != nil { return fs.HashSet(fs.HashNone) } @@ -505,7 +647,12 @@ func (o *Object) Hash(r fs.HashType) (string, error) { return *o.sha1sum, nil } - session, err := o.fs.sshClient.NewSession() + c, err := o.fs.getSftpConnection() + if err != nil { + return "", errors.Wrap(err, "Hash") + } + session, err := c.sshClient.NewSession() + o.fs.putSftpConnection(&c, err) if err != nil { o.fs.cachedHashes = nil // Something has changed on the remote system return "", fs.ErrHashUnsupported @@ -576,7 +723,12 @@ func (o *Object) setMetadata(info os.FileInfo) { // stat updates the info in the Object func (o *Object) stat() error { - info, err := o.fs.sftpClient.Stat(o.path()) + c, err := o.fs.getSftpConnection() + if err != nil { + return errors.Wrap(err, "stat") + } + info, err := c.sftpClient.Stat(o.path()) + o.fs.putSftpConnection(&c, err) if err != nil { if os.IsNotExist(err) { return fs.ErrorObjectNotFound @@ -594,7 +746,12 @@ func (o *Object) stat() error { // // it also updates the info field func (o *Object) SetModTime(modTime time.Time) error { - err := o.fs.sftpClient.Chtimes(o.path(), modTime, modTime) + c, err := o.fs.getSftpConnection() + if err != nil { + return errors.Wrap(err, "SetModTime") + } + err = c.sftpClient.Chtimes(o.path(), modTime, modTime) + o.fs.putSftpConnection(&c, err) if err != nil { return errors.Wrap(err, "SetModTime failed") } @@ -636,7 +793,12 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { } } } - sftpFile, err := o.fs.sftpClient.Open(o.path()) + c, err := o.fs.getSftpConnection() + if err != nil { + return nil, errors.Wrap(err, "Open") + } + sftpFile, err := c.sftpClient.Open(o.path()) + o.fs.putSftpConnection(&c, err) if err != nil { return nil, errors.Wrap(err, "Open failed") } @@ -655,13 +817,24 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // Update a remote sftp file using the data and ModTime from func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - file, err := o.fs.sftpClient.Create(o.path()) + c, err := o.fs.getSftpConnection() + if err != nil { + return errors.Wrap(err, "Update") + } + file, err := c.sftpClient.Create(o.path()) + o.fs.putSftpConnection(&c, err) if err != nil { return errors.Wrap(err, "Update Create failed") } // remove the file if upload failed remove := func() { - removeErr := o.fs.sftpClient.Remove(o.path()) + c, removeErr := o.fs.getSftpConnection() + if removeErr != nil { + fs.Debugf(src, "Failed to open new SSH connection for delete: %v", removeErr) + return + } + removeErr = c.sftpClient.Remove(o.path()) + o.fs.putSftpConnection(&c, removeErr) if removeErr != nil { fs.Debugf(src, "Failed to remove: %v", removeErr) } else { @@ -687,7 +860,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio // Remove a remote sftp file object func (o *Object) Remove() error { - return o.fs.sftpClient.Remove(o.path()) + c, err := o.fs.getSftpConnection() + if err != nil { + return errors.Wrap(err, "Remove") + } + err = c.sftpClient.Remove(o.path()) + o.fs.putSftpConnection(&c, err) + return err } // Check the interfaces are satisfied