sftp: --sftp-connections to limit maximum number of connections

Done based on a similar feature in the ftp remote. However, the switch
name is different, as `concurrency` is already taken by a different
feature.
This commit is contained in:
Tomasz Melcer 2024-06-02 17:28:52 +02:00 committed by Nick Craig-Wood
parent c8d6b02dd6
commit d2af114139

View file

@ -332,6 +332,25 @@ cost of using more memory.
`,
Default: 64,
Advanced: true,
}, {
Name: "connections",
Help: strings.Replace(`Maximum number of SFTP simultaneous connections, 0 for unlimited.
Note that setting this is very likely to cause deadlocks so it should
be used with care.
If you are doing a sync or copy then make sure concurrency is one more
than the sum of |--transfers| and |--checkers|.
If you use |--check-first| then it just needs to be one more than the
maximum of |--checkers| and |--transfers|.
So for |concurrency 3| you'd use |--checkers 2 --transfers 2
--check-first| or |--checkers 1 --transfers 1|.
`, "|", "`", -1),
Default: 0,
Advanced: true,
}, {
Name: "set_env",
Default: fs.SpaceSepList{},
@ -502,6 +521,7 @@ type Options struct {
IdleTimeout fs.Duration `config:"idle_timeout"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
Concurrency int `config:"concurrency"`
Connections int `config:"connections"`
SetEnv fs.SpaceSepList `config:"set_env"`
Ciphers fs.SpaceSepList `config:"ciphers"`
KeyExchange fs.SpaceSepList `config:"key_exchange"`
@ -533,6 +553,7 @@ type Fs struct {
pacer *fs.Pacer // pacer for operations
savedpswd string
sessions atomic.Int32 // count in use sessions
tokens *pacer.TokenDispenser
}
// Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading)
@ -695,6 +716,9 @@ func (f *Fs) newSftpClient(client sshClient, opts ...sftp.ClientOption) (*sftp.C
// Get an SFTP connection from the pool, or open a new one
func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) {
accounting.LimitTPS(ctx)
if f.opt.Connections > 0 {
f.tokens.Get()
}
f.poolMu.Lock()
for len(f.pool) > 0 {
c = f.pool[0]
@ -717,6 +741,9 @@ func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) {
}
return false, nil
})
if f.opt.Connections > 0 && c == nil {
f.tokens.Put()
}
return c, err
}
@ -727,6 +754,9 @@ func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) {
// if err is not nil then it checks the connection is alive using a
// Getwd request
func (f *Fs) putSftpConnection(pc **conn, err error) {
if f.opt.Connections > 0 {
defer f.tokens.Put()
}
c := *pc
if !c.sshClient.CanReuse() {
return
@ -812,6 +842,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if len(opt.SSH) != 0 && ((opt.User != currentUser && opt.User != "") || opt.Host != "" || (opt.Port != "22" && opt.Port != "")) {
fs.Logf(name, "--sftp-ssh is in use - ignoring user/host/port from config - set in the parameters to --sftp-ssh (remove them from the config to silence this warning)")
}
f.tokens = pacer.NewTokenDispenser(opt.Connections)
if opt.User == "" {
opt.User = currentUser