retry: add circuit breaker to load method

If a file exhausts its retry attempts, then it is likely not accessible
the next time. Thus, immediately fail all load calls for that file to
avoid useless retries.
This commit is contained in:
Michael Eischer 2024-05-12 12:34:54 +02:00
parent 394c8ca3ed
commit 53d15bcd1b
2 changed files with 80 additions and 1 deletions

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
@ -20,6 +21,8 @@ type Backend struct {
MaxTries int
Report func(string, error, time.Duration)
Success func(string, int)
failedLoads sync.Map
}
// statically ensure that RetryBackend implements backend.Backend.
@ -132,15 +135,39 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind
})
}
// Failed loads expire after an hour
var failedLoadExpiry = time.Hour
// Load returns a reader that yields the contents of the file at h at the
// given offset. If length is larger than zero, only a portion of the file
// is returned. rd must be closed after use. If an error is returned, the
// ReadCloser must be nil.
func (be *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) {
return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
key := h
key.IsMetadata = false
// Implement the circuit breaker pattern for files that exhausted all retries due to a non-permanent error
if v, ok := be.failedLoads.Load(key); ok {
if time.Since(v.(time.Time)) > failedLoadExpiry {
be.failedLoads.Delete(key)
} else {
// fail immediately if the file was already problematic during the last hour
return fmt.Errorf("circuit breaker open for file %v", h)
}
}
err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
func() error {
return be.Backend.Load(ctx, h, length, offset, consumer)
})
if feature.Flag.Enabled(feature.BackendErrorRedesign) && err != nil && !be.IsPermanentError(err) {
// We've exhausted the retries, the file is likely inaccessible. By excluding permanent
// errors, not found or truncated files are not recorded.
be.failedLoads.LoadOrStore(key, time.Now())
}
return err
}
// Stat returns information about the File identified by h.

View file

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io"
"strings"
"testing"
"time"
@ -303,6 +304,57 @@ func TestBackendLoadNotExists(t *testing.T) {
test.Equals(t, 1, attempt)
}
func TestBackendLoadCircuitBreaker(t *testing.T) {
// retry should not retry if the error matches IsPermanentError
notFound := errors.New("not found")
otherError := errors.New("something")
attempt := 0
be := mock.NewBackend()
be.IsPermanentErrorFn = func(err error) bool {
return errors.Is(err, notFound)
}
be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
attempt++
return nil, otherError
}
nilRd := func(rd io.Reader) (err error) {
return nil
}
TestFastRetries(t)
retryBackend := New(be, 2, nil, nil)
// trip the circuit breaker for file "other"
err := retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
test.Equals(t, otherError, err, "unexpected error")
test.Equals(t, 3, attempt)
attempt = 0
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
test.Assert(t, strings.Contains(err.Error(), "circuit breaker open for file"), "expected circuit breaker error, got %v")
test.Equals(t, 0, attempt)
// don't trip for permanent errors
be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
attempt++
return nil, notFound
}
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "notfound"}, 0, 0, nilRd)
test.Equals(t, notFound, err, "expected circuit breaker to only affect other file, got %v")
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "notfound"}, 0, 0, nilRd)
test.Equals(t, notFound, err, "persistent error must not trigger circuit breaker, got %v")
// wait for circuit breaker to expire
time.Sleep(5 * time.Millisecond)
old := failedLoadExpiry
defer func() {
failedLoadExpiry = old
}()
failedLoadExpiry = 3 * time.Millisecond
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
test.Equals(t, notFound, err, "expected circuit breaker to reset, got %v")
}
func TestBackendStatNotExists(t *testing.T) {
// stat should not retry if the error matches IsNotExist
notFound := errors.New("not found")