forked from TrueCloudLab/rclone
accounting: fix locking in Transfer to avoid deadlock with --progress
Before this change, using -P occasionally deadlocked on the transfer mutex and the stats mutex since they call each other via the progress printing. This is fixed by shortening the locking windows and converting the mutex to a RW mutex.
This commit is contained in:
parent
9d1fb2f4e7
commit
6f87267b34
1 changed files with 20 additions and 19 deletions
|
@ -40,16 +40,17 @@ func (as TransferSnapshot) MarshalJSON() ([]byte, error) {
|
||||||
// accounting functions.
|
// accounting functions.
|
||||||
// Transfer needs to be closed on completion.
|
// Transfer needs to be closed on completion.
|
||||||
type Transfer struct {
|
type Transfer struct {
|
||||||
|
// these are initialised at creation and may be accessed without locking
|
||||||
stats *StatsInfo
|
stats *StatsInfo
|
||||||
remote string
|
remote string
|
||||||
size int64
|
size int64
|
||||||
|
startedAt time.Time
|
||||||
checking bool
|
checking bool
|
||||||
|
|
||||||
// Protects all bellow
|
// Protects all below
|
||||||
mu sync.Mutex
|
mu sync.RWMutex
|
||||||
acc *Account
|
acc *Account
|
||||||
err error
|
err error
|
||||||
startedAt time.Time
|
|
||||||
completedAt time.Time
|
completedAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +80,6 @@ func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking
|
||||||
// Must be called after transfer is finished to run proper cleanups.
|
// Must be called after transfer is finished to run proper cleanups.
|
||||||
func (tr *Transfer) Done(err error) {
|
func (tr *Transfer) Done(err error) {
|
||||||
tr.mu.Lock()
|
tr.mu.Lock()
|
||||||
defer tr.mu.Unlock()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tr.stats.Error(err)
|
tr.stats.Error(err)
|
||||||
|
@ -90,46 +90,47 @@ func (tr *Transfer) Done(err error) {
|
||||||
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err)
|
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tr.completedAt = time.Now()
|
||||||
|
|
||||||
|
tr.mu.Unlock()
|
||||||
|
|
||||||
if tr.checking {
|
if tr.checking {
|
||||||
tr.stats.DoneChecking(tr.remote)
|
tr.stats.DoneChecking(tr.remote)
|
||||||
} else {
|
} else {
|
||||||
tr.stats.DoneTransferring(tr.remote, err == nil)
|
tr.stats.DoneTransferring(tr.remote, err == nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
tr.completedAt = time.Now()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Account returns reader that knows how to keep track of transfer progress.
|
// Account returns reader that knows how to keep track of transfer progress.
|
||||||
func (tr *Transfer) Account(in io.ReadCloser) *Account {
|
func (tr *Transfer) Account(in io.ReadCloser) *Account {
|
||||||
tr.mu.Lock()
|
tr.mu.Lock()
|
||||||
defer tr.mu.Unlock()
|
|
||||||
|
|
||||||
if tr.acc == nil {
|
if tr.acc == nil {
|
||||||
tr.acc = newAccountSizeName(tr.stats, in, tr.size, tr.remote)
|
tr.acc = newAccountSizeName(tr.stats, in, tr.size, tr.remote)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
tr.mu.Unlock()
|
||||||
return tr.acc
|
return tr.acc
|
||||||
}
|
}
|
||||||
|
|
||||||
// TimeRange returns the time transfer started and ended at. If not completed
|
// TimeRange returns the time transfer started and ended at. If not completed
|
||||||
// it will return zero time for end time.
|
// it will return zero time for end time.
|
||||||
func (tr *Transfer) TimeRange() (time.Time, time.Time) {
|
func (tr *Transfer) TimeRange() (time.Time, time.Time) {
|
||||||
tr.mu.Lock()
|
tr.mu.RLock()
|
||||||
defer tr.mu.Unlock()
|
defer tr.mu.RUnlock()
|
||||||
return tr.startedAt, tr.completedAt
|
return tr.startedAt, tr.completedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsDone returns true if transfer is completed.
|
// IsDone returns true if transfer is completed.
|
||||||
func (tr *Transfer) IsDone() bool {
|
func (tr *Transfer) IsDone() bool {
|
||||||
tr.mu.Lock()
|
tr.mu.RLock()
|
||||||
defer tr.mu.Unlock()
|
defer tr.mu.RUnlock()
|
||||||
return !tr.completedAt.IsZero()
|
return !tr.completedAt.IsZero()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot produces stats for this account at point in time.
|
// Snapshot produces stats for this account at point in time.
|
||||||
func (tr *Transfer) Snapshot() TransferSnapshot {
|
func (tr *Transfer) Snapshot() TransferSnapshot {
|
||||||
tr.mu.Lock()
|
tr.mu.RLock()
|
||||||
defer tr.mu.Unlock()
|
defer tr.mu.RUnlock()
|
||||||
|
|
||||||
var s, b int64 = tr.size, 0
|
var s, b int64 = tr.size, 0
|
||||||
if tr.acc != nil {
|
if tr.acc != nil {
|
||||||
|
|
Loading…
Reference in a new issue