diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 50282ff26..11018c199 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -124,6 +124,7 @@ func (acc *Account) UpdateReader(in io.ReadCloser) { acc.in = in acc.close = in acc.origIn = in + acc.closed = false if acc.withBuf { acc.WithBuffer() } @@ -243,14 +244,20 @@ func (acc *Account) Close() error { return nil } acc.closed = true - close(acc.exit) - acc.stats.inProgress.clear(acc.name) if acc.close == nil { return nil } return acc.close.Close() } +// Done with accounting - must be called to free accounting goroutine +func (acc *Account) Done() { + acc.mu.Lock() + defer acc.mu.Unlock() + close(acc.exit) + acc.stats.inProgress.clear(acc.name) +} + // progress returns bytes read as well as the size. // Size can be <= 0 if the size is unknown. func (acc *Account) progress() (bytes, size int64) { diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 3986902a8..3f365e493 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -32,6 +32,8 @@ func TestNewAccountSizeName(t *testing.T) { assert.Equal(t, acc, stats.inProgress.get("test")) err := acc.Close() assert.NoError(t, err) + assert.Equal(t, acc, stats.inProgress.get("test")) + acc.Done() assert.Nil(t, stats.inProgress.get("test")) } @@ -55,18 +57,31 @@ func TestAccountWithBuffer(t *testing.T) { } func TestAccountGetUpdateReader(t *testing.T) { - in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + test := func(doClose bool) func(t *testing.T) { + return func(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") - assert.Equal(t, in, acc.GetReader()) + assert.Equal(t, in, acc.GetReader()) + assert.Equal(t, acc, stats.inProgress.get("test")) - in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc.UpdateReader(in2) + if doClose { + // close the account before swapping it out + require.NoError(t, acc.Close()) + } - assert.Equal(t, in2, acc.GetReader()) + in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + acc.UpdateReader(in2) - assert.NoError(t, acc.Close()) + assert.Equal(t, in2, acc.GetReader()) + assert.Equal(t, acc, stats.inProgress.get("test")) + + assert.NoError(t, acc.Close()) + } + } + t.Run("NoClose", test(false)) + t.Run("Close", test(true)) } func TestAccountRead(t *testing.T) { diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go index a1ab8d023..dc5e90d0e 100644 --- a/fs/accounting/transfer.go +++ b/fs/accounting/transfer.go @@ -96,9 +96,12 @@ func (tr *Transfer) Done(err error) { tr.mu.RUnlock() if acc != nil { + // Close the file if it is still open if err := acc.Close(); err != nil { fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err) } + // Signal done with accounting + acc.Done() } tr.mu.Lock()