sftp: limit new connections per second
This commit is contained in:
parent
c6b844977a
commit
5b6585f57d
1 changed files with 13 additions and 0 deletions
13
sftp/sftp.go
13
sftp/sftp.go
|
@ -5,6 +5,7 @@
|
||||||
package sftp
|
package sftp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
@ -19,6 +20,11 @@ import (
|
||||||
"github.com/pkg/sftp"
|
"github.com/pkg/sftp"
|
||||||
"github.com/xanzy/ssh-agent"
|
"github.com/xanzy/ssh-agent"
|
||||||
"golang.org/x/crypto/ssh"
|
"golang.org/x/crypto/ssh"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
connectionsPerSecond = 10 // don't make more than this many ssh connections/s
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -69,6 +75,7 @@ type Fs struct {
|
||||||
cachedHashes *fs.HashSet
|
cachedHashes *fs.HashSet
|
||||||
poolMu sync.Mutex
|
poolMu sync.Mutex
|
||||||
pool []*conn
|
pool []*conn
|
||||||
|
connLimit *rate.Limiter // for limiting number of connections per second
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading)
|
// Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading)
|
||||||
|
@ -138,6 +145,11 @@ func (c *conn) closed() error {
|
||||||
|
|
||||||
// Open a new connection to the SFTP server.
|
// Open a new connection to the SFTP server.
|
||||||
func (f *Fs) sftpConnection() (c *conn, err error) {
|
func (f *Fs) sftpConnection() (c *conn, err error) {
|
||||||
|
// Rate limit rate of new connections
|
||||||
|
err = f.connLimit.Wait(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "limiter failed in connect")
|
||||||
|
}
|
||||||
c = &conn{
|
c = &conn{
|
||||||
err: make(chan error, 1),
|
err: make(chan error, 1),
|
||||||
}
|
}
|
||||||
|
@ -276,6 +288,7 @@ func NewFs(name, root string) (fs.Fs, error) {
|
||||||
port: port,
|
port: port,
|
||||||
url: "sftp://" + user + "@" + host + ":" + port + "/" + root,
|
url: "sftp://" + user + "@" + host + ":" + port + "/" + root,
|
||||||
mkdirLock: newStringLock(),
|
mkdirLock: newStringLock(),
|
||||||
|
connLimit: rate.NewLimiter(rate.Limit(connectionsPerSecond), 1),
|
||||||
}
|
}
|
||||||
f.features = (&fs.Features{}).Fill(f)
|
f.features = (&fs.Features{}).Fill(f)
|
||||||
// Make a connection and pool it to return errors early
|
// Make a connection and pool it to return errors early
|
||||||
|
|
Loading…
Reference in a new issue