sftp: Limit concurrent backend operations

This commit is contained in:
Michael Eischer 2021-08-07 19:56:59 +02:00
parent cd783358d3
commit ece06f125e
6 changed files with 112 additions and 41 deletions

View file

@ -138,9 +138,10 @@ var parseTests = []struct {
"sftp:user@host:/srv/repo", "sftp:user@host:/srv/repo",
Location{Scheme: "sftp", Location{Scheme: "sftp",
Config: sftp.Config{ Config: sftp.Config{
User: "user", User: "user",
Host: "host", Host: "host",
Path: "/srv/repo", Path: "/srv/repo",
Connections: 5,
}, },
}, },
}, },
@ -148,9 +149,10 @@ var parseTests = []struct {
"sftp:host:/srv/repo", "sftp:host:/srv/repo",
Location{Scheme: "sftp", Location{Scheme: "sftp",
Config: sftp.Config{ Config: sftp.Config{
User: "", User: "",
Host: "host", Host: "host",
Path: "/srv/repo", Path: "/srv/repo",
Connections: 5,
}, },
}, },
}, },
@ -158,9 +160,10 @@ var parseTests = []struct {
"sftp://user@host/srv/repo", "sftp://user@host/srv/repo",
Location{Scheme: "sftp", Location{Scheme: "sftp",
Config: sftp.Config{ Config: sftp.Config{
User: "user", User: "user",
Host: "host", Host: "host",
Path: "srv/repo", Path: "srv/repo",
Connections: 5,
}, },
}, },
}, },
@ -168,9 +171,10 @@ var parseTests = []struct {
"sftp://user@host//srv/repo", "sftp://user@host//srv/repo",
Location{Scheme: "sftp", Location{Scheme: "sftp",
Config: sftp.Config{ Config: sftp.Config{
User: "user", User: "user",
Host: "host", Host: "host",
Path: "/srv/repo", Path: "/srv/repo",
Connections: 5,
}, },
}, },
}, },

View file

@ -15,6 +15,15 @@ type Config struct {
Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"` Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"`
Command string `option:"command" help:"specify command to create sftp connection"` Command string `option:"command" help:"specify command to create sftp connection"`
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
}
// NewConfig returns a new config with default options applied.
func NewConfig() Config {
return Config{
Connections: 5,
}
} }
func init() { func init() {
@ -75,10 +84,11 @@ func ParseConfig(s string) (interface{}, error) {
return nil, errors.Fatal("sftp path starts with the tilde (~) character, that fails for most sftp servers.\nUse a relative directory, most servers interpret this as relative to the user's home directory.") return nil, errors.Fatal("sftp path starts with the tilde (~) character, that fails for most sftp servers.\nUse a relative directory, most servers interpret this as relative to the user's home directory.")
} }
return Config{ cfg := NewConfig()
User: user, cfg.User = user
Host: host, cfg.Host = host
Port: port, cfg.Port = port
Path: p, cfg.Path = p
}, nil
return cfg, nil
} }

View file

@ -11,68 +11,68 @@ var configTests = []struct {
// first form, user specified sftp://user@host/dir // first form, user specified sftp://user@host/dir
{ {
"sftp://user@host/dir/subdir", "sftp://user@host/dir/subdir",
Config{User: "user", Host: "host", Path: "dir/subdir"}, Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
}, },
{ {
"sftp://host/dir/subdir", "sftp://host/dir/subdir",
Config{Host: "host", Path: "dir/subdir"}, Config{Host: "host", Path: "dir/subdir", Connections: 5},
}, },
{ {
"sftp://host//dir/subdir", "sftp://host//dir/subdir",
Config{Host: "host", Path: "/dir/subdir"}, Config{Host: "host", Path: "/dir/subdir", Connections: 5},
}, },
{ {
"sftp://host:10022//dir/subdir", "sftp://host:10022//dir/subdir",
Config{Host: "host", Port: "10022", Path: "/dir/subdir"}, Config{Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5},
}, },
{ {
"sftp://user@host:10022//dir/subdir", "sftp://user@host:10022//dir/subdir",
Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir"}, Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5},
}, },
{ {
"sftp://user@host/dir/subdir/../other", "sftp://user@host/dir/subdir/../other",
Config{User: "user", Host: "host", Path: "dir/other"}, Config{User: "user", Host: "host", Path: "dir/other", Connections: 5},
}, },
{ {
"sftp://user@host/dir///subdir", "sftp://user@host/dir///subdir",
Config{User: "user", Host: "host", Path: "dir/subdir"}, Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
}, },
// IPv6 address. // IPv6 address.
{ {
"sftp://user@[::1]/dir", "sftp://user@[::1]/dir",
Config{User: "user", Host: "::1", Path: "dir"}, Config{User: "user", Host: "::1", Path: "dir", Connections: 5},
}, },
// IPv6 address with port. // IPv6 address with port.
{ {
"sftp://user@[::1]:22/dir", "sftp://user@[::1]:22/dir",
Config{User: "user", Host: "::1", Port: "22", Path: "dir"}, Config{User: "user", Host: "::1", Port: "22", Path: "dir", Connections: 5},
}, },
// second form, user specified sftp:user@host:/dir // second form, user specified sftp:user@host:/dir
{ {
"sftp:user@host:/dir/subdir", "sftp:user@host:/dir/subdir",
Config{User: "user", Host: "host", Path: "/dir/subdir"}, Config{User: "user", Host: "host", Path: "/dir/subdir", Connections: 5},
}, },
{ {
"sftp:user@domain@host:/dir/subdir", "sftp:user@domain@host:/dir/subdir",
Config{User: "user@domain", Host: "host", Path: "/dir/subdir"}, Config{User: "user@domain", Host: "host", Path: "/dir/subdir", Connections: 5},
}, },
{ {
"sftp:host:../dir/subdir", "sftp:host:../dir/subdir",
Config{Host: "host", Path: "../dir/subdir"}, Config{Host: "host", Path: "../dir/subdir", Connections: 5},
}, },
{ {
"sftp:user@host:dir/subdir:suffix", "sftp:user@host:dir/subdir:suffix",
Config{User: "user", Host: "host", Path: "dir/subdir:suffix"}, Config{User: "user", Host: "host", Path: "dir/subdir:suffix", Connections: 5},
}, },
{ {
"sftp:user@host:dir/subdir/../other", "sftp:user@host:dir/subdir/../other",
Config{User: "user", Host: "host", Path: "dir/other"}, Config{User: "user", Host: "host", Path: "dir/other", Connections: 5},
}, },
{ {
"sftp:user@host:dir///subdir", "sftp:user@host:dir///subdir",
Config{User: "user", Host: "host", Path: "dir/subdir"}, Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
}, },
} }

View file

@ -43,9 +43,10 @@ func TestLayout(t *testing.T) {
repo := filepath.Join(path, "repo") repo := filepath.Join(path, "repo")
be, err := sftp.Open(context.TODO(), sftp.Config{ be, err := sftp.Open(context.TODO(), sftp.Config{
Command: fmt.Sprintf("%q -e", sftpServer), Command: fmt.Sprintf("%q -e", sftpServer),
Path: repo, Path: repo,
Layout: test.layout, Layout: test.layout,
Connections: 5,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View file

@ -31,6 +31,7 @@ type SFTP struct {
cmd *exec.Cmd cmd *exec.Cmd
result <-chan error result <-chan error
sem *backend.Semaphore
backend.Layout backend.Layout
Config Config
} }
@ -116,6 +117,11 @@ func (r *SFTP) clientError() error {
func Open(ctx context.Context, cfg Config) (*SFTP, error) { func Open(ctx context.Context, cfg Config) (*SFTP, error) {
debug.Log("open backend with config %#v", cfg) debug.Log("open backend with config %#v", cfg)
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
cmd, args, err := buildSSHCommand(cfg) cmd, args, err := buildSSHCommand(cfg)
if err != nil { if err != nil {
return nil, err return nil, err
@ -136,6 +142,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) {
sftp.Config = cfg sftp.Config = cfg
sftp.p = cfg.Path sftp.p = cfg.Path
sftp.sem = sem
return sftp, nil return sftp, nil
} }
@ -238,6 +245,10 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) {
return Open(ctx, cfg) return Open(ctx, cfg)
} }
func (r *SFTP) Connections() uint {
return r.Config.Connections
}
// Location returns this backend's location (the directory name). // Location returns this backend's location (the directory name).
func (r *SFTP) Location() string { func (r *SFTP) Location() string {
return r.p return r.p
@ -280,6 +291,9 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader
tmpFilename := filename + "-restic-temp-" + tempSuffix() tmpFilename := filename + "-restic-temp-" + tempSuffix()
dirname := r.Dirname(h) dirname := r.Dirname(h)
r.sem.GetToken()
defer r.sem.ReleaseToken()
// create new file // create new file
f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY) f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY)
@ -371,6 +385,19 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int
return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn) return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
} }
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
io.WriterTo
f func()
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}
func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset) debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
@ -381,26 +408,38 @@ func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offs
return nil, errors.New("offset is negative") return nil, errors.New("offset is negative")
} }
r.sem.GetToken()
f, err := r.c.Open(r.Filename(h)) f, err := r.c.Open(r.Filename(h))
if err != nil { if err != nil {
r.sem.ReleaseToken()
return nil, err return nil, err
} }
if offset > 0 { if offset > 0 {
_, err = f.Seek(offset, 0) _, err = f.Seek(offset, 0)
if err != nil { if err != nil {
r.sem.ReleaseToken()
_ = f.Close() _ = f.Close()
return nil, err return nil, err
} }
} }
// use custom close wrapper to also provide WriteTo() on the wrapper
rd := &wrapReader{
ReadCloser: f,
WriterTo: f,
f: func() {
r.sem.ReleaseToken()
},
}
if length > 0 { if length > 0 {
// unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader // unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader
// limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go // limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go
return backend.LimitReadCloser(f, int64(length)), nil return backend.LimitReadCloser(rd, int64(length)), nil
} }
return f, nil return rd, nil
} }
// Stat returns information about a blob. // Stat returns information about a blob.
@ -414,6 +453,9 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro
return restic.FileInfo{}, backoff.Permanent(err) return restic.FileInfo{}, backoff.Permanent(err)
} }
r.sem.GetToken()
defer r.sem.ReleaseToken()
fi, err := r.c.Lstat(r.Filename(h)) fi, err := r.c.Lstat(r.Filename(h))
if err != nil { if err != nil {
return restic.FileInfo{}, errors.Wrap(err, "Lstat") return restic.FileInfo{}, errors.Wrap(err, "Lstat")
@ -429,6 +471,9 @@ func (r *SFTP) Test(ctx context.Context, h restic.Handle) (bool, error) {
return false, err return false, err
} }
r.sem.GetToken()
defer r.sem.ReleaseToken()
_, err := r.c.Lstat(r.Filename(h)) _, err := r.c.Lstat(r.Filename(h))
if os.IsNotExist(errors.Cause(err)) { if os.IsNotExist(errors.Cause(err)) {
return false, nil return false, nil
@ -448,6 +493,9 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error {
return err return err
} }
r.sem.GetToken()
defer r.sem.ReleaseToken()
return r.c.Remove(r.Filename(h)) return r.c.Remove(r.Filename(h))
} }
@ -458,7 +506,14 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI
basedir, subdirs := r.Basedir(t) basedir, subdirs := r.Basedir(t)
walker := r.c.Walk(basedir) walker := r.c.Walk(basedir)
for walker.Step() { for {
r.sem.GetToken()
ok := walker.Step()
r.sem.ReleaseToken()
if !ok {
break
}
if walker.Err() != nil { if walker.Err() != nil {
if r.IsNotExist(walker.Err()) { if r.IsNotExist(walker.Err()) {
debug.Log("ignoring non-existing directory") debug.Log("ignoring non-existing directory")

View file

@ -42,8 +42,9 @@ func newTestSuite(t testing.TB) *test.Suite {
t.Logf("create new backend at %v", dir) t.Logf("create new backend at %v", dir)
cfg := sftp.Config{ cfg := sftp.Config{
Path: dir, Path: dir,
Command: fmt.Sprintf("%q -e", sftpServer), Command: fmt.Sprintf("%q -e", sftpServer),
Connections: 5,
} }
return cfg, nil return cfg, nil
}, },