Merge pull request #3563 from matta/improve-eta

Improve the ETA displayed during backup
This commit is contained in:
Michael Eischer 2023-06-08 20:49:19 +02:00 committed by GitHub
commit 88c63a029c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 334 additions and 4 deletions

View file

@ -0,0 +1,11 @@
Enhancement: Improve the ETA displayed during backup
Restic's `backup` command displayed an ETA that did not adapt when the rate
of progress made during the backup changed during the course of the
backup. Restic now uses recent progress when computing the ETA. It is
important to realize that the estimate may still be wrong, because restic
cannot predict the future, but the hope is that the ETA will be more
accurate in most cases.
https://github.com/restic/restic/issues/3397
https://github.com/restic/restic/pull/3563

View file

@ -44,6 +44,7 @@ type Progress struct {
mu sync.Mutex
start time.Time
estimator rateEstimator
scanStarted, scanFinished bool
@ -60,6 +61,7 @@ func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress {
start: time.Now(),
currentFiles: make(map[string]struct{}),
printer: printer,
estimator: *newRateEstimator(time.Now()),
}
p.Updater = *progress.NewUpdater(interval, func(runtime time.Duration, final bool) {
if final {
@ -73,9 +75,14 @@ func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress {
var secondsRemaining uint64
if p.scanFinished {
secs := float64(runtime / time.Second)
rate := p.estimator.rate(time.Now())
tooSlowCutoff := 1024.
if rate <= tooSlowCutoff {
secondsRemaining = 0
} else {
todo := float64(p.total.Bytes - p.processed.Bytes)
secondsRemaining = uint64(secs / float64(p.processed.Bytes) * todo)
secondsRemaining = uint64(todo / rate)
}
}
p.printer.Update(p.total, p.processed, p.errors, p.currentFiles, p.start, secondsRemaining)
@ -105,6 +112,7 @@ func (p *Progress) addProcessed(c Counter) {
p.processed.Files += c.Files
p.processed.Dirs += c.Dirs
p.processed.Bytes += c.Bytes
p.estimator.recordBytes(time.Now(), c.Bytes)
p.scanStarted = true
}

View file

@ -0,0 +1,98 @@
package backup
import (
"container/list"
"time"
)
// rateBucket represents a one second window of recorded progress.
type rateBucket struct {
totalBytes uint64
end time.Time // the end of the time window, exclusive
}
// rateEstimator represents an estimate of the time to complete an operation.
type rateEstimator struct {
buckets *list.List
start time.Time
totalBytes uint64
}
// newRateEstimator returns an estimator initialized to a presumed start time.
func newRateEstimator(start time.Time) *rateEstimator {
return &rateEstimator{buckets: list.New(), start: start}
}
// See trim(), below.
const (
bucketWidth = time.Second
minRateEstimatorBytes = 100 * 1000 * 1000
minRateEstimatorBuckets = 20
minRateEstimatorMinutes = 2
)
// trim removes the oldest history from the estimator assuming a given
// current time.
func (r *rateEstimator) trim(now time.Time) {
// The estimator retains byte transfer counts over a two minute window.
// However, to avoid removing too much history when transfer rates are
// low, the estimator also retains a minimum number of processed bytes
// across a minimum number of buckets. An operation that is processing a
// significant number of bytes per second will typically retain only a
// two minute window's worth of information. One that is making slow
// progress, such as one being over a rate limited connection, typically
// observes bursts of updates as infrequently as every ten or twenty
// seconds, in which case the other limiters will kick in. This heuristic
// avoids wildly fluctuating estimates over rate limited connections.
start := now.Add(-minRateEstimatorMinutes * time.Minute)
for e := r.buckets.Front(); e != nil; e = r.buckets.Front() {
if r.buckets.Len() <= minRateEstimatorBuckets {
break
}
b := e.Value.(*rateBucket)
if b.end.After(start) {
break
}
total := r.totalBytes - b.totalBytes
if total < minRateEstimatorBytes {
break
}
r.start = b.end
r.totalBytes = total
r.buckets.Remove(e)
}
}
// recordBytes records the transfer of a number of bytes at a given
// time. Times passed in successive calls should advance monotonically (as
// is the case with time.Now().
func (r *rateEstimator) recordBytes(now time.Time, bytes uint64) {
if bytes == 0 {
return
}
var tail *rateBucket
if r.buckets.Len() > 0 {
tail = r.buckets.Back().Value.(*rateBucket)
}
if tail == nil || !tail.end.After(now) {
// The new bucket holds measurements in the time range [now .. now+1sec).
tail = &rateBucket{end: now.Add(bucketWidth)}
r.buckets.PushBack(tail)
}
tail.totalBytes += bytes
r.totalBytes += bytes
r.trim(now)
}
// rate returns an estimated bytes per second rate at a given time, or zero
// if there is not enough data to compute a rate.
func (r *rateEstimator) rate(now time.Time) float64 {
r.trim(now)
if !r.start.Before(now) {
return 0
}
elapsed := float64(now.Sub(r.start)) / float64(time.Second)
rate := float64(r.totalBytes) / elapsed
return rate
}

View file

@ -0,0 +1,213 @@
package backup
import (
"fmt"
"math"
"testing"
"time"
rtest "github.com/restic/restic/internal/test"
)
const float64EqualityThreshold = 1e-6
func almostEqual(a, b float64) bool {
if math.IsNaN(a) || math.IsNaN(b) {
panic("almostEqual passed a NaN")
}
return math.Abs(a-b) <= float64EqualityThreshold
}
func TestEstimatorDefault(t *testing.T) {
var start time.Time
e := newRateEstimator(start)
r := e.rate(start)
rtest.Assert(t, r == 0, "e.Rate == %v, want zero", r)
r = e.rate(start.Add(time.Hour))
rtest.Assert(t, r == 0, "e.Rate == %v, want zero", r)
}
func TestEstimatorSimple(t *testing.T) {
var start time.Time
type testcase struct {
bytes uint64
when time.Duration
rate float64
}
cases := []testcase{
{0, 0, 0},
{1, time.Second, 1},
{60, time.Second, 60},
{60, time.Minute, 1},
}
for _, c := range cases {
name := fmt.Sprintf("%+v", c)
t.Run(name, func(t *testing.T) {
e := newRateEstimator(start)
e.recordBytes(start.Add(time.Second), c.bytes)
rate := e.rate(start.Add(c.when))
rtest.Assert(t, almostEqual(rate, c.rate), "e.Rate == %v, want %v", rate, c.rate)
})
}
}
func TestBucketWidth(t *testing.T) {
var when time.Time
// Recording byte transfers within a bucket width's time window uses one
// bucket.
e := newRateEstimator(when)
e.recordBytes(when, 1)
e.recordBytes(when.Add(bucketWidth-time.Nanosecond), 1)
rtest.Assert(t, e.buckets.Len() == 1, "e.buckets.Len() is %d, want 1", e.buckets.Len())
b := e.buckets.Back().Value.(*rateBucket)
rtest.Assert(t, b.totalBytes == 2, "b.totalBytes is %d, want 2", b.totalBytes)
rtest.Assert(t, b.end == when.Add(bucketWidth), "b.end is %v, want %v", b.end, when.Add(bucketWidth))
// Recording a byte outside the bucket width causes another bucket.
e.recordBytes(when.Add(bucketWidth), 1)
rtest.Assert(t, e.buckets.Len() == 2, "e.buckets.Len() is %d, want 2", e.buckets.Len())
b = e.buckets.Back().Value.(*rateBucket)
rtest.Assert(t, b.totalBytes == 1, "b.totalBytes is %d, want 1", b.totalBytes)
rtest.Assert(t, b.end == when.Add(2*bucketWidth), "b.end is %v, want %v", b.end, when.Add(bucketWidth))
// Recording a byte after a longer delay creates a sparse bucket list.
e.recordBytes(when.Add(time.Hour+time.Millisecond), 7)
rtest.Assert(t, e.buckets.Len() == 3, "e.buckets.Len() is %d, want 3", e.buckets.Len())
b = e.buckets.Back().Value.(*rateBucket)
rtest.Assert(t, b.totalBytes == 7, "b.totalBytes is %d, want 7", b.totalBytes)
rtest.Equals(t, when.Add(time.Hour+time.Millisecond+time.Second), b.end)
}
type chunk struct {
repetitions uint64 // repetition count
bytes uint64 // byte count (every second)
}
func applyChunks(chunks []chunk, t time.Time, e *rateEstimator) time.Time {
for _, c := range chunks {
for i := uint64(0); i < c.repetitions; i++ {
e.recordBytes(t, c.bytes)
t = t.Add(time.Second)
}
}
return t
}
func TestEstimatorResponsiveness(t *testing.T) {
type testcase struct {
description string
chunks []chunk
rate float64
}
cases := []testcase{
{
"1000 bytes/sec over one second",
[]chunk{
{1, 1000},
},
1000,
},
{
"1000 bytes/sec over one minute",
[]chunk{
{60, 1000},
},
1000,
},
{
"1000 bytes/sec for 10 seconds, then 2000 bytes/sec for 10 seconds",
[]chunk{
{10, 1000},
{10, 2000},
},
1500,
},
{
"1000 bytes/sec for one minute, then 2000 bytes/sec for one minute",
[]chunk{
{60, 1000},
{60, 2000},
},
1500,
},
{
"rate doubles after 30 seconds",
[]chunk{
{30, minRateEstimatorBytes},
{90, 2 * minRateEstimatorBytes},
},
minRateEstimatorBytes * 1.75,
},
{
"rate doubles after 31 seconds",
[]chunk{
{31, minRateEstimatorBytes},
{90, 2 * minRateEstimatorBytes},
},
// The expected rate is the same as the prior test case because the
// first second has rolled off the estimator.
minRateEstimatorBytes * 1.75,
},
{
"rate doubles after 90 seconds",
[]chunk{
{90, minRateEstimatorBytes},
{90, 2 * minRateEstimatorBytes},
},
// The expected rate is the same as the prior test case because the
// first 60 seconds have rolled off the estimator.
minRateEstimatorBytes * 1.75,
},
{
"rate doubles for two full minutes",
[]chunk{
{60, minRateEstimatorBytes},
{120, 2 * minRateEstimatorBytes},
},
2 * minRateEstimatorBytes,
},
{
"rate falls to zero",
[]chunk{
{30, minRateEstimatorBytes},
{30, 0},
},
minRateEstimatorBytes / 2,
},
{
"rate falls to zero for extended time",
[]chunk{
{60, 1000},
{300, 0},
},
1000 * 60 / (60 + 300.0),
},
{
"rate falls to zero for extended time (from high rate)",
[]chunk{
{2 * minRateEstimatorBuckets, minRateEstimatorBytes},
{300, 0},
},
// Expect that only minRateEstimatorBuckets buckets are used in the
// rate estimate.
minRateEstimatorBytes * minRateEstimatorBuckets /
(minRateEstimatorBuckets + 300.0),
},
}
for _, c := range cases {
t.Run(c.description, func(t *testing.T) {
var w time.Time
e := newRateEstimator(w)
w = applyChunks(c.chunks, w, e)
r := e.rate(w)
rtest.Assert(t, almostEqual(r, c.rate), "e.Rate == %f, want %f", r, c.rate)
})
}
}