accounting: factor stats into its own structure

This makes it very obvious which mutex to take for accessing the
values.
This commit is contained in:
Nick Craig-Wood 2020-05-12 16:16:17 +01:00
parent 32507774de
commit cb5979a468
2 changed files with 77 additions and 69 deletions

View file

@ -32,22 +32,28 @@ type Account struct {
// in http transport calls Read() after Do() returns on // in http transport calls Read() after Do() returns on
// CancelRequest so this race can happen when it apparently // CancelRequest so this race can happen when it apparently
// shouldn't. // shouldn't.
mu sync.Mutex mu sync.Mutex // mutex protects these values
in io.Reader in io.Reader
origIn io.ReadCloser origIn io.ReadCloser
close io.Closer close io.Closer
size int64 size int64
name string name string
statmu sync.Mutex // Separate mutex for stat values. closed bool // set if the file is closed
exit chan struct{} // channel that will be closed when transfer is finished
withBuf bool // is using a buffered in
values accountValues
}
// accountValues holds statistics for this Account
type accountValues struct {
mu sync.Mutex // Mutex for stat values.
bytes int64 // Total number of bytes read bytes int64 // Total number of bytes read
max int64 // if >=0 the max number of bytes to transfer max int64 // if >=0 the max number of bytes to transfer
start time.Time // Start time of first read start time.Time // Start time of first read
lpTime time.Time // Time of last average measurement lpTime time.Time // Time of last average measurement
lpBytes int // Number of bytes read since last measurement lpBytes int // Number of bytes read since last measurement
avg float64 // Moving average of last few measurements in bytes/s avg float64 // Moving average of last few measurements in bytes/s
closed bool // set if the file is closed
exit chan struct{} // channel that will be closed when transfer is finished
withBuf bool // is using a buffered in
} }
const averagePeriod = 16 // period to do exponentially weighted averages over const averagePeriod = 16 // period to do exponentially weighted averages over
@ -63,12 +69,14 @@ func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name str
size: size, size: size,
name: name, name: name,
exit: make(chan struct{}), exit: make(chan struct{}),
values: accountValues{
avg: 0, avg: 0,
lpTime: time.Now(), lpTime: time.Now(),
max: -1, max: -1,
},
} }
if fs.Config.CutoffMode == fs.CutoffModeHard { if fs.Config.CutoffMode == fs.CutoffModeHard {
acc.max = int64((fs.Config.MaxTransfer)) acc.values.max = int64((fs.Config.MaxTransfer))
} }
go acc.averageLoop() go acc.averageLoop()
stats.inProgress.set(acc.name, acc) stats.inProgress.set(acc.name, acc)
@ -154,19 +162,19 @@ func (acc *Account) averageLoop() {
for { for {
select { select {
case now := <-tick.C: case now := <-tick.C:
acc.statmu.Lock() acc.values.mu.Lock()
// Add average of last second. // Add average of last second.
elapsed := now.Sub(acc.lpTime).Seconds() elapsed := now.Sub(acc.values.lpTime).Seconds()
avg := float64(acc.lpBytes) / elapsed avg := float64(acc.values.lpBytes) / elapsed
// Soft start the moving average // Soft start the moving average
if period < averagePeriod { if period < averagePeriod {
period++ period++
} }
acc.avg = (avg + (period-1)*acc.avg) / period acc.values.avg = (avg + (period-1)*acc.values.avg) / period
acc.lpBytes = 0 acc.values.lpBytes = 0
acc.lpTime = now acc.values.lpTime = now
// Unlock stats // Unlock stats
acc.statmu.Unlock() acc.values.mu.Unlock()
case <-acc.exit: case <-acc.exit:
return return
} }
@ -176,21 +184,21 @@ func (acc *Account) averageLoop() {
// Check the read before it has happened is valid returning the number // Check the read before it has happened is valid returning the number
// of bytes remaining to read. // of bytes remaining to read.
func (acc *Account) checkReadBefore() (bytesUntilLimit int64, err error) { func (acc *Account) checkReadBefore() (bytesUntilLimit int64, err error) {
acc.statmu.Lock() acc.values.mu.Lock()
if acc.max >= 0 { if acc.values.max >= 0 {
bytesUntilLimit = acc.max - acc.stats.GetBytes() bytesUntilLimit = acc.values.max - acc.stats.GetBytes()
if bytesUntilLimit < 0 { if bytesUntilLimit < 0 {
acc.statmu.Unlock() acc.values.mu.Unlock()
return bytesUntilLimit, ErrorMaxTransferLimitReachedFatal return bytesUntilLimit, ErrorMaxTransferLimitReachedFatal
} }
} else { } else {
bytesUntilLimit = 1 << 62 bytesUntilLimit = 1 << 62
} }
// Set start time. // Set start time.
if acc.start.IsZero() { if acc.values.start.IsZero() {
acc.start = time.Now() acc.values.start = time.Now()
} }
acc.statmu.Unlock() acc.values.mu.Unlock()
return bytesUntilLimit, nil return bytesUntilLimit, nil
} }
@ -212,20 +220,20 @@ func checkReadAfter(bytesUntilLimit int64, n int, err error) (outN int, outErr e
// //
// This pretends a transfer has started // This pretends a transfer has started
func (acc *Account) ServerSideCopyStart() { func (acc *Account) ServerSideCopyStart() {
acc.statmu.Lock() acc.values.mu.Lock()
// Set start time. // Set start time.
if acc.start.IsZero() { if acc.values.start.IsZero() {
acc.start = time.Now() acc.values.start = time.Now()
} }
acc.statmu.Unlock() acc.values.mu.Unlock()
} }
// ServerSideCopyEnd accounts for a read of n bytes in a sever side copy // ServerSideCopyEnd accounts for a read of n bytes in a sever side copy
func (acc *Account) ServerSideCopyEnd(n int64) { func (acc *Account) ServerSideCopyEnd(n int64) {
// Update Stats // Update Stats
acc.statmu.Lock() acc.values.mu.Lock()
acc.bytes += n acc.values.bytes += n
acc.statmu.Unlock() acc.values.mu.Unlock()
acc.stats.Bytes(n) acc.stats.Bytes(n)
} }
@ -233,10 +241,10 @@ func (acc *Account) ServerSideCopyEnd(n int64) {
// Account the read and limit bandwidth // Account the read and limit bandwidth
func (acc *Account) accountRead(n int) { func (acc *Account) accountRead(n int) {
// Update Stats // Update Stats
acc.statmu.Lock() acc.values.mu.Lock()
acc.lpBytes += n acc.values.lpBytes += n
acc.bytes += int64(n) acc.values.bytes += int64(n)
acc.statmu.Unlock() acc.values.mu.Unlock()
acc.stats.Bytes(int64(n)) acc.stats.Bytes(int64(n))
@ -340,9 +348,9 @@ func (acc *Account) progress() (bytes, size int64) {
if acc == nil { if acc == nil {
return 0, 0 return 0, 0
} }
acc.statmu.Lock() acc.values.mu.Lock()
bytes, size = acc.bytes, acc.size bytes, size = acc.values.bytes, acc.size
acc.statmu.Unlock() acc.values.mu.Unlock()
return bytes, size return bytes, size
} }
@ -353,15 +361,15 @@ func (acc *Account) speed() (bps, current float64) {
if acc == nil { if acc == nil {
return 0, 0 return 0, 0
} }
acc.statmu.Lock() acc.values.mu.Lock()
defer acc.statmu.Unlock() defer acc.values.mu.Unlock()
if acc.bytes == 0 { if acc.values.bytes == 0 {
return 0, 0 return 0, 0
} }
// Calculate speed from first read. // Calculate speed from first read.
total := float64(time.Now().Sub(acc.start)) / float64(time.Second) total := float64(time.Now().Sub(acc.values.start)) / float64(time.Second)
bps = float64(acc.bytes) / total bps = float64(acc.values.bytes) / total
current = acc.avg current = acc.values.avg
return return
} }
@ -372,9 +380,9 @@ func (acc *Account) eta() (etaDuration time.Duration, ok bool) {
if acc == nil { if acc == nil {
return 0, false return 0, false
} }
acc.statmu.Lock() acc.values.mu.Lock()
defer acc.statmu.Unlock() defer acc.values.mu.Unlock()
return eta(acc.bytes, acc.size, acc.avg) return eta(acc.values.bytes, acc.size, acc.values.avg)
} }
// shortenName shortens in to size runes long // shortenName shortens in to size runes long

View file

@ -91,11 +91,11 @@ func TestAccountRead(t *testing.T) {
stats := NewStats() stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test") acc := newAccountSizeName(stats, in, 1, "test")
assert.True(t, acc.start.IsZero()) assert.True(t, acc.values.start.IsZero())
acc.statmu.Lock() acc.values.mu.Lock()
assert.Equal(t, 0, acc.lpBytes) assert.Equal(t, 0, acc.values.lpBytes)
assert.Equal(t, int64(0), acc.bytes) assert.Equal(t, int64(0), acc.values.bytes)
acc.statmu.Unlock() acc.values.mu.Unlock()
assert.Equal(t, int64(0), stats.bytes) assert.Equal(t, int64(0), stats.bytes)
var buf = make([]byte, 2) var buf = make([]byte, 2)
@ -104,11 +104,11 @@ func TestAccountRead(t *testing.T) {
assert.Equal(t, 2, n) assert.Equal(t, 2, n)
assert.Equal(t, []byte{1, 2}, buf[:n]) assert.Equal(t, []byte{1, 2}, buf[:n])
assert.False(t, acc.start.IsZero()) assert.False(t, acc.values.start.IsZero())
acc.statmu.Lock() acc.values.mu.Lock()
assert.Equal(t, 2, acc.lpBytes) assert.Equal(t, 2, acc.values.lpBytes)
assert.Equal(t, int64(2), acc.bytes) assert.Equal(t, int64(2), acc.values.bytes)
acc.statmu.Unlock() acc.values.mu.Unlock()
assert.Equal(t, int64(2), stats.bytes) assert.Equal(t, int64(2), stats.bytes)
n, err = acc.Read(buf) n, err = acc.Read(buf)
@ -135,11 +135,11 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
acc = acc.WithBuffer() acc = acc.WithBuffer()
} }
assert.True(t, acc.start.IsZero()) assert.True(t, acc.values.start.IsZero())
acc.statmu.Lock() acc.values.mu.Lock()
assert.Equal(t, 0, acc.lpBytes) assert.Equal(t, 0, acc.values.lpBytes)
assert.Equal(t, int64(0), acc.bytes) assert.Equal(t, int64(0), acc.values.bytes)
acc.statmu.Unlock() acc.values.mu.Unlock()
assert.Equal(t, int64(0), stats.bytes) assert.Equal(t, int64(0), stats.bytes)
var out bytes.Buffer var out bytes.Buffer
@ -149,11 +149,11 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) {
assert.Equal(t, int64(len(buf)), n) assert.Equal(t, int64(len(buf)), n)
assert.Equal(t, buf, out.Bytes()) assert.Equal(t, buf, out.Bytes())
assert.False(t, acc.start.IsZero()) assert.False(t, acc.values.start.IsZero())
acc.statmu.Lock() acc.values.mu.Lock()
assert.Equal(t, len(buf), acc.lpBytes) assert.Equal(t, len(buf), acc.values.lpBytes)
assert.Equal(t, int64(len(buf)), acc.bytes) assert.Equal(t, int64(len(buf)), acc.values.bytes)
acc.statmu.Unlock() acc.values.mu.Unlock()
assert.Equal(t, int64(len(buf)), stats.bytes) assert.Equal(t, int64(len(buf)), stats.bytes)
assert.NoError(t, acc.Close()) assert.NoError(t, acc.Close())