rclone: Respect bandwith limits

This commit is contained in:
Alexander Neumann 2018-05-22 20:48:17 +02:00
parent 20bfed5985
commit bfd923e81e
5 changed files with 68 additions and 15 deletions

View file

@ -561,17 +561,18 @@ func open(s string, gopts GlobalOptions, opts options.Options) (restic.Backend,
} }
// wrap the transport so that the throughput via HTTP is limited // wrap the transport so that the throughput via HTTP is limited
rt = limiter.NewStaticLimiter(gopts.LimitUploadKb, gopts.LimitDownloadKb).Transport(rt) lim := limiter.NewStaticLimiter(gopts.LimitUploadKb, gopts.LimitDownloadKb)
rt = lim.Transport(rt)
switch loc.Scheme { switch loc.Scheme {
case "local": case "local":
be, err = local.Open(cfg.(local.Config)) be, err = local.Open(cfg.(local.Config))
// wrap the backend in a LimitBackend so that the throughput is limited // wrap the backend in a LimitBackend so that the throughput is limited
be = limiter.LimitBackend(be, limiter.NewStaticLimiter(gopts.LimitUploadKb, gopts.LimitDownloadKb)) be = limiter.LimitBackend(be, lim)
case "sftp": case "sftp":
be, err = sftp.Open(cfg.(sftp.Config)) be, err = sftp.Open(cfg.(sftp.Config))
// wrap the backend in a LimitBackend so that the throughput is limited // wrap the backend in a LimitBackend so that the throughput is limited
be = limiter.LimitBackend(be, limiter.NewStaticLimiter(gopts.LimitUploadKb, gopts.LimitDownloadKb)) be = limiter.LimitBackend(be, lim)
case "s3": case "s3":
be, err = s3.Open(cfg.(s3.Config), rt) be, err = s3.Open(cfg.(s3.Config), rt)
case "gs": case "gs":
@ -585,7 +586,7 @@ func open(s string, gopts GlobalOptions, opts options.Options) (restic.Backend,
case "rest": case "rest":
be, err = rest.Open(cfg.(rest.Config), rt) be, err = rest.Open(cfg.(rest.Config), rt)
case "rclone": case "rclone":
be, err = rclone.Open(cfg.(rclone.Config)) be, err = rclone.Open(cfg.(rclone.Config), lim)
default: default:
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme) return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
@ -648,7 +649,7 @@ func create(s string, opts options.Options) (restic.Backend, error) {
case "rest": case "rest":
return rest.Create(cfg.(rest.Config), rt) return rest.Create(cfg.(rest.Config), rt)
case "rclone": case "rclone":
return rclone.Open(cfg.(rclone.Config)) return rclone.Open(cfg.(rclone.Config), nil)
} }
debug.Log("invalid repository scheme: %v", s) debug.Log("invalid repository scheme: %v", s)

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io"
"math/rand" "math/rand"
"net" "net"
"net/http" "net/http"
@ -18,6 +19,7 @@ import (
"github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/rest"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/limiter"
"golang.org/x/net/context/ctxhttp" "golang.org/x/net/context/ctxhttp"
"golang.org/x/net/http2" "golang.org/x/net/http2"
) )
@ -81,8 +83,38 @@ func run(command string, args ...string) (*StdioConn, *exec.Cmd, *sync.WaitGroup
return c, cmd, &wg, bg, nil return c, cmd, &wg, bg, nil
} }
// wrappedConn adds bandwidth limiting capabilities to the StdioConn by
// wrapping the Read/Write methods.
type wrappedConn struct {
*StdioConn
io.Reader
io.Writer
}
func (c wrappedConn) Read(p []byte) (int, error) {
return c.Reader.Read(p)
}
func (c wrappedConn) Write(p []byte) (int, error) {
return c.Writer.Write(p)
}
func wrapConn(c *StdioConn, lim limiter.Limiter) wrappedConn {
wc := wrappedConn{
StdioConn: c,
Reader: c,
Writer: c,
}
if lim != nil {
wc.Reader = lim.Downstream(c)
wc.Writer = lim.UpstreamWriter(c)
}
return wc
}
// New initializes a Backend and starts the process. // New initializes a Backend and starts the process.
func New(cfg Config) (*Backend, error) { func New(cfg Config, lim limiter.Limiter) (*Backend, error) {
var ( var (
args []string args []string
err error err error
@ -118,11 +150,16 @@ func New(cfg Config) (*Backend, error) {
arg0, args := args[0], args[1:] arg0, args := args[0], args[1:]
debug.Log("running command: %v %v", arg0, args) debug.Log("running command: %v %v", arg0, args)
conn, cmd, wg, bg, err := run(arg0, args...) stdioConn, cmd, wg, bg, err := run(arg0, args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var conn net.Conn = stdioConn
if lim != nil {
conn = wrapConn(stdioConn, lim)
}
dialCount := 0 dialCount := 0
tr := &http2.Transport{ tr := &http2.Transport{
AllowHTTP: true, // this is not really HTTP, just stdin/stdout AllowHTTP: true, // this is not really HTTP, just stdin/stdout
@ -141,7 +178,7 @@ func New(cfg Config) (*Backend, error) {
tr: tr, tr: tr,
cmd: cmd, cmd: cmd,
waitCh: waitCh, waitCh: waitCh,
conn: conn, conn: stdioConn,
wg: wg, wg: wg,
} }
@ -202,8 +239,8 @@ func New(cfg Config) (*Backend, error) {
} }
// Open starts an rclone process with the given config. // Open starts an rclone process with the given config.
func Open(cfg Config) (*Backend, error) { func Open(cfg Config, lim limiter.Limiter) (*Backend, error) {
be, err := New(cfg) be, err := New(cfg, lim)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -229,7 +266,7 @@ func Open(cfg Config) (*Backend, error) {
// Create initializes a new restic repo with clone. // Create initializes a new restic repo with clone.
func Create(cfg Config) (*Backend, error) { func Create(cfg Config) (*Backend, error) {
be, err := New(cfg) be, err := New(cfg, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -39,7 +39,7 @@ func newTestSuite(t testing.TB) *test.Suite {
Open: func(config interface{}) (restic.Backend, error) { Open: func(config interface{}) (restic.Backend, error) {
t.Logf("Open()") t.Logf("Open()")
cfg := config.(rclone.Config) cfg := config.(rclone.Config)
return rclone.Open(cfg) return rclone.Open(cfg, nil)
}, },
// CleanupFn removes data created during the tests. // CleanupFn removes data created during the tests.

View file

@ -12,6 +12,10 @@ type Limiter interface {
// uploads. // uploads.
Upstream(r io.Reader) io.Reader Upstream(r io.Reader) io.Reader
// UpstreamWriter returns a rate limited writer that is intended to be used
// in uploads.
UpstreamWriter(w io.Writer) io.Writer
// Downstream returns a rate limited reader that is intended to be used // Downstream returns a rate limited reader that is intended to be used
// for downloads. // for downloads.
Downstream(r io.Reader) io.Reader Downstream(r io.Reader) io.Reader

View file

@ -35,11 +35,15 @@ func NewStaticLimiter(uploadKb, downloadKb int) Limiter {
} }
func (l staticLimiter) Upstream(r io.Reader) io.Reader { func (l staticLimiter) Upstream(r io.Reader) io.Reader {
return l.limit(r, l.upstream) return l.limitReader(r, l.upstream)
}
func (l staticLimiter) UpstreamWriter(w io.Writer) io.Writer {
return l.limitWriter(w, l.upstream)
} }
func (l staticLimiter) Downstream(r io.Reader) io.Reader { func (l staticLimiter) Downstream(r io.Reader) io.Reader {
return l.limit(r, l.downstream) return l.limitReader(r, l.downstream)
} }
type roundTripper func(*http.Request) (*http.Response, error) type roundTripper func(*http.Request) (*http.Response, error)
@ -75,13 +79,20 @@ func (l staticLimiter) Transport(rt http.RoundTripper) http.RoundTripper {
}) })
} }
func (l staticLimiter) limit(r io.Reader, b *ratelimit.Bucket) io.Reader { func (l staticLimiter) limitReader(r io.Reader, b *ratelimit.Bucket) io.Reader {
if b == nil { if b == nil {
return r return r
} }
return ratelimit.Reader(r, b) return ratelimit.Reader(r, b)
} }
func (l staticLimiter) limitWriter(w io.Writer, b *ratelimit.Bucket) io.Writer {
if b == nil {
return w
}
return ratelimit.Writer(w, b)
}
func toByteRate(val int) float64 { func toByteRate(val int) float64 {
return float64(val) * 1024. return float64(val) * 1024.
} }