Display individual transfer progress

Improve progress printing by displaying individual file progress, as well
as a moving average speed with ETA. Example output:

2015/09/15 16:38:21
Transferred:    183599104 Bytes (4646.49 kByte/s)
Errors:                 0
Checks:                 1
Transferred:            0
Elapsed time:       38.5s
Transferring:
 * 01_06_14.mp3: 33% done. avg: 1280.5, cur: 1288.8 kByte/s. ETA: 1m12s
 * 01_12_15.mp3: 33% done. avg: 1002.2, cur:  943.4 kByte/s. ETA: 1m17s
 * 01_13_14.mp3: 48% done. avg: 1456.8, cur: 1425.2 kByte/s. ETA: 39s
 * 01_19_15.mp3: 28% done. avg: 1226.9, cur: 1114.4 kByte/s. ETA: 1m37s
This commit is contained in:
klauspost 2015-09-15 16:46:06 +02:00 committed by Nick Craig-Wood
parent 3219334c3e
commit 0a5870208e
2 changed files with 218 additions and 40 deletions

View file

@ -7,10 +7,12 @@ import (
"fmt"
"io"
"log"
"sort"
"strings"
"sync"
"time"
"github.com/VividCortex/ewma"
"github.com/tsenart/tb"
)
@ -28,21 +30,63 @@ func startTokenBucket() {
}
}
// Stringset holds some strings
type StringSet map[string]bool
// stringSet holds a set of strings
type stringSet map[string]struct{}
// Strings returns all the strings in the StringSet
func (ss StringSet) Strings() []string {
strings := make([]string, 0, len(ss))
for k := range ss {
strings = append(strings, k)
}
return strings
// inProgress holds a synchronizes map of in progress transfers
type inProgress struct {
mu sync.Mutex
m map[string]*Account
}
// String returns all the strings in the StringSet joined by comma
func (ss StringSet) String() string {
return strings.Join(ss.Strings(), ", ")
// newInProgress makes a new inProgress object
func newInProgress() *inProgress {
return &inProgress{
m: make(map[string]*Account, Config.Transfers),
}
}
// set marks the name as in progress
func (ip *inProgress) set(name string, acc *Account) {
ip.mu.Lock()
defer ip.mu.Unlock()
ip.m[name] = acc
}
// clear marks the name as no longer in progress
func (ip *inProgress) clear(name string) {
ip.mu.Lock()
defer ip.mu.Unlock()
delete(ip.m, name)
}
// get gets the account for name, of nil if not found
func (ip *inProgress) get(name string) *Account {
ip.mu.Lock()
defer ip.mu.Unlock()
return ip.m[name]
}
// Strings returns all the strings in the stringSet
func (ss stringSet) Strings() []string {
strings := make([]string, 0, len(ss))
for name, _ := range ss {
var out string
if acc := Stats.inProgress.get(name); acc != nil {
out = acc.String()
} else {
out = name
}
strings = append(strings, " * "+out)
}
sorted := sort.StringSlice(strings)
sorted.Sort()
return sorted
}
// String returns all the file names in the stringSet joined by newline
func (ss stringSet) String() string {
return strings.Join(ss.Strings(), "\n")
}
// Stats limits and accounts all transfers
@ -51,18 +95,20 @@ type StatsInfo struct {
bytes int64
errors int64
checks int64
checking StringSet
checking stringSet
transfers int64
transferring StringSet
transferring stringSet
start time.Time
inProgress *inProgress
}
// NewStats cretates an initialised StatsInfo
func NewStats() *StatsInfo {
return &StatsInfo{
checking: make(StringSet, Config.Checkers),
transferring: make(StringSet, Config.Transfers),
checking: make(stringSet, Config.Checkers),
transferring: make(stringSet, Config.Transfers),
start: time.Now(),
inProgress: newInProgress(),
}
}
@ -76,24 +122,25 @@ func (s *StatsInfo) String() string {
if dt > 0 {
speed = float64(s.bytes) / 1024 / dt_seconds
}
dt_rounded := dt - (dt % (time.Second / 10))
buf := &bytes.Buffer{}
fmt.Fprintf(buf, `
Transferred: %10d Bytes (%7.2f kByte/s)
Errors: %10d
Checks: %10d
Transferred: %10d
Elapsed time: %v
Elapsed time: %10v
`,
s.bytes, speed,
s.errors,
s.checks,
s.transfers,
dt)
dt_rounded)
if len(s.checking) > 0 {
fmt.Fprintf(buf, "Checking: %s\n", s.checking)
fmt.Fprintf(buf, "Checking:\n%s\n", s.checking)
}
if len(s.transferring) > 0 {
fmt.Fprintf(buf, "Transferring: %s\n", s.transferring)
fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring)
}
return buf.String()
}
@ -159,7 +206,7 @@ func (s *StatsInfo) Error() {
func (s *StatsInfo) Checking(o Object) {
s.lock.Lock()
defer s.lock.Unlock()
s.checking[o.Remote()] = true
s.checking[o.Remote()] = struct{}{}
}
// DoneChecking removes a check from the stats
@ -181,7 +228,7 @@ func (s *StatsInfo) GetTransfers() int64 {
func (s *StatsInfo) Transferring(o Object) {
s.lock.Lock()
defer s.lock.Unlock()
s.transferring[o.Remote()] = true
s.transferring[o.Remote()] = struct{}{}
}
// DoneTransferring removes a transfer from the stats
@ -199,15 +246,52 @@ type Account struct {
// in http transport calls Read() after Do() returns on
// CancelRequest so this race can happen when it apparently
// shouldn't.
mu sync.Mutex
in io.ReadCloser
bytes int64
mu sync.Mutex
in io.ReadCloser
size int64
name string
statmu sync.Mutex // Separate mutex for stat values.
bytes int64 // Total number of bytes read
start time.Time // Start time of first read
lpTime time.Time // Time of last average measurement
lpBytes int // Number of bytes read since last measurement
avg ewma.MovingAverage // Moving average of last few measurements
exit chan struct{} // channel that will be closed when transfer is finished
}
// NewAccount makes a Account reader
func NewAccount(in io.ReadCloser) *Account {
return &Account{
in: in,
// NewAccount makes a Account reader for an object
func NewAccount(in io.ReadCloser, obj Object) *Account {
acc := &Account{
in: in,
size: obj.Size(),
name: obj.Remote(),
exit: make(chan struct{}),
avg: ewma.NewMovingAverage(),
lpTime: time.Now(),
}
go acc.averageLoop()
Stats.inProgress.set(acc.name, acc)
return acc
}
func (file *Account) averageLoop() {
tick := time.NewTicker(time.Second)
defer tick.Stop()
for {
select {
case now := <-tick.C:
file.statmu.Lock()
// Add average of last second.
elapsed := now.Sub(file.lpTime).Seconds()
avg := float64(file.lpBytes) / elapsed
file.avg.Add(avg)
file.lpBytes = 0
file.lpTime = now
// Unlock stats
file.statmu.Unlock()
case <-file.exit:
return
}
}
}
@ -215,12 +299,24 @@ func NewAccount(in io.ReadCloser) *Account {
func (file *Account) Read(p []byte) (n int, err error) {
file.mu.Lock()
defer file.mu.Unlock()
n, err = file.in.Read(p)
file.bytes += int64(n)
Stats.Bytes(int64(n))
if err == io.EOF {
// FIXME Do something?
// Set start time.
file.statmu.Lock()
if file.start.IsZero() {
file.start = time.Now()
}
file.statmu.Unlock()
n, err = file.in.Read(p)
// Update Stats
file.statmu.Lock()
file.lpBytes += n
file.bytes += int64(n)
file.statmu.Unlock()
Stats.Bytes(int64(n))
// Limit the transfer speed if required
if tokenBucket != nil {
tokenBucket.Wait(int64(n))
@ -228,11 +324,93 @@ func (file *Account) Read(p []byte) (n int, err error) {
return
}
// Returns bytes read as well as the size.
// Size can be <= 0 if the size is unknown.
func (file *Account) Progress() (bytes, size int64) {
if file == nil {
return 0, 0
}
file.statmu.Lock()
if bytes > size {
size = 0
}
defer file.statmu.Unlock()
return file.bytes, file.size
}
// Speed returns the speed of the current file transfer
// in bytes per second, as well a an exponentially weighted moving average
// If no read has completed yet, 0 is returned for both values.
func (file *Account) Speed() (bps, current float64) {
if file == nil {
return 0, 0
}
file.statmu.Lock()
defer file.statmu.Unlock()
if file.bytes == 0 {
return 0, 0
}
// Calculate speed from first read.
total := float64(time.Now().Sub(file.start)) / float64(time.Second)
bps = float64(file.bytes) / total
current = file.avg.Value()
return
}
// ETA returns the ETA of the current operation,
// rounded to full seconds.
// If the ETA cannot be determined 'ok' returns false.
func (file *Account) ETA() (eta time.Duration, ok bool) {
if file == nil || file.size <= 0 {
return 0, false
}
file.statmu.Lock()
defer file.statmu.Unlock()
if file.bytes == 0 {
return 0, false
}
left := file.size - file.bytes
if left <= 0 {
return 0, true
}
avg := file.avg.Value()
if avg <= 0 {
return 0, false
}
seconds := float64(left) / file.avg.Value()
return time.Duration(time.Second * time.Duration(int(seconds))), true
}
// String produces stats for this file
func (file *Account) String() string {
a, b := file.Progress()
avg, cur := file.Speed()
eta, etaok := file.ETA()
etas := "-"
if etaok {
if eta > 0 {
etas = fmt.Sprintf("%v", eta)
} else {
etas = "0s"
}
}
name := []rune(file.name)
if len(name) > 25 {
name = name[:25]
}
if b <= 0 {
return fmt.Sprintf("%s: avg:%7.1f, cur: %6.1f kByte/s. ETA: %s", string(name), avg/1024, cur/1024, etas)
}
return fmt.Sprintf("%s: %2d%% done. avg: %6.1f, cur: %6.1f kByte/s. ETA: %s", string(name), int(100*float64(a)/float64(b)), avg/1024, cur/1024, etas)
}
// Close the object
func (file *Account) Close() error {
close(file.exit)
file.mu.Lock()
defer file.mu.Unlock()
// FIXME do something?
Stats.inProgress.clear(file.name)
return file.in.Close()
}

View file

@ -196,7 +196,7 @@ tryAgain:
ErrorLog(src, "Failed to open: %s", err)
return
}
in := NewAccount(in0) // account the transfer
in := NewAccount(in0, src) // account the transfer
if doUpdate {
actionTaken = "Copied (updated existing)"
@ -621,7 +621,7 @@ func syncFprintf(w io.Writer, format string, a ...interface{}) (n int, err error
return fmt.Fprintf(w, format, a...)
}
// List the Fs to stdout
// List the Fs to the supplied writer
//
// Shows size and path
//
@ -632,7 +632,7 @@ func List(f Fs, w io.Writer) error {
})
}
// List the Fs to stdout
// List the Fs to the supplied writer
//
// Shows size, mod time and path
//
@ -646,7 +646,7 @@ func ListLong(f Fs, w io.Writer) error {
})
}
// List the Fs to stdout
// List the Fs to the supplied writer
//
// Produces the same output as the md5sum command
//
@ -664,7 +664,7 @@ func Md5sum(f Fs, w io.Writer) error {
})
}
// List the directories/buckets/containers in the Fs to stdout
// List the directories/buckets/containers in the Fs to the supplied writer
func ListDir(f Fs, w io.Writer) error {
for dir := range f.ListDir() {
syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)