fshttp: add DSCP support with --dscp for QoS with differentiated services

This commit is contained in:
Max Sum 2021-01-29 11:41:50 +08:00 committed by Nick Craig-Wood
parent dfc63eb8f1
commit edfe183ba2
7 changed files with 240 additions and 92 deletions

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"io" "io"
"net"
"net/textproto" "net/textproto"
"path" "path"
"runtime" "runtime"
@ -20,6 +21,7 @@ import (
"github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/config/obscure" "github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/env" "github.com/rclone/rclone/lib/env"
@ -135,6 +137,7 @@ type Fs struct {
poolMu sync.Mutex poolMu sync.Mutex
pool []*ftp.ServerConn pool []*ftp.ServerConn
tokens *pacer.TokenDispenser tokens *pacer.TokenDispenser
tlsConf *tls.Config
} }
// Object describes an FTP file // Object describes an FTP file
@ -211,25 +214,36 @@ func (dl *debugLog) Write(p []byte) (n int, err error) {
return len(p), nil return len(p), nil
} }
type dialCtx struct {
f *Fs
ctx context.Context
}
// dial a new connection with fshttp dialer
func (d *dialCtx) dial(network, address string) (net.Conn, error) {
conn, err := fshttp.NewDialer(d.ctx).Dial(network, address)
if err != nil {
return nil, err
}
if d.f.tlsConf != nil {
conn = tls.Client(conn, d.f.tlsConf)
}
return conn, err
}
// Open a new connection to the FTP server. // Open a new connection to the FTP server.
func (f *Fs) ftpConnection(ctx context.Context) (*ftp.ServerConn, error) { func (f *Fs) ftpConnection(ctx context.Context) (*ftp.ServerConn, error) {
fs.Debugf(f, "Connecting to FTP server") fs.Debugf(f, "Connecting to FTP server")
ftpConfig := []ftp.DialOption{ftp.DialWithTimeout(f.ci.ConnectTimeout)} dCtx := dialCtx{f, ctx}
if f.opt.TLS && f.opt.ExplicitTLS { ftpConfig := []ftp.DialOption{ftp.DialWithDialFunc(dCtx.dial)}
fs.Errorf(f, "Implicit TLS and explicit TLS are mutually incompatible. Please revise your config") if f.opt.ExplicitTLS {
return nil, errors.New("Implicit TLS and explicit TLS are mutually incompatible. Please revise your config") ftpConfig = append(ftpConfig, ftp.DialWithExplicitTLS(f.tlsConf))
} else if f.opt.TLS { // Initial connection needs to be cleartext for explicit TLS
tlsConfig := &tls.Config{ conn, err := fshttp.NewDialer(ctx).Dial("tcp", f.dialAddr)
ServerName: f.opt.Host, if err != nil {
InsecureSkipVerify: f.opt.SkipVerifyTLSCert, return nil, err
} }
ftpConfig = append(ftpConfig, ftp.DialWithTLS(tlsConfig)) ftpConfig = append(ftpConfig, ftp.DialWithNetConn(conn))
} else if f.opt.ExplicitTLS {
tlsConfig := &tls.Config{
ServerName: f.opt.Host,
InsecureSkipVerify: f.opt.SkipVerifyTLSCert,
}
ftpConfig = append(ftpConfig, ftp.DialWithExplicitTLS(tlsConfig))
} }
if f.opt.DisableEPSV { if f.opt.DisableEPSV {
ftpConfig = append(ftpConfig, ftp.DialWithDisabledEPSV(true)) ftpConfig = append(ftpConfig, ftp.DialWithDisabledEPSV(true))
@ -338,6 +352,16 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (ff fs.Fs
if opt.TLS { if opt.TLS {
protocol = "ftps://" protocol = "ftps://"
} }
if opt.TLS && opt.ExplicitTLS {
return nil, errors.New("Implicit TLS and explicit TLS are mutually incompatible. Please revise your config")
}
var tlsConfig *tls.Config
if opt.TLS || opt.ExplicitTLS {
tlsConfig = &tls.Config{
ServerName: opt.Host,
InsecureSkipVerify: opt.SkipVerifyTLSCert,
}
}
u := protocol + path.Join(dialAddr+"/", root) u := protocol + path.Join(dialAddr+"/", root)
ci := fs.GetConfig(ctx) ci := fs.GetConfig(ctx)
f := &Fs{ f := &Fs{
@ -350,6 +374,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (ff fs.Fs
pass: pass, pass: pass,
dialAddr: dialAddr, dialAddr: dialAddr,
tokens: pacer.NewTokenDispenser(opt.Concurrency), tokens: pacer.NewTokenDispenser(opt.Concurrency),
tlsConf: tlsConfig,
} }
f.features = (&fs.Features{ f.features = (&fs.Features{
CanHaveEmptyDirectories: true, CanHaveEmptyDirectories: true,

View file

@ -600,6 +600,21 @@ This flag can be useful for debugging and in exceptional circumstances
(e.g. Google Drive limiting the total volume of Server Side Copies to (e.g. Google Drive limiting the total volume of Server Side Copies to
100GB/day). 100GB/day).
### --dscp VALUE ###
Specify a DSCP value or name to use in connections. This could help QoS
system to identify traffic class. BE, EF, DF, LE, CSx and AFxx are allowed.
See the description of [differentiated services](https://en.wikipedia.org/wiki/Differentiated_services) to get an idea of
this field. Setting this to 1 (LE) to identify the flow to SCAVENGER class
can avoid occupying too much bandwidth in a network with DiffServ support ([RFC 8622](https://tools.ietf.org/html/rfc8622)).
For example, if you configured QoS on router to handle LE properly. Running:
```
rclone copy --dscp LE from:/from to:/to
```
would make the priority lower than usual internet flows.
### -n, --dry-run ### ### -n, --dry-run ###
Do a trial run with no permanent changes. Use this to see what rclone Do a trial run with no permanent changes. Use this to see what rclone

View file

@ -42,6 +42,7 @@ These flags are available for every command.
--dump DumpFlags List of items to dump from: headers,bodies,requests,responses,auth,filters,goroutines,openfiles --dump DumpFlags List of items to dump from: headers,bodies,requests,responses,auth,filters,goroutines,openfiles
--dump-bodies Dump HTTP headers and bodies - may contain sensitive info --dump-bodies Dump HTTP headers and bodies - may contain sensitive info
--dump-headers Dump HTTP headers - may contain sensitive info --dump-headers Dump HTTP headers - may contain sensitive info
--dscp DSCP Name or Value (default 0)
--error-on-no-transfer Sets exit code 9 if no files are transferred, useful in scripts --error-on-no-transfer Sets exit code 9 if no files are transferred, useful in scripts
--exclude stringArray Exclude files matching pattern --exclude stringArray Exclude files matching pattern
--exclude-from stringArray Read exclude patterns from file (use - to read from stdin) --exclude-from stringArray Read exclude patterns from file (use - to read from stdin)

View file

@ -122,6 +122,7 @@ type ConfigInfo struct {
Headers []*HTTPOption Headers []*HTTPOption
RefreshTimes bool RefreshTimes bool
NoConsole bool NoConsole bool
TrafficClass uint8
} }
// NewConfig creates a new config with everything set to the default // NewConfig creates a new config with everything set to the default

View file

@ -7,6 +7,7 @@ import (
"log" "log"
"net" "net"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
@ -29,6 +30,7 @@ var (
deleteAfter bool deleteAfter bool
bindAddr string bindAddr string
disableFeatures string disableFeatures string
dscp string
uploadHeaders []string uploadHeaders []string
downloadHeaders []string downloadHeaders []string
headers []string headers []string
@ -125,6 +127,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.StringArrayVarP(flagSet, &headers, "header", "", nil, "Set HTTP header for all transactions") flags.StringArrayVarP(flagSet, &headers, "header", "", nil, "Set HTTP header for all transactions")
flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files.") flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files.")
flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window. Supported on Windows only.") flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window. Supported on Windows only.")
flags.StringVarP(flagSet, &dscp, "dscp", "", "", "Set DSCP value to connections. Can be value or names, eg. CS1, LE, DF, AF21.")
} }
// ParseHeaders converts the strings passed in via the header flags into HTTPOptions // ParseHeaders converts the strings passed in via the header flags into HTTPOptions
@ -254,6 +257,13 @@ func SetFlags(ci *fs.ConfigInfo) {
if len(headers) != 0 { if len(headers) != 0 {
ci.Headers = ParseHeaders(headers) ci.Headers = ParseHeaders(headers)
} }
if len(dscp) != 0 {
if value, ok := parseDSCP(dscp); ok {
ci.TrafficClass = value << 2
} else {
log.Fatalf("--dscp: Invalid DSCP name: %v", dscp)
}
}
// Make the config file absolute // Make the config file absolute
configPath, err := filepath.Abs(config.ConfigPath) configPath, err := filepath.Abs(config.ConfigPath)
@ -266,3 +276,61 @@ func SetFlags(ci *fs.ConfigInfo) {
ci.MultiThreadSet = multiThreadStreamsFlag != nil && multiThreadStreamsFlag.Changed ci.MultiThreadSet = multiThreadStreamsFlag != nil && multiThreadStreamsFlag.Changed
} }
// parseHeaders converts DSCP names to value
func parseDSCP(dscp string) (uint8, bool) {
if s, err := strconv.ParseUint(dscp, 10, 6); err == nil {
return uint8(s), true
}
dscp = strings.ToUpper(dscp)
switch dscp {
case "BE":
fallthrough
case "DF":
fallthrough
case "CS0":
return 0x00, true
case "CS1":
return 0x08, true
case "AF11":
return 0x0A, true
case "AF12":
return 0x0C, true
case "AF13":
return 0x0E, true
case "CS2":
return 0x10, true
case "AF21":
return 0x12, true
case "AF22":
return 0x14, true
case "AF23":
return 0x16, true
case "CS3":
return 0x18, true
case "AF31":
return 0x1A, true
case "AF32":
return 0x1C, true
case "AF33":
return 0x1E, true
case "CS4":
return 0x20, true
case "AF41":
return 0x22, true
case "AF42":
return 0x24, true
case "AF43":
return 0x26, true
case "CS5":
return 0x28, true
case "EF":
return 0x2E, true
case "CS6":
return 0x30, true
case "LE":
return 0x01, true
default:
return 0, false
}
}

114
fs/fshttp/dialer.go Normal file
View file

@ -0,0 +1,114 @@
package fshttp
import (
"context"
"net"
"time"
"github.com/rclone/rclone/fs"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
func dialContext(ctx context.Context, network, address string, ci *fs.ConfigInfo) (net.Conn, error) {
return NewDialer(ctx).DialContext(ctx, network, address)
}
// Dialer structure contains default dialer and timeout, tclass support
type Dialer struct {
net.Dialer
timeout time.Duration
tclass int
}
// NewDialer creates a Dialer structure with Timeout, Keepalive,
// LocalAddr and DSCP set from rclone flags.
func NewDialer(ctx context.Context) *Dialer {
ci := fs.GetConfig(ctx)
dialer := &Dialer{
Dialer: net.Dialer{
Timeout: ci.ConnectTimeout,
KeepAlive: 30 * time.Second,
},
timeout: ci.Timeout,
tclass: int(ci.TrafficClass),
}
if ci.BindAddr != nil {
dialer.Dialer.LocalAddr = &net.TCPAddr{IP: ci.BindAddr}
}
return dialer
}
// Dial connects to the address on the named network.
func (d *Dialer) Dial(network, address string) (net.Conn, error) {
return d.DialContext(context.Background(), network, address)
}
// DialContext connects to the address on the named network using
// the provided context.
func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
c, err := d.Dialer.DialContext(ctx, network, address)
if err != nil {
return c, err
}
if d.tclass != 0 {
if addr, ok := c.RemoteAddr().(*net.IPAddr); ok {
if addr.IP.To16() != nil && addr.IP.To4() == nil {
err = ipv6.NewConn(c).SetTrafficClass(d.tclass)
} else {
err = ipv4.NewConn(c).SetTOS(d.tclass)
}
if err != nil {
return c, err
}
}
}
return newTimeoutConn(c, d.timeout)
}
// A net.Conn that sets a deadline for every Read or Write operation
type timeoutConn struct {
net.Conn
timeout time.Duration
}
// create a timeoutConn using the timeout
func newTimeoutConn(conn net.Conn, timeout time.Duration) (c *timeoutConn, err error) {
c = &timeoutConn{
Conn: conn,
timeout: timeout,
}
err = c.nudgeDeadline()
return
}
// Nudge the deadline for an idle timeout on by c.timeout if non-zero
func (c *timeoutConn) nudgeDeadline() (err error) {
if c.timeout == 0 {
return nil
}
when := time.Now().Add(c.timeout)
return c.Conn.SetDeadline(when)
}
// readOrWrite bytes doing idle timeouts
func (c *timeoutConn) readOrWrite(f func([]byte) (int, error), b []byte) (n int, err error) {
n, err = f(b)
// Don't nudge if no bytes or an error
if n == 0 || err != nil {
return
}
// Nudge the deadline on successful Read or Write
err = c.nudgeDeadline()
return
}
// Read bytes doing idle timeouts
func (c *timeoutConn) Read(b []byte) (n int, err error) {
return c.readOrWrite(c.Conn.Read, b)
}
// Write bytes doing idle timeouts
func (c *timeoutConn) Write(b []byte) (n int, err error) {
return c.readOrWrite(c.Conn.Write, b)
}

View file

@ -33,68 +33,6 @@ var (
logMutex sync.Mutex logMutex sync.Mutex
) )
// A net.Conn that sets a deadline for every Read or Write operation
type timeoutConn struct {
net.Conn
timeout time.Duration
}
// create a timeoutConn using the timeout
func newTimeoutConn(conn net.Conn, timeout time.Duration) (c *timeoutConn, err error) {
c = &timeoutConn{
Conn: conn,
timeout: timeout,
}
err = c.nudgeDeadline()
return
}
// Nudge the deadline for an idle timeout on by c.timeout if non-zero
func (c *timeoutConn) nudgeDeadline() (err error) {
if c.timeout == 0 {
return nil
}
when := time.Now().Add(c.timeout)
return c.Conn.SetDeadline(when)
}
// Read bytes doing idle timeouts
func (c *timeoutConn) Read(b []byte) (n int, err error) {
// Ideally we would LimitBandwidth(len(b)) here and replace tokens we didn't use
n, err = c.Conn.Read(b)
accounting.TokenBucket.LimitBandwidth(accounting.TokenBucketSlotTransportRx, n)
// Don't nudge if no bytes or an error
if n == 0 || err != nil {
return
}
// Nudge the deadline on successful Read or Write
err = c.nudgeDeadline()
return n, err
}
// Write bytes doing idle timeouts
func (c *timeoutConn) Write(b []byte) (n int, err error) {
accounting.TokenBucket.LimitBandwidth(accounting.TokenBucketSlotTransportTx, len(b))
n, err = c.Conn.Write(b)
// Don't nudge if no bytes or an error
if n == 0 || err != nil {
return
}
// Nudge the deadline on successful Read or Write
err = c.nudgeDeadline()
return n, err
}
// dial with context and timeouts
func dialContextTimeout(ctx context.Context, network, address string, ci *fs.ConfigInfo) (net.Conn, error) {
dialer := NewDialer(ctx)
c, err := dialer.DialContext(ctx, network, address)
if err != nil {
return c, err
}
return newTimeoutConn(c, ci.Timeout)
}
// ResetTransport resets the existing transport, allowing it to take new settings. // ResetTransport resets the existing transport, allowing it to take new settings.
// Should only be used for testing. // Should only be used for testing.
func ResetTransport() { func ResetTransport() {
@ -150,7 +88,7 @@ func NewTransportCustom(ctx context.Context, customize func(*http.Transport)) ht
t.DisableCompression = ci.NoGzip t.DisableCompression = ci.NoGzip
t.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { t.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialContextTimeout(ctx, network, addr, ci) return dialContext(ctx, network, addr, ci)
} }
t.IdleConnTimeout = 60 * time.Second t.IdleConnTimeout = 60 * time.Second
t.ExpectContinueTimeout = ci.ExpectContinueTimeout t.ExpectContinueTimeout = ci.ExpectContinueTimeout
@ -346,17 +284,3 @@ func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error
} }
return resp, err return resp, err
} }
// NewDialer creates a net.Dialer structure with Timeout, Keepalive
// and LocalAddr set from rclone flags.
func NewDialer(ctx context.Context) *net.Dialer {
ci := fs.GetConfig(ctx)
dialer := &net.Dialer{
Timeout: ci.ConnectTimeout,
KeepAlive: 30 * time.Second,
}
if ci.BindAddr != nil {
dialer.LocalAddr = &net.TCPAddr{IP: ci.BindAddr}
}
return dialer
}