f5439ddc54
The deadlock was caused in transfermap.go by calling mu.RLock() in one
function then calling it again in a sub function. Normally this is
fine, however this leaves a window where mu.Lock() can be called. When
mu.Lock() is called it doesn't allow the second mu.RLock() and
deadlocks.
Thead 1 Thread 2
String():mu.RLock()
del():mu.Lock()
sortedSlice():mu.RLock() - DEADLOCK
Lesson learnt: don't try using locks recursively ever!
This patch fixes the problem by removing the second mu.RLock(). This
was done by factoring the code that was calling it into the
transfermap.go file so all the locking can be seen at once which was
ultimately the cause of the problem - the code which used the locks
was too far away from the rest of the code using the lock.
This problem was introduced in:
bfa5715017
fs/accounting: sort transfers by start time
Which hasn't been released in a stable version yet
193 lines
4.6 KiB
Go
193 lines
4.6 KiB
Go
package accounting
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/rc"
|
|
)
|
|
|
|
// TransferSnapshot represents state of an account at point in time.
|
|
type TransferSnapshot struct {
|
|
Name string `json:"name"`
|
|
Size int64 `json:"size"`
|
|
Bytes int64 `json:"bytes"`
|
|
Checked bool `json:"checked"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
CompletedAt time.Time `json:"completed_at,omitempty"`
|
|
Error error `json:"-"`
|
|
Group string `json:"group"`
|
|
}
|
|
|
|
// MarshalJSON implements json.Marshaler interface.
|
|
func (as TransferSnapshot) MarshalJSON() ([]byte, error) {
|
|
err := ""
|
|
if as.Error != nil {
|
|
err = as.Error.Error()
|
|
}
|
|
|
|
type Alias TransferSnapshot
|
|
return json.Marshal(&struct {
|
|
Error string `json:"error"`
|
|
Alias
|
|
}{
|
|
Error: err,
|
|
Alias: (Alias)(as),
|
|
})
|
|
}
|
|
|
|
// Transfer keeps track of initiated transfers and provides access to
|
|
// accounting functions.
|
|
// Transfer needs to be closed on completion.
|
|
type Transfer struct {
|
|
// these are initialised at creation and may be accessed without locking
|
|
stats *StatsInfo
|
|
remote string
|
|
size int64
|
|
startedAt time.Time
|
|
checking bool
|
|
|
|
// Protects all below
|
|
//
|
|
// NB to avoid deadlocks we must release this lock before
|
|
// calling any methods on Transfer.stats. This is because
|
|
// StatsInfo calls back into Transfer.
|
|
mu sync.RWMutex
|
|
acc *Account
|
|
err error
|
|
completedAt time.Time
|
|
}
|
|
|
|
// newCheckingTransfer instantiates new checking of the object.
|
|
func newCheckingTransfer(stats *StatsInfo, obj fs.Object) *Transfer {
|
|
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true)
|
|
}
|
|
|
|
// newTransfer instantiates new transfer.
|
|
func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer {
|
|
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false)
|
|
}
|
|
|
|
func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool) *Transfer {
|
|
tr := &Transfer{
|
|
stats: stats,
|
|
remote: remote,
|
|
size: size,
|
|
startedAt: time.Now(),
|
|
checking: checking,
|
|
}
|
|
stats.AddTransfer(tr)
|
|
return tr
|
|
}
|
|
|
|
// Done ends the transfer.
|
|
// Must be called after transfer is finished to run proper cleanups.
|
|
func (tr *Transfer) Done(err error) {
|
|
if err != nil {
|
|
err = tr.stats.Error(err)
|
|
|
|
tr.mu.Lock()
|
|
tr.err = err
|
|
tr.mu.Unlock()
|
|
}
|
|
|
|
tr.mu.RLock()
|
|
acc := tr.acc
|
|
tr.mu.RUnlock()
|
|
|
|
if acc != nil {
|
|
// Close the file if it is still open
|
|
if err := acc.Close(); err != nil {
|
|
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err)
|
|
}
|
|
// Signal done with accounting
|
|
acc.Done()
|
|
// free the account since we may keep the transfer
|
|
acc = nil
|
|
}
|
|
|
|
tr.mu.Lock()
|
|
tr.completedAt = time.Now()
|
|
tr.mu.Unlock()
|
|
|
|
if tr.checking {
|
|
tr.stats.DoneChecking(tr.remote)
|
|
} else {
|
|
tr.stats.DoneTransferring(tr.remote, err == nil)
|
|
}
|
|
tr.stats.PruneTransfers()
|
|
}
|
|
|
|
// Reset allows to switch the Account to another transfer method.
|
|
func (tr *Transfer) Reset() {
|
|
tr.mu.RLock()
|
|
acc := tr.acc
|
|
tr.acc = nil
|
|
tr.mu.RUnlock()
|
|
|
|
if acc != nil {
|
|
if err := acc.Close(); err != nil {
|
|
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Account returns reader that knows how to keep track of transfer progress.
|
|
func (tr *Transfer) Account(ctx context.Context, in io.ReadCloser) *Account {
|
|
tr.mu.Lock()
|
|
if tr.acc == nil {
|
|
tr.acc = newAccountSizeName(ctx, tr.stats, in, tr.size, tr.remote)
|
|
} else {
|
|
tr.acc.UpdateReader(ctx, in)
|
|
}
|
|
tr.mu.Unlock()
|
|
return tr.acc
|
|
}
|
|
|
|
// TimeRange returns the time transfer started and ended at. If not completed
|
|
// it will return zero time for end time.
|
|
func (tr *Transfer) TimeRange() (time.Time, time.Time) {
|
|
tr.mu.RLock()
|
|
defer tr.mu.RUnlock()
|
|
return tr.startedAt, tr.completedAt
|
|
}
|
|
|
|
// IsDone returns true if transfer is completed.
|
|
func (tr *Transfer) IsDone() bool {
|
|
tr.mu.RLock()
|
|
defer tr.mu.RUnlock()
|
|
return !tr.completedAt.IsZero()
|
|
}
|
|
|
|
// Snapshot produces stats for this account at point in time.
|
|
func (tr *Transfer) Snapshot() TransferSnapshot {
|
|
tr.mu.RLock()
|
|
defer tr.mu.RUnlock()
|
|
|
|
var s, b int64 = tr.size, 0
|
|
if tr.acc != nil {
|
|
b, s = tr.acc.progress()
|
|
}
|
|
return TransferSnapshot{
|
|
Name: tr.remote,
|
|
Checked: tr.checking,
|
|
Size: s,
|
|
Bytes: b,
|
|
StartedAt: tr.startedAt,
|
|
CompletedAt: tr.completedAt,
|
|
Error: tr.err,
|
|
Group: tr.stats.group,
|
|
}
|
|
}
|
|
|
|
// rcStats returns stats for the transfer suitable for the rc
|
|
func (tr *Transfer) rcStats() rc.Params {
|
|
return rc.Params{
|
|
"name": tr.remote, // no locking needed to access thess
|
|
"size": tr.size,
|
|
}
|
|
}
|