fshttp: add DSCP support with --dscp for QoS with differentiated services
This commit is contained in:
parent
dfc63eb8f1
commit
edfe183ba2
7 changed files with 240 additions and 92 deletions
|
@ -33,68 +33,6 @@ var (
|
|||
logMutex sync.Mutex
|
||||
)
|
||||
|
||||
// A net.Conn that sets a deadline for every Read or Write operation
|
||||
type timeoutConn struct {
|
||||
net.Conn
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// create a timeoutConn using the timeout
|
||||
func newTimeoutConn(conn net.Conn, timeout time.Duration) (c *timeoutConn, err error) {
|
||||
c = &timeoutConn{
|
||||
Conn: conn,
|
||||
timeout: timeout,
|
||||
}
|
||||
err = c.nudgeDeadline()
|
||||
return
|
||||
}
|
||||
|
||||
// Nudge the deadline for an idle timeout on by c.timeout if non-zero
|
||||
func (c *timeoutConn) nudgeDeadline() (err error) {
|
||||
if c.timeout == 0 {
|
||||
return nil
|
||||
}
|
||||
when := time.Now().Add(c.timeout)
|
||||
return c.Conn.SetDeadline(when)
|
||||
}
|
||||
|
||||
// Read bytes doing idle timeouts
|
||||
func (c *timeoutConn) Read(b []byte) (n int, err error) {
|
||||
// Ideally we would LimitBandwidth(len(b)) here and replace tokens we didn't use
|
||||
n, err = c.Conn.Read(b)
|
||||
accounting.TokenBucket.LimitBandwidth(accounting.TokenBucketSlotTransportRx, n)
|
||||
// Don't nudge if no bytes or an error
|
||||
if n == 0 || err != nil {
|
||||
return
|
||||
}
|
||||
// Nudge the deadline on successful Read or Write
|
||||
err = c.nudgeDeadline()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write bytes doing idle timeouts
|
||||
func (c *timeoutConn) Write(b []byte) (n int, err error) {
|
||||
accounting.TokenBucket.LimitBandwidth(accounting.TokenBucketSlotTransportTx, len(b))
|
||||
n, err = c.Conn.Write(b)
|
||||
// Don't nudge if no bytes or an error
|
||||
if n == 0 || err != nil {
|
||||
return
|
||||
}
|
||||
// Nudge the deadline on successful Read or Write
|
||||
err = c.nudgeDeadline()
|
||||
return n, err
|
||||
}
|
||||
|
||||
// dial with context and timeouts
|
||||
func dialContextTimeout(ctx context.Context, network, address string, ci *fs.ConfigInfo) (net.Conn, error) {
|
||||
dialer := NewDialer(ctx)
|
||||
c, err := dialer.DialContext(ctx, network, address)
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
return newTimeoutConn(c, ci.Timeout)
|
||||
}
|
||||
|
||||
// ResetTransport resets the existing transport, allowing it to take new settings.
|
||||
// Should only be used for testing.
|
||||
func ResetTransport() {
|
||||
|
@ -150,7 +88,7 @@ func NewTransportCustom(ctx context.Context, customize func(*http.Transport)) ht
|
|||
|
||||
t.DisableCompression = ci.NoGzip
|
||||
t.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return dialContextTimeout(ctx, network, addr, ci)
|
||||
return dialContext(ctx, network, addr, ci)
|
||||
}
|
||||
t.IdleConnTimeout = 60 * time.Second
|
||||
t.ExpectContinueTimeout = ci.ExpectContinueTimeout
|
||||
|
@ -346,17 +284,3 @@ func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error
|
|||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// NewDialer creates a net.Dialer structure with Timeout, Keepalive
|
||||
// and LocalAddr set from rclone flags.
|
||||
func NewDialer(ctx context.Context) *net.Dialer {
|
||||
ci := fs.GetConfig(ctx)
|
||||
dialer := &net.Dialer{
|
||||
Timeout: ci.ConnectTimeout,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}
|
||||
if ci.BindAddr != nil {
|
||||
dialer.LocalAddr = &net.TCPAddr{IP: ci.BindAddr}
|
||||
}
|
||||
return dialer
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue