From edfe183ba2f95120062596c98f23fe91e615b6f7 Mon Sep 17 00:00:00 2001 From: Max Sum Date: Fri, 29 Jan 2021 11:41:50 +0800 Subject: [PATCH] fshttp: add DSCP support with --dscp for QoS with differentiated services --- backend/ftp/ftp.go | 55 +++++++++---- docs/content/docs.md | 15 ++++ docs/content/flags.md | 1 + fs/config.go | 1 + fs/config/configflags/configflags.go | 68 ++++++++++++++++ fs/fshttp/dialer.go | 114 +++++++++++++++++++++++++++ fs/fshttp/http.go | 78 +----------------- 7 files changed, 240 insertions(+), 92 deletions(-) create mode 100644 fs/fshttp/dialer.go diff --git a/backend/ftp/ftp.go b/backend/ftp/ftp.go index 387ef1f78..0fab0081c 100644 --- a/backend/ftp/ftp.go +++ b/backend/ftp/ftp.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "io" + "net" "net/textproto" "path" "runtime" @@ -20,6 +21,7 @@ import ( "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/obscure" + "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/env" @@ -135,6 +137,7 @@ type Fs struct { poolMu sync.Mutex pool []*ftp.ServerConn tokens *pacer.TokenDispenser + tlsConf *tls.Config } // Object describes an FTP file @@ -211,25 +214,36 @@ func (dl *debugLog) Write(p []byte) (n int, err error) { return len(p), nil } +type dialCtx struct { + f *Fs + ctx context.Context +} + +// dial a new connection with fshttp dialer +func (d *dialCtx) dial(network, address string) (net.Conn, error) { + conn, err := fshttp.NewDialer(d.ctx).Dial(network, address) + if err != nil { + return nil, err + } + if d.f.tlsConf != nil { + conn = tls.Client(conn, d.f.tlsConf) + } + return conn, err +} + // Open a new connection to the FTP server. func (f *Fs) ftpConnection(ctx context.Context) (*ftp.ServerConn, error) { fs.Debugf(f, "Connecting to FTP server") - ftpConfig := []ftp.DialOption{ftp.DialWithTimeout(f.ci.ConnectTimeout)} - if f.opt.TLS && f.opt.ExplicitTLS { - fs.Errorf(f, "Implicit TLS and explicit TLS are mutually incompatible. Please revise your config") - return nil, errors.New("Implicit TLS and explicit TLS are mutually incompatible. Please revise your config") - } else if f.opt.TLS { - tlsConfig := &tls.Config{ - ServerName: f.opt.Host, - InsecureSkipVerify: f.opt.SkipVerifyTLSCert, + dCtx := dialCtx{f, ctx} + ftpConfig := []ftp.DialOption{ftp.DialWithDialFunc(dCtx.dial)} + if f.opt.ExplicitTLS { + ftpConfig = append(ftpConfig, ftp.DialWithExplicitTLS(f.tlsConf)) + // Initial connection needs to be cleartext for explicit TLS + conn, err := fshttp.NewDialer(ctx).Dial("tcp", f.dialAddr) + if err != nil { + return nil, err } - ftpConfig = append(ftpConfig, ftp.DialWithTLS(tlsConfig)) - } else if f.opt.ExplicitTLS { - tlsConfig := &tls.Config{ - ServerName: f.opt.Host, - InsecureSkipVerify: f.opt.SkipVerifyTLSCert, - } - ftpConfig = append(ftpConfig, ftp.DialWithExplicitTLS(tlsConfig)) + ftpConfig = append(ftpConfig, ftp.DialWithNetConn(conn)) } if f.opt.DisableEPSV { ftpConfig = append(ftpConfig, ftp.DialWithDisabledEPSV(true)) @@ -338,6 +352,16 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (ff fs.Fs if opt.TLS { protocol = "ftps://" } + if opt.TLS && opt.ExplicitTLS { + return nil, errors.New("Implicit TLS and explicit TLS are mutually incompatible. Please revise your config") + } + var tlsConfig *tls.Config + if opt.TLS || opt.ExplicitTLS { + tlsConfig = &tls.Config{ + ServerName: opt.Host, + InsecureSkipVerify: opt.SkipVerifyTLSCert, + } + } u := protocol + path.Join(dialAddr+"/", root) ci := fs.GetConfig(ctx) f := &Fs{ @@ -350,6 +374,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (ff fs.Fs pass: pass, dialAddr: dialAddr, tokens: pacer.NewTokenDispenser(opt.Concurrency), + tlsConf: tlsConfig, } f.features = (&fs.Features{ CanHaveEmptyDirectories: true, diff --git a/docs/content/docs.md b/docs/content/docs.md index 93311ca85..73779d6be 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -600,6 +600,21 @@ This flag can be useful for debugging and in exceptional circumstances (e.g. Google Drive limiting the total volume of Server Side Copies to 100GB/day). +### --dscp VALUE ### + +Specify a DSCP value or name to use in connections. This could help QoS +system to identify traffic class. BE, EF, DF, LE, CSx and AFxx are allowed. + +See the description of [differentiated services](https://en.wikipedia.org/wiki/Differentiated_services) to get an idea of +this field. Setting this to 1 (LE) to identify the flow to SCAVENGER class +can avoid occupying too much bandwidth in a network with DiffServ support ([RFC 8622](https://tools.ietf.org/html/rfc8622)). + +For example, if you configured QoS on router to handle LE properly. Running: +``` +rclone copy --dscp LE from:/from to:/to +``` +would make the priority lower than usual internet flows. + ### -n, --dry-run ### Do a trial run with no permanent changes. Use this to see what rclone diff --git a/docs/content/flags.md b/docs/content/flags.md index 5dafa07a5..b1caae010 100755 --- a/docs/content/flags.md +++ b/docs/content/flags.md @@ -42,6 +42,7 @@ These flags are available for every command. --dump DumpFlags List of items to dump from: headers,bodies,requests,responses,auth,filters,goroutines,openfiles --dump-bodies Dump HTTP headers and bodies - may contain sensitive info --dump-headers Dump HTTP headers - may contain sensitive info + --dscp DSCP Name or Value (default 0) --error-on-no-transfer Sets exit code 9 if no files are transferred, useful in scripts --exclude stringArray Exclude files matching pattern --exclude-from stringArray Read exclude patterns from file (use - to read from stdin) diff --git a/fs/config.go b/fs/config.go index f913de579..8481bd4e0 100644 --- a/fs/config.go +++ b/fs/config.go @@ -122,6 +122,7 @@ type ConfigInfo struct { Headers []*HTTPOption RefreshTimes bool NoConsole bool + TrafficClass uint8 } // NewConfig creates a new config with everything set to the default diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index 2cdbbe3d7..0a4f177c7 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -7,6 +7,7 @@ import ( "log" "net" "path/filepath" + "strconv" "strings" "github.com/rclone/rclone/fs" @@ -29,6 +30,7 @@ var ( deleteAfter bool bindAddr string disableFeatures string + dscp string uploadHeaders []string downloadHeaders []string headers []string @@ -125,6 +127,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) { flags.StringArrayVarP(flagSet, &headers, "header", "", nil, "Set HTTP header for all transactions") flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files.") flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window. Supported on Windows only.") + flags.StringVarP(flagSet, &dscp, "dscp", "", "", "Set DSCP value to connections. Can be value or names, eg. CS1, LE, DF, AF21.") } // ParseHeaders converts the strings passed in via the header flags into HTTPOptions @@ -254,6 +257,13 @@ func SetFlags(ci *fs.ConfigInfo) { if len(headers) != 0 { ci.Headers = ParseHeaders(headers) } + if len(dscp) != 0 { + if value, ok := parseDSCP(dscp); ok { + ci.TrafficClass = value << 2 + } else { + log.Fatalf("--dscp: Invalid DSCP name: %v", dscp) + } + } // Make the config file absolute configPath, err := filepath.Abs(config.ConfigPath) @@ -266,3 +276,61 @@ func SetFlags(ci *fs.ConfigInfo) { ci.MultiThreadSet = multiThreadStreamsFlag != nil && multiThreadStreamsFlag.Changed } + +// parseHeaders converts DSCP names to value +func parseDSCP(dscp string) (uint8, bool) { + if s, err := strconv.ParseUint(dscp, 10, 6); err == nil { + return uint8(s), true + } + dscp = strings.ToUpper(dscp) + switch dscp { + case "BE": + fallthrough + case "DF": + fallthrough + case "CS0": + return 0x00, true + case "CS1": + return 0x08, true + case "AF11": + return 0x0A, true + case "AF12": + return 0x0C, true + case "AF13": + return 0x0E, true + case "CS2": + return 0x10, true + case "AF21": + return 0x12, true + case "AF22": + return 0x14, true + case "AF23": + return 0x16, true + case "CS3": + return 0x18, true + case "AF31": + return 0x1A, true + case "AF32": + return 0x1C, true + case "AF33": + return 0x1E, true + case "CS4": + return 0x20, true + case "AF41": + return 0x22, true + case "AF42": + return 0x24, true + case "AF43": + return 0x26, true + case "CS5": + return 0x28, true + case "EF": + return 0x2E, true + case "CS6": + return 0x30, true + case "LE": + return 0x01, true + default: + return 0, false + } +} diff --git a/fs/fshttp/dialer.go b/fs/fshttp/dialer.go new file mode 100644 index 000000000..b15ce4847 --- /dev/null +++ b/fs/fshttp/dialer.go @@ -0,0 +1,114 @@ +package fshttp + +import ( + "context" + "net" + "time" + + "github.com/rclone/rclone/fs" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +func dialContext(ctx context.Context, network, address string, ci *fs.ConfigInfo) (net.Conn, error) { + return NewDialer(ctx).DialContext(ctx, network, address) +} + +// Dialer structure contains default dialer and timeout, tclass support +type Dialer struct { + net.Dialer + timeout time.Duration + tclass int +} + +// NewDialer creates a Dialer structure with Timeout, Keepalive, +// LocalAddr and DSCP set from rclone flags. +func NewDialer(ctx context.Context) *Dialer { + ci := fs.GetConfig(ctx) + dialer := &Dialer{ + Dialer: net.Dialer{ + Timeout: ci.ConnectTimeout, + KeepAlive: 30 * time.Second, + }, + timeout: ci.Timeout, + tclass: int(ci.TrafficClass), + } + if ci.BindAddr != nil { + dialer.Dialer.LocalAddr = &net.TCPAddr{IP: ci.BindAddr} + } + return dialer +} + +// Dial connects to the address on the named network. +func (d *Dialer) Dial(network, address string) (net.Conn, error) { + return d.DialContext(context.Background(), network, address) +} + +// DialContext connects to the address on the named network using +// the provided context. +func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + c, err := d.Dialer.DialContext(ctx, network, address) + if err != nil { + return c, err + } + if d.tclass != 0 { + if addr, ok := c.RemoteAddr().(*net.IPAddr); ok { + if addr.IP.To16() != nil && addr.IP.To4() == nil { + err = ipv6.NewConn(c).SetTrafficClass(d.tclass) + } else { + err = ipv4.NewConn(c).SetTOS(d.tclass) + } + if err != nil { + return c, err + } + } + } + return newTimeoutConn(c, d.timeout) +} + +// A net.Conn that sets a deadline for every Read or Write operation +type timeoutConn struct { + net.Conn + timeout time.Duration +} + +// create a timeoutConn using the timeout +func newTimeoutConn(conn net.Conn, timeout time.Duration) (c *timeoutConn, err error) { + c = &timeoutConn{ + Conn: conn, + timeout: timeout, + } + err = c.nudgeDeadline() + return +} + +// Nudge the deadline for an idle timeout on by c.timeout if non-zero +func (c *timeoutConn) nudgeDeadline() (err error) { + if c.timeout == 0 { + return nil + } + when := time.Now().Add(c.timeout) + return c.Conn.SetDeadline(when) +} + +// readOrWrite bytes doing idle timeouts +func (c *timeoutConn) readOrWrite(f func([]byte) (int, error), b []byte) (n int, err error) { + n, err = f(b) + // Don't nudge if no bytes or an error + if n == 0 || err != nil { + return + } + // Nudge the deadline on successful Read or Write + err = c.nudgeDeadline() + return +} + +// Read bytes doing idle timeouts +func (c *timeoutConn) Read(b []byte) (n int, err error) { + return c.readOrWrite(c.Conn.Read, b) +} + +// Write bytes doing idle timeouts +func (c *timeoutConn) Write(b []byte) (n int, err error) { + return c.readOrWrite(c.Conn.Write, b) +} diff --git a/fs/fshttp/http.go b/fs/fshttp/http.go index be3029fb3..fb8e6945d 100644 --- a/fs/fshttp/http.go +++ b/fs/fshttp/http.go @@ -33,68 +33,6 @@ var ( logMutex sync.Mutex ) -// A net.Conn that sets a deadline for every Read or Write operation -type timeoutConn struct { - net.Conn - timeout time.Duration -} - -// create a timeoutConn using the timeout -func newTimeoutConn(conn net.Conn, timeout time.Duration) (c *timeoutConn, err error) { - c = &timeoutConn{ - Conn: conn, - timeout: timeout, - } - err = c.nudgeDeadline() - return -} - -// Nudge the deadline for an idle timeout on by c.timeout if non-zero -func (c *timeoutConn) nudgeDeadline() (err error) { - if c.timeout == 0 { - return nil - } - when := time.Now().Add(c.timeout) - return c.Conn.SetDeadline(when) -} - -// Read bytes doing idle timeouts -func (c *timeoutConn) Read(b []byte) (n int, err error) { - // Ideally we would LimitBandwidth(len(b)) here and replace tokens we didn't use - n, err = c.Conn.Read(b) - accounting.TokenBucket.LimitBandwidth(accounting.TokenBucketSlotTransportRx, n) - // Don't nudge if no bytes or an error - if n == 0 || err != nil { - return - } - // Nudge the deadline on successful Read or Write - err = c.nudgeDeadline() - return n, err -} - -// Write bytes doing idle timeouts -func (c *timeoutConn) Write(b []byte) (n int, err error) { - accounting.TokenBucket.LimitBandwidth(accounting.TokenBucketSlotTransportTx, len(b)) - n, err = c.Conn.Write(b) - // Don't nudge if no bytes or an error - if n == 0 || err != nil { - return - } - // Nudge the deadline on successful Read or Write - err = c.nudgeDeadline() - return n, err -} - -// dial with context and timeouts -func dialContextTimeout(ctx context.Context, network, address string, ci *fs.ConfigInfo) (net.Conn, error) { - dialer := NewDialer(ctx) - c, err := dialer.DialContext(ctx, network, address) - if err != nil { - return c, err - } - return newTimeoutConn(c, ci.Timeout) -} - // ResetTransport resets the existing transport, allowing it to take new settings. // Should only be used for testing. func ResetTransport() { @@ -150,7 +88,7 @@ func NewTransportCustom(ctx context.Context, customize func(*http.Transport)) ht t.DisableCompression = ci.NoGzip t.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { - return dialContextTimeout(ctx, network, addr, ci) + return dialContext(ctx, network, addr, ci) } t.IdleConnTimeout = 60 * time.Second t.ExpectContinueTimeout = ci.ExpectContinueTimeout @@ -346,17 +284,3 @@ func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error } return resp, err } - -// NewDialer creates a net.Dialer structure with Timeout, Keepalive -// and LocalAddr set from rclone flags. -func NewDialer(ctx context.Context) *net.Dialer { - ci := fs.GetConfig(ctx) - dialer := &net.Dialer{ - Timeout: ci.ConnectTimeout, - KeepAlive: 30 * time.Second, - } - if ci.BindAddr != nil { - dialer.LocalAddr = &net.TCPAddr{IP: ci.BindAddr} - } - return dialer -}