From f158a398f301a12b166cb9f812e7e426f7ed69f8 Mon Sep 17 00:00:00 2001 From: Sebastian Brandt Date: Thu, 7 Nov 2019 14:57:42 +0100 Subject: [PATCH] sftp: Retry Creation of Connection - fixes #3656 Removes the existing rate limiter because it is implicitly included in the pacer. --- backend/sftp/sftp.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go index 9dbfc9320..d426b6af3 100644 --- a/backend/sftp/sftp.go +++ b/backend/sftp/sftp.go @@ -29,15 +29,17 @@ import ( "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/env" + "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/readers" sshagent "github.com/xanzy/ssh-agent" "golang.org/x/crypto/ssh" - "golang.org/x/time/rate" ) const ( - connectionsPerSecond = 10 // don't make more than this many ssh connections/s hashCommandNotSupported = "none" + minSleep = 100 * time.Millisecond + maxSleep = 2 * time.Second + decayConstant = 2 // bigger for slower decay, exponential ) var ( @@ -190,7 +192,7 @@ type Fs struct { cachedHashes *hash.Set poolMu sync.Mutex pool []*conn - connLimit *rate.Limiter // for limiting number of connections per second + pacer *fs.Pacer // pacer for operations } // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) @@ -270,10 +272,6 @@ func (c *conn) closed() error { // Open a new connection to the SFTP server. func (f *Fs) sftpConnection() (c *conn, err error) { // Rate limit rate of new connections - err = f.connLimit.Wait(context.Background()) - if err != nil { - return nil, errors.Wrap(err, "limiter failed in connect") - } c = &conn{ err: make(chan error, 1), } @@ -307,7 +305,14 @@ func (f *Fs) getSftpConnection() (c *conn, err error) { if c != nil { return c, nil } - return f.sftpConnection() + err = f.pacer.Call(func() (bool, error) { + c, err = f.sftpConnection() + if err != nil { + return true, err + } + return false, nil + }) + return c, err } // Return an SFTP connection to the pool @@ -465,7 +470,7 @@ func NewFsWithConnection(ctx context.Context, name string, root string, m config config: sshConfig, url: "sftp://" + opt.User + "@" + opt.Host + ":" + opt.Port + "/" + root, mkdirLock: newStringLock(), - connLimit: rate.NewLimiter(rate.Limit(connectionsPerSecond), 1), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.features = (&fs.Features{ CanHaveEmptyDirectories: true,