accounting: fix file handle leak on errors - fixes #3547
In53a1a0e3ef
we introduced a problem where if there was an error on the file being transferred then the file was re-opened and the old one wasn't closed. This was partially fixed inbfbddab46b
however this didn't address the case of the old file being closed. This is now fixed by - marking the file as open again in UpdateReader - moving the stopping the accounting machinery to a new method Done
This commit is contained in:
parent
70e043e641
commit
56544bb2fd
3 changed files with 35 additions and 10 deletions
|
@ -124,6 +124,7 @@ func (acc *Account) UpdateReader(in io.ReadCloser) {
|
|||
acc.in = in
|
||||
acc.close = in
|
||||
acc.origIn = in
|
||||
acc.closed = false
|
||||
if acc.withBuf {
|
||||
acc.WithBuffer()
|
||||
}
|
||||
|
@ -243,14 +244,20 @@ func (acc *Account) Close() error {
|
|||
return nil
|
||||
}
|
||||
acc.closed = true
|
||||
close(acc.exit)
|
||||
acc.stats.inProgress.clear(acc.name)
|
||||
if acc.close == nil {
|
||||
return nil
|
||||
}
|
||||
return acc.close.Close()
|
||||
}
|
||||
|
||||
// Done with accounting - must be called to free accounting goroutine
|
||||
func (acc *Account) Done() {
|
||||
acc.mu.Lock()
|
||||
defer acc.mu.Unlock()
|
||||
close(acc.exit)
|
||||
acc.stats.inProgress.clear(acc.name)
|
||||
}
|
||||
|
||||
// progress returns bytes read as well as the size.
|
||||
// Size can be <= 0 if the size is unknown.
|
||||
func (acc *Account) progress() (bytes, size int64) {
|
||||
|
|
|
@ -32,6 +32,8 @@ func TestNewAccountSizeName(t *testing.T) {
|
|||
assert.Equal(t, acc, stats.inProgress.get("test"))
|
||||
err := acc.Close()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, acc, stats.inProgress.get("test"))
|
||||
acc.Done()
|
||||
assert.Nil(t, stats.inProgress.get("test"))
|
||||
}
|
||||
|
||||
|
@ -55,18 +57,31 @@ func TestAccountWithBuffer(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAccountGetUpdateReader(t *testing.T) {
|
||||
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
|
||||
stats := NewStats()
|
||||
acc := newAccountSizeName(stats, in, 1, "test")
|
||||
test := func(doClose bool) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
|
||||
stats := NewStats()
|
||||
acc := newAccountSizeName(stats, in, 1, "test")
|
||||
|
||||
assert.Equal(t, in, acc.GetReader())
|
||||
assert.Equal(t, in, acc.GetReader())
|
||||
assert.Equal(t, acc, stats.inProgress.get("test"))
|
||||
|
||||
in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
|
||||
acc.UpdateReader(in2)
|
||||
if doClose {
|
||||
// close the account before swapping it out
|
||||
require.NoError(t, acc.Close())
|
||||
}
|
||||
|
||||
assert.Equal(t, in2, acc.GetReader())
|
||||
in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
|
||||
acc.UpdateReader(in2)
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
assert.Equal(t, in2, acc.GetReader())
|
||||
assert.Equal(t, acc, stats.inProgress.get("test"))
|
||||
|
||||
assert.NoError(t, acc.Close())
|
||||
}
|
||||
}
|
||||
t.Run("NoClose", test(false))
|
||||
t.Run("Close", test(true))
|
||||
}
|
||||
|
||||
func TestAccountRead(t *testing.T) {
|
||||
|
|
|
@ -96,9 +96,12 @@ func (tr *Transfer) Done(err error) {
|
|||
tr.mu.RUnlock()
|
||||
|
||||
if acc != nil {
|
||||
// Close the file if it is still open
|
||||
if err := acc.Close(); err != nil {
|
||||
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err)
|
||||
}
|
||||
// Signal done with accounting
|
||||
acc.Done()
|
||||
}
|
||||
|
||||
tr.mu.Lock()
|
||||
|
|
Loading…
Reference in a new issue