forked from TrueCloudLab/rclone
accounting: check for max transfer in WriteTo
Before this change the max transfer tests were failing for remotes which were using WriterTo.
This commit is contained in:
parent
b705ead3fd
commit
f5455d865b
3 changed files with 78 additions and 45 deletions
|
@ -171,22 +171,39 @@ func (acc *Account) averageLoop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the read is valid returning the number of bytes it is over
|
// Check the read before it has happened is valid returning the number
|
||||||
func (acc *Account) checkRead() (over int64, err error) {
|
// of bytes remaining to read.
|
||||||
|
func (acc *Account) checkReadBefore() (bytesUntilLimit int64, err error) {
|
||||||
acc.statmu.Lock()
|
acc.statmu.Lock()
|
||||||
if acc.max >= 0 {
|
if acc.max >= 0 {
|
||||||
over = acc.stats.GetBytes() - acc.max
|
bytesUntilLimit = acc.max - acc.stats.GetBytes()
|
||||||
if over >= 0 {
|
if bytesUntilLimit < 0 {
|
||||||
acc.statmu.Unlock()
|
acc.statmu.Unlock()
|
||||||
return over, ErrorMaxTransferLimitReachedFatal
|
return bytesUntilLimit, ErrorMaxTransferLimitReachedFatal
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
bytesUntilLimit = 1 << 62
|
||||||
}
|
}
|
||||||
// Set start time.
|
// Set start time.
|
||||||
if acc.start.IsZero() {
|
if acc.start.IsZero() {
|
||||||
acc.start = time.Now()
|
acc.start = time.Now()
|
||||||
}
|
}
|
||||||
acc.statmu.Unlock()
|
acc.statmu.Unlock()
|
||||||
return over, nil
|
return bytesUntilLimit, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the read call after the read has happened
|
||||||
|
func checkReadAfter(bytesUntilLimit int64, n int, err error) (outN int, outErr error) {
|
||||||
|
bytesUntilLimit -= int64(n)
|
||||||
|
if bytesUntilLimit < 0 {
|
||||||
|
// chop the overage off
|
||||||
|
n += int(bytesUntilLimit)
|
||||||
|
if n < 0 {
|
||||||
|
n = 0
|
||||||
|
}
|
||||||
|
err = ErrorMaxTransferLimitReachedFatal
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerSideCopyStart should be called at the start of a server side copy
|
// ServerSideCopyStart should be called at the start of a server side copy
|
||||||
|
@ -226,18 +243,11 @@ func (acc *Account) accountRead(n int) {
|
||||||
|
|
||||||
// read bytes from the io.Reader passed in and account them
|
// read bytes from the io.Reader passed in and account them
|
||||||
func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
|
func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
|
||||||
_, err = acc.checkRead()
|
bytesUntilLimit, err := acc.checkReadBefore()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
n, err = in.Read(p)
|
n, err = in.Read(p)
|
||||||
acc.accountRead(n)
|
acc.accountRead(n)
|
||||||
if over, checkErr := acc.checkRead(); checkErr == ErrorMaxTransferLimitReachedFatal {
|
n, err = checkReadAfter(bytesUntilLimit, n, err)
|
||||||
// chop the overage off
|
|
||||||
n -= int(over)
|
|
||||||
if n < 0 {
|
|
||||||
n = 0
|
|
||||||
}
|
|
||||||
err = checkErr
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
@ -263,9 +273,10 @@ type accountWriteTo struct {
|
||||||
//
|
//
|
||||||
// Implementations must not retain p.
|
// Implementations must not retain p.
|
||||||
func (awt *accountWriteTo) Write(p []byte) (n int, err error) {
|
func (awt *accountWriteTo) Write(p []byte) (n int, err error) {
|
||||||
_, err = awt.acc.checkRead()
|
bytesUntilLimit, err := awt.acc.checkReadBefore()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
n, err = awt.w.Write(p)
|
n, err = awt.w.Write(p)
|
||||||
|
n, err = checkReadAfter(bytesUntilLimit, n, err)
|
||||||
awt.acc.accountRead(n)
|
awt.acc.accountRead(n)
|
||||||
}
|
}
|
||||||
return n, err
|
return n, err
|
||||||
|
@ -291,8 +302,9 @@ func (acc *Account) WriteTo(w io.Writer) (n int64, err error) {
|
||||||
func (acc *Account) AccountRead(n int) (err error) {
|
func (acc *Account) AccountRead(n int) (err error) {
|
||||||
acc.mu.Lock()
|
acc.mu.Lock()
|
||||||
defer acc.mu.Unlock()
|
defer acc.mu.Unlock()
|
||||||
_, err = acc.checkRead()
|
bytesUntilLimit, err := acc.checkReadBefore()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
n, err = checkReadAfter(bytesUntilLimit, n, err)
|
||||||
acc.accountRead(n)
|
acc.accountRead(n)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
"github.com/rclone/rclone/fs/asyncreader"
|
"github.com/rclone/rclone/fs/asyncreader"
|
||||||
"github.com/rclone/rclone/fs/fserrors"
|
"github.com/rclone/rclone/fs/fserrors"
|
||||||
|
"github.com/rclone/rclone/lib/readers"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -278,6 +279,27 @@ func TestAccountMaxTransfer(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAccountMaxTransferWriteTo(t *testing.T) {
|
||||||
|
old := fs.Config.MaxTransfer
|
||||||
|
oldMode := fs.Config.CutoffMode
|
||||||
|
|
||||||
|
fs.Config.MaxTransfer = 15
|
||||||
|
defer func() {
|
||||||
|
fs.Config.MaxTransfer = old
|
||||||
|
fs.Config.CutoffMode = oldMode
|
||||||
|
}()
|
||||||
|
|
||||||
|
in := ioutil.NopCloser(readers.NewPatternReader(1024))
|
||||||
|
stats := NewStats()
|
||||||
|
acc := newAccountSizeName(stats, in, 1, "test")
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
|
||||||
|
n, err := acc.WriteTo(&b)
|
||||||
|
assert.Equal(t, int64(15), n)
|
||||||
|
assert.Equal(t, ErrorMaxTransferLimitReachedFatal, err)
|
||||||
|
}
|
||||||
|
|
||||||
func TestShortenName(t *testing.T) {
|
func TestShortenName(t *testing.T) {
|
||||||
for _, test := range []struct {
|
for _, test := range []struct {
|
||||||
in string
|
in string
|
||||||
|
|
|
@ -1539,56 +1539,55 @@ func TestCopyFileMaxTransfer(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
const sizeCutoff = 2048
|
const sizeCutoff = 2048
|
||||||
file1 := r.WriteFile("file1", "file1 contents", t1)
|
file1 := r.WriteFile("TestCopyFileMaxTransfer/file1", "file1 contents", t1)
|
||||||
file2 := r.WriteFile("file2", "file2 contents"+random.String(sizeCutoff), t2)
|
file2 := r.WriteFile("TestCopyFileMaxTransfer/file2", "file2 contents"+random.String(sizeCutoff), t2)
|
||||||
|
file3 := r.WriteFile("TestCopyFileMaxTransfer/file3", "file3 contents"+random.String(sizeCutoff), t2)
|
||||||
rfile1 := file1
|
file4 := r.WriteFile("TestCopyFileMaxTransfer/file4", "file4 contents"+random.String(sizeCutoff), t2)
|
||||||
rfile1.Path = "sub/file1"
|
|
||||||
rfile2a := file2
|
|
||||||
rfile2a.Path = "sub/file2a"
|
|
||||||
rfile2b := file2
|
|
||||||
rfile2b.Path = "sub/file2b"
|
|
||||||
rfile2c := file2
|
|
||||||
rfile2c.Path = "sub/file2c"
|
|
||||||
|
|
||||||
|
// Cutoff mode: Hard
|
||||||
fs.Config.MaxTransfer = sizeCutoff
|
fs.Config.MaxTransfer = sizeCutoff
|
||||||
fs.Config.CutoffMode = fs.CutoffModeHard
|
fs.Config.CutoffMode = fs.CutoffModeHard
|
||||||
accounting.Stats(ctx).ResetCounters()
|
|
||||||
|
|
||||||
err := operations.CopyFile(ctx, r.Fremote, r.Flocal, rfile1.Path, file1.Path)
|
// file1: Show a small file gets transferred OK
|
||||||
|
accounting.Stats(ctx).ResetCounters()
|
||||||
|
err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file1.Path, file1.Path)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
fstest.CheckItems(t, r.Flocal, file1, file2, file3, file4)
|
||||||
fstest.CheckItems(t, r.Fremote, rfile1)
|
fstest.CheckItems(t, r.Fremote, file1)
|
||||||
|
|
||||||
|
// file2: show a large file does not get transferred
|
||||||
accounting.Stats(ctx).ResetCounters()
|
accounting.Stats(ctx).ResetCounters()
|
||||||
|
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file2.Path)
|
||||||
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, rfile2a.Path, file2.Path)
|
require.NotNil(t, err, "Did not get expected max transfer limit error")
|
||||||
require.NotNil(t, err)
|
|
||||||
assert.Contains(t, err.Error(), "Max transfer limit reached")
|
assert.Contains(t, err.Error(), "Max transfer limit reached")
|
||||||
assert.True(t, fserrors.IsFatalError(err))
|
assert.True(t, fserrors.IsFatalError(err))
|
||||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
fstest.CheckItems(t, r.Flocal, file1, file2, file3, file4)
|
||||||
fstest.CheckItems(t, r.Fremote, rfile1)
|
fstest.CheckItems(t, r.Fremote, file1)
|
||||||
|
|
||||||
|
// Cutoff mode: Cautious
|
||||||
fs.Config.CutoffMode = fs.CutoffModeCautious
|
fs.Config.CutoffMode = fs.CutoffModeCautious
|
||||||
accounting.Stats(ctx).ResetCounters()
|
|
||||||
|
|
||||||
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, rfile2b.Path, file2.Path)
|
// file3: show a large file does not get transferred
|
||||||
|
accounting.Stats(ctx).ResetCounters()
|
||||||
|
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file3.Path, file3.Path)
|
||||||
require.NotNil(t, err)
|
require.NotNil(t, err)
|
||||||
assert.Contains(t, err.Error(), "Max transfer limit reached")
|
assert.Contains(t, err.Error(), "Max transfer limit reached")
|
||||||
assert.True(t, fserrors.IsFatalError(err))
|
assert.True(t, fserrors.IsFatalError(err))
|
||||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
fstest.CheckItems(t, r.Flocal, file1, file2, file3, file4)
|
||||||
fstest.CheckItems(t, r.Fremote, rfile1)
|
fstest.CheckItems(t, r.Fremote, file1)
|
||||||
|
|
||||||
if strings.HasPrefix(r.Fremote.Name(), "TestChunker") {
|
if strings.HasPrefix(r.Fremote.Name(), "TestChunker") {
|
||||||
t.Log("skipping remainder of test for chunker as it involves multiple transfers")
|
t.Log("skipping remainder of test for chunker as it involves multiple transfers")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cutoff mode: Soft
|
||||||
fs.Config.CutoffMode = fs.CutoffModeSoft
|
fs.Config.CutoffMode = fs.CutoffModeSoft
|
||||||
accounting.Stats(ctx).ResetCounters()
|
|
||||||
|
|
||||||
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, rfile2c.Path, file2.Path)
|
// file4: show a large file does get transferred this time
|
||||||
|
accounting.Stats(ctx).ResetCounters()
|
||||||
|
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file4.Path, file4.Path)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
fstest.CheckItems(t, r.Flocal, file1, file2, file3, file4)
|
||||||
fstest.CheckItems(t, r.Fremote, rfile1, rfile2c)
|
fstest.CheckItems(t, r.Fremote, file1, file4)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue