From ece06f125ef7bd07a49560abb8f3aff28f60a4ee Mon Sep 17 00:00:00 2001
From: Michael Eischer <michael.eischer@fau.de>
Date: Sat, 7 Aug 2021 19:56:59 +0200
Subject: [PATCH] sftp: Limit concurrent backend operations

---
 internal/backend/location/location_test.go | 28 +++++-----
 internal/backend/sftp/config.go            | 22 +++++---
 internal/backend/sftp/config_test.go       | 30 +++++------
 internal/backend/sftp/layout_test.go       |  7 +--
 internal/backend/sftp/sftp.go              | 61 ++++++++++++++++++++--
 internal/backend/sftp/sftp_test.go         |  5 +-
 6 files changed, 112 insertions(+), 41 deletions(-)

diff --git a/internal/backend/location/location_test.go b/internal/backend/location/location_test.go
index ded9450e9..809379850 100644
--- a/internal/backend/location/location_test.go
+++ b/internal/backend/location/location_test.go
@@ -138,9 +138,10 @@ var parseTests = []struct {
 		"sftp:user@host:/srv/repo",
 		Location{Scheme: "sftp",
 			Config: sftp.Config{
-				User: "user",
-				Host: "host",
-				Path: "/srv/repo",
+				User:        "user",
+				Host:        "host",
+				Path:        "/srv/repo",
+				Connections: 5,
 			},
 		},
 	},
@@ -148,9 +149,10 @@ var parseTests = []struct {
 		"sftp:host:/srv/repo",
 		Location{Scheme: "sftp",
 			Config: sftp.Config{
-				User: "",
-				Host: "host",
-				Path: "/srv/repo",
+				User:        "",
+				Host:        "host",
+				Path:        "/srv/repo",
+				Connections: 5,
 			},
 		},
 	},
@@ -158,9 +160,10 @@ var parseTests = []struct {
 		"sftp://user@host/srv/repo",
 		Location{Scheme: "sftp",
 			Config: sftp.Config{
-				User: "user",
-				Host: "host",
-				Path: "srv/repo",
+				User:        "user",
+				Host:        "host",
+				Path:        "srv/repo",
+				Connections: 5,
 			},
 		},
 	},
@@ -168,9 +171,10 @@ var parseTests = []struct {
 		"sftp://user@host//srv/repo",
 		Location{Scheme: "sftp",
 			Config: sftp.Config{
-				User: "user",
-				Host: "host",
-				Path: "/srv/repo",
+				User:        "user",
+				Host:        "host",
+				Path:        "/srv/repo",
+				Connections: 5,
 			},
 		},
 	},
diff --git a/internal/backend/sftp/config.go b/internal/backend/sftp/config.go
index d5e0e5182..3b3d622a0 100644
--- a/internal/backend/sftp/config.go
+++ b/internal/backend/sftp/config.go
@@ -15,6 +15,15 @@ type Config struct {
 
 	Layout  string `option:"layout" help:"use this backend directory layout (default: auto-detect)"`
 	Command string `option:"command" help:"specify command to create sftp connection"`
+
+	Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
+}
+
+// NewConfig returns a new config with default options applied.
+func NewConfig() Config {
+	return Config{
+		Connections: 5,
+	}
 }
 
 func init() {
@@ -75,10 +84,11 @@ func ParseConfig(s string) (interface{}, error) {
 		return nil, errors.Fatal("sftp path starts with the tilde (~) character, that fails for most sftp servers.\nUse a relative directory, most servers interpret this as relative to the user's home directory.")
 	}
 
-	return Config{
-		User: user,
-		Host: host,
-		Port: port,
-		Path: p,
-	}, nil
+	cfg := NewConfig()
+	cfg.User = user
+	cfg.Host = host
+	cfg.Port = port
+	cfg.Path = p
+
+	return cfg, nil
 }
diff --git a/internal/backend/sftp/config_test.go b/internal/backend/sftp/config_test.go
index d785a4113..3772c038b 100644
--- a/internal/backend/sftp/config_test.go
+++ b/internal/backend/sftp/config_test.go
@@ -11,68 +11,68 @@ var configTests = []struct {
 	// first form, user specified sftp://user@host/dir
 	{
 		"sftp://user@host/dir/subdir",
-		Config{User: "user", Host: "host", Path: "dir/subdir"},
+		Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
 	},
 	{
 		"sftp://host/dir/subdir",
-		Config{Host: "host", Path: "dir/subdir"},
+		Config{Host: "host", Path: "dir/subdir", Connections: 5},
 	},
 	{
 		"sftp://host//dir/subdir",
-		Config{Host: "host", Path: "/dir/subdir"},
+		Config{Host: "host", Path: "/dir/subdir", Connections: 5},
 	},
 	{
 		"sftp://host:10022//dir/subdir",
-		Config{Host: "host", Port: "10022", Path: "/dir/subdir"},
+		Config{Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5},
 	},
 	{
 		"sftp://user@host:10022//dir/subdir",
-		Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir"},
+		Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5},
 	},
 	{
 		"sftp://user@host/dir/subdir/../other",
-		Config{User: "user", Host: "host", Path: "dir/other"},
+		Config{User: "user", Host: "host", Path: "dir/other", Connections: 5},
 	},
 	{
 		"sftp://user@host/dir///subdir",
-		Config{User: "user", Host: "host", Path: "dir/subdir"},
+		Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
 	},
 
 	// IPv6 address.
 	{
 		"sftp://user@[::1]/dir",
-		Config{User: "user", Host: "::1", Path: "dir"},
+		Config{User: "user", Host: "::1", Path: "dir", Connections: 5},
 	},
 	// IPv6 address with port.
 	{
 		"sftp://user@[::1]:22/dir",
-		Config{User: "user", Host: "::1", Port: "22", Path: "dir"},
+		Config{User: "user", Host: "::1", Port: "22", Path: "dir", Connections: 5},
 	},
 
 	// second form, user specified sftp:user@host:/dir
 	{
 		"sftp:user@host:/dir/subdir",
-		Config{User: "user", Host: "host", Path: "/dir/subdir"},
+		Config{User: "user", Host: "host", Path: "/dir/subdir", Connections: 5},
 	},
 	{
 		"sftp:user@domain@host:/dir/subdir",
-		Config{User: "user@domain", Host: "host", Path: "/dir/subdir"},
+		Config{User: "user@domain", Host: "host", Path: "/dir/subdir", Connections: 5},
 	},
 	{
 		"sftp:host:../dir/subdir",
-		Config{Host: "host", Path: "../dir/subdir"},
+		Config{Host: "host", Path: "../dir/subdir", Connections: 5},
 	},
 	{
 		"sftp:user@host:dir/subdir:suffix",
-		Config{User: "user", Host: "host", Path: "dir/subdir:suffix"},
+		Config{User: "user", Host: "host", Path: "dir/subdir:suffix", Connections: 5},
 	},
 	{
 		"sftp:user@host:dir/subdir/../other",
-		Config{User: "user", Host: "host", Path: "dir/other"},
+		Config{User: "user", Host: "host", Path: "dir/other", Connections: 5},
 	},
 	{
 		"sftp:user@host:dir///subdir",
-		Config{User: "user", Host: "host", Path: "dir/subdir"},
+		Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
 	},
 }
 
diff --git a/internal/backend/sftp/layout_test.go b/internal/backend/sftp/layout_test.go
index 0d0214669..3b654b1bb 100644
--- a/internal/backend/sftp/layout_test.go
+++ b/internal/backend/sftp/layout_test.go
@@ -43,9 +43,10 @@ func TestLayout(t *testing.T) {
 
 			repo := filepath.Join(path, "repo")
 			be, err := sftp.Open(context.TODO(), sftp.Config{
-				Command: fmt.Sprintf("%q -e", sftpServer),
-				Path:    repo,
-				Layout:  test.layout,
+				Command:     fmt.Sprintf("%q -e", sftpServer),
+				Path:        repo,
+				Layout:      test.layout,
+				Connections: 5,
 			})
 			if err != nil {
 				t.Fatal(err)
diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go
index 3e803a0f4..ad38e19ab 100644
--- a/internal/backend/sftp/sftp.go
+++ b/internal/backend/sftp/sftp.go
@@ -31,6 +31,7 @@ type SFTP struct {
 	cmd    *exec.Cmd
 	result <-chan error
 
+	sem *backend.Semaphore
 	backend.Layout
 	Config
 }
@@ -116,6 +117,11 @@ func (r *SFTP) clientError() error {
 func Open(ctx context.Context, cfg Config) (*SFTP, error) {
 	debug.Log("open backend with config %#v", cfg)
 
+	sem, err := backend.NewSemaphore(cfg.Connections)
+	if err != nil {
+		return nil, err
+	}
+
 	cmd, args, err := buildSSHCommand(cfg)
 	if err != nil {
 		return nil, err
@@ -136,6 +142,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) {
 
 	sftp.Config = cfg
 	sftp.p = cfg.Path
+	sftp.sem = sem
 	return sftp, nil
 }
 
@@ -238,6 +245,10 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) {
 	return Open(ctx, cfg)
 }
 
+func (r *SFTP) Connections() uint {
+	return r.Config.Connections
+}
+
 // Location returns this backend's location (the directory name).
 func (r *SFTP) Location() string {
 	return r.p
@@ -280,6 +291,9 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader
 	tmpFilename := filename + "-restic-temp-" + tempSuffix()
 	dirname := r.Dirname(h)
 
+	r.sem.GetToken()
+	defer r.sem.ReleaseToken()
+
 	// create new file
 	f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY)
 
@@ -371,6 +385,19 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int
 	return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
 }
 
+// wrapReader wraps an io.ReadCloser to run an additional function on Close.
+type wrapReader struct {
+	io.ReadCloser
+	io.WriterTo
+	f func()
+}
+
+func (wr *wrapReader) Close() error {
+	err := wr.ReadCloser.Close()
+	wr.f()
+	return err
+}
+
 func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
 	debug.Log("Load %v, length %v, offset %v", h, length, offset)
 	if err := h.Valid(); err != nil {
@@ -381,26 +408,38 @@ func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offs
 		return nil, errors.New("offset is negative")
 	}
 
+	r.sem.GetToken()
 	f, err := r.c.Open(r.Filename(h))
 	if err != nil {
+		r.sem.ReleaseToken()
 		return nil, err
 	}
 
 	if offset > 0 {
 		_, err = f.Seek(offset, 0)
 		if err != nil {
+			r.sem.ReleaseToken()
 			_ = f.Close()
 			return nil, err
 		}
 	}
 
+	// use custom close wrapper to also provide WriteTo() on the wrapper
+	rd := &wrapReader{
+		ReadCloser: f,
+		WriterTo:   f,
+		f: func() {
+			r.sem.ReleaseToken()
+		},
+	}
+
 	if length > 0 {
 		// unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader
 		// limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go
-		return backend.LimitReadCloser(f, int64(length)), nil
+		return backend.LimitReadCloser(rd, int64(length)), nil
 	}
 
-	return f, nil
+	return rd, nil
 }
 
 // Stat returns information about a blob.
@@ -414,6 +453,9 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro
 		return restic.FileInfo{}, backoff.Permanent(err)
 	}
 
+	r.sem.GetToken()
+	defer r.sem.ReleaseToken()
+
 	fi, err := r.c.Lstat(r.Filename(h))
 	if err != nil {
 		return restic.FileInfo{}, errors.Wrap(err, "Lstat")
@@ -429,6 +471,9 @@ func (r *SFTP) Test(ctx context.Context, h restic.Handle) (bool, error) {
 		return false, err
 	}
 
+	r.sem.GetToken()
+	defer r.sem.ReleaseToken()
+
 	_, err := r.c.Lstat(r.Filename(h))
 	if os.IsNotExist(errors.Cause(err)) {
 		return false, nil
@@ -448,6 +493,9 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error {
 		return err
 	}
 
+	r.sem.GetToken()
+	defer r.sem.ReleaseToken()
+
 	return r.c.Remove(r.Filename(h))
 }
 
@@ -458,7 +506,14 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI
 
 	basedir, subdirs := r.Basedir(t)
 	walker := r.c.Walk(basedir)
-	for walker.Step() {
+	for {
+		r.sem.GetToken()
+		ok := walker.Step()
+		r.sem.ReleaseToken()
+		if !ok {
+			break
+		}
+
 		if walker.Err() != nil {
 			if r.IsNotExist(walker.Err()) {
 				debug.Log("ignoring non-existing directory")
diff --git a/internal/backend/sftp/sftp_test.go b/internal/backend/sftp/sftp_test.go
index 61bc49dc8..f0573dcb5 100644
--- a/internal/backend/sftp/sftp_test.go
+++ b/internal/backend/sftp/sftp_test.go
@@ -42,8 +42,9 @@ func newTestSuite(t testing.TB) *test.Suite {
 			t.Logf("create new backend at %v", dir)
 
 			cfg := sftp.Config{
-				Path:    dir,
-				Command: fmt.Sprintf("%q -e", sftpServer),
+				Path:        dir,
+				Command:     fmt.Sprintf("%q -e", sftpServer),
+				Connections: 5,
 			}
 			return cfg, nil
 		},