accounting: add context to Account and propagate changes #3257

This is preparation for getting the Accounting to check the context,
buf first we need to get it in place. Since this is one of those
changes that makes lots of noise, this is in a seperate commit.
This commit is contained in:
Nick Craig-Wood 2020-06-04 15:09:03 +01:00
parent 0bab9903ee
commit 421585dd72
9 changed files with 34 additions and 29 deletions

View file

@ -79,7 +79,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) {
defer func() {
tr.Done(err)
}()
in := tr.Account(file) // account the transfer (no buffering)
in := tr.Account(r.Context(), file) // account the transfer (no buffering)
w.WriteHeader(code)

View file

@ -36,6 +36,7 @@ type Account struct {
// shouldn't.
mu sync.Mutex // mutex protects these values
in io.Reader
ctx context.Context // current context for transfer - may change
origIn io.ReadCloser
close io.Closer
size int64
@ -64,10 +65,11 @@ const averagePeriod = 16 // period to do exponentially weighted averages over
// newAccountSizeName makes an Account reader for an io.ReadCloser of
// the given size and name
func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account {
func newAccountSizeName(ctx context.Context, stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account {
acc := &Account{
stats: stats,
in: in,
ctx: ctx,
close: in,
origIn: in,
size: size,
@ -160,7 +162,7 @@ func (acc *Account) Abandon() {
// UpdateReader updates the underlying io.ReadCloser stopping the
// async buffer (if any) and re-adding it
func (acc *Account) UpdateReader(in io.ReadCloser) {
func (acc *Account) UpdateReader(ctx context.Context, in io.ReadCloser) {
acc.mu.Lock()
withBuf := acc.withBuf
if withBuf {
@ -168,6 +170,7 @@ func (acc *Account) UpdateReader(in io.ReadCloser) {
acc.withBuf = false
}
acc.in = in
acc.ctx = ctx
acc.close = in
acc.origIn = in
acc.closed = false

View file

@ -2,6 +2,7 @@ package accounting
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@ -29,7 +30,7 @@ var (
func TestNewAccountSizeName(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
assert.Equal(t, in, acc.in)
assert.Equal(t, acc, stats.inProgress.get("test"))
err := acc.Close()
@ -44,7 +45,7 @@ func TestAccountWithBuffer(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
stats := NewStats()
acc := newAccountSizeName(stats, in, -1, "test")
acc := newAccountSizeName(context.Background(), stats, in, -1, "test")
assert.False(t, acc.HasBuffer())
acc.WithBuffer()
assert.True(t, acc.HasBuffer())
@ -53,7 +54,7 @@ func TestAccountWithBuffer(t *testing.T) {
require.True(t, ok)
assert.NoError(t, acc.Close())
acc = newAccountSizeName(stats, in, 1, "test")
acc = newAccountSizeName(context.Background(), stats, in, 1, "test")
acc.WithBuffer()
// should not have a buffer for a small size
_, ok = acc.in.(*asyncreader.AsyncReader)
@ -66,7 +67,7 @@ func TestAccountGetUpdateReader(t *testing.T) {
return func(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
assert.Equal(t, in, acc.GetReader())
assert.Equal(t, acc, stats.inProgress.get("test"))
@ -77,7 +78,7 @@ func TestAccountGetUpdateReader(t *testing.T) {
}
in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
acc.UpdateReader(in2)
acc.UpdateReader(context.Background(), in2)
assert.Equal(t, in2, acc.GetReader())
assert.Equal(t, acc, stats.inProgress.get("test"))
@ -92,7 +93,7 @@ func TestAccountGetUpdateReader(t *testing.T) {
func TestAccountRead(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
assert.True(t, acc.values.start.IsZero())
acc.values.mu.Lock()
@ -133,7 +134,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
}
in := ioutil.NopCloser(bytes.NewBuffer(buf))
stats := NewStats()
acc := newAccountSizeName(stats, in, int64(len(buf)), "test")
acc := newAccountSizeName(context.Background(), stats, in, int64(len(buf)), "test")
if withBuffer {
acc = acc.WithBuffer()
}
@ -173,7 +174,7 @@ func TestAccountWriteToWithBuffer(t *testing.T) {
func TestAccountString(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
stats := NewStats()
acc := newAccountSizeName(stats, in, 3, "test")
acc := newAccountSizeName(context.Background(), stats, in, 3, "test")
// FIXME not an exhaustive test!
@ -193,7 +194,7 @@ func TestAccountString(t *testing.T) {
func TestAccountAccounter(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
stats := NewStats()
acc := newAccountSizeName(stats, in, 3, "test")
acc := newAccountSizeName(context.Background(), stats, in, 3, "test")
assert.True(t, in == acc.OldStream())
@ -260,7 +261,7 @@ func TestAccountMaxTransfer(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
var b = make([]byte, 10)
@ -277,7 +278,7 @@ func TestAccountMaxTransfer(t *testing.T) {
fs.Config.CutoffMode = fs.CutoffModeSoft
stats = NewStats()
acc = newAccountSizeName(stats, in, 1, "test")
acc = newAccountSizeName(context.Background(), stats, in, 1, "test")
n, err = acc.Read(b)
assert.Equal(t, 10, n)
@ -302,7 +303,7 @@ func TestAccountMaxTransferWriteTo(t *testing.T) {
in := ioutil.NopCloser(readers.NewPatternReader(1024))
stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
var b bytes.Buffer

View file

@ -1,6 +1,7 @@
package accounting
import (
"context"
"encoding/json"
"io"
"sync"
@ -135,12 +136,12 @@ func (tr *Transfer) Reset() {
}
// Account returns reader that knows how to keep track of transfer progress.
func (tr *Transfer) Account(in io.ReadCloser) *Account {
func (tr *Transfer) Account(ctx context.Context, in io.ReadCloser) *Account {
tr.mu.Lock()
if tr.acc == nil {
tr.acc = newAccountSizeName(tr.stats, in, tr.size, tr.remote)
tr.acc = newAccountSizeName(ctx, tr.stats, in, tr.size, tr.remote)
} else {
tr.acc.UpdateReader(in)
tr.acc.UpdateReader(ctx, in)
}
tr.mu.Unlock()
return tr.acc

View file

@ -318,7 +318,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
defer func() {
tr1.Done(nil) // error handling is done by the caller
}()
in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer
in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer
in2, err := src.Open(ctx)
if err != nil {
@ -328,7 +328,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
defer func() {
tr2.Done(nil) // error handling is done by the caller
}()
in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer
in2 = tr2.Account(ctx, in2).WithBuffer() // account and buffer the transfer
// To assign err variable before defer.
differ, err = CheckEqualReaders(in1, in2)

View file

@ -158,7 +158,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
mc.calculateChunks()
// Make accounting
mc.acc = tr.Account(nil)
mc.acc = tr.Account(ctx, nil)
// create write file handle
mc.wc, err = openWriterAt(gCtx, remote, mc.size)

View file

@ -366,7 +366,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
return nil, accounting.ErrorMaxTransferLimitReachedFatal
}
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
in := tr.Account(nil) // account the transfer
in := tr.Account(ctx, nil) // account the transfer
in.ServerSideCopyStart()
newDst, err = doCopy(ctx, src, remote)
if err == nil {
@ -421,7 +421,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx))
newDst = dst
} else {
in := tr.Account(in0).WithBuffer() // account and buffer the transfer
in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer
var wrappedSrc fs.ObjectInfo = src
// We try to pass the original object if possible
if src.Remote() != remote {
@ -1054,7 +1054,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
if count >= 0 {
in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
}
in = tr.Account(in).WithBuffer() // account and buffer the transfer
in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
// take the lock just before we output stuff, so at the last possible moment
mu.Lock()
defer mu.Unlock()
@ -1072,7 +1072,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser,
defer func() {
tr.Done(err)
}()
in = tr.Account(in).WithBuffer()
in = tr.Account(ctx, in).WithBuffer()
readCounter := readers.NewCountingReader(in)
var trackingIn io.Reader
@ -1420,7 +1420,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
tr.Done(err)
}()
body := ioutil.NopCloser(in) // we let the server close the body
in := tr.Account(body) // account the transfer (no buffering)
in := tr.Account(ctx, body) // account the transfer (no buffering)
if SkipDestructive(ctx, dstFileName, "upload from pipe") {
// prevents "broken pipe" errors

View file

@ -79,7 +79,7 @@ func (fh *ReadFileHandle) openPending() (err error) {
}
tr := accounting.GlobalStats().NewTransfer(o)
fh.done = tr.Done
fh.r = tr.Account(r).WithBuffer() // account the transfer
fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer
fh.opened = true
return nil
@ -158,7 +158,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
return err
}
}
fh.r.UpdateReader(r)
fh.r.UpdateReader(context.TODO(), r)
fh.offset = offset
return nil
}

View file

@ -495,7 +495,7 @@ func (dl *downloader) open(offset int64) (err error) {
if err != nil {
return errors.Wrap(err, "vfs reader: failed to open source file")
}
dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer
dl.in = dl.tr.Account(dl.dls.ctx, in0).WithBuffer() // account and buffer the transfer
dl.offset = offset