diff --git a/internal/backend/retry/backend_retry.go b/internal/backend/retry/backend_retry.go index 4f25e0c7c..31934ec96 100644 --- a/internal/backend/retry/backend_retry.go +++ b/internal/backend/retry/backend_retry.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "github.com/cenkalti/backoff/v4" @@ -20,6 +21,8 @@ type Backend struct { MaxTries int Report func(string, error, time.Duration) Success func(string, int) + + failedLoads sync.Map } // statically ensure that RetryBackend implements backend.Backend. @@ -132,15 +135,39 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind }) } +// Failed loads expire after an hour +var failedLoadExpiry = time.Hour + // Load returns a reader that yields the contents of the file at h at the // given offset. If length is larger than zero, only a portion of the file // is returned. rd must be closed after use. If an error is returned, the // ReadCloser must be nil. func (be *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) { - return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset), + key := h + key.IsMetadata = false + + // Implement the circuit breaker pattern for files that exhausted all retries due to a non-permanent error + if v, ok := be.failedLoads.Load(key); ok { + if time.Since(v.(time.Time)) > failedLoadExpiry { + be.failedLoads.Delete(key) + } else { + // fail immediately if the file was already problematic during the last hour + return fmt.Errorf("circuit breaker open for file %v", h) + } + } + + err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset), func() error { return be.Backend.Load(ctx, h, length, offset, consumer) }) + + if feature.Flag.Enabled(feature.BackendErrorRedesign) && err != nil && !be.IsPermanentError(err) { + // We've exhausted the retries, the file is likely inaccessible. By excluding permanent + // errors, not found or truncated files are not recorded. + be.failedLoads.LoadOrStore(key, time.Now()) + } + + return err } // Stat returns information about the File identified by h. diff --git a/internal/backend/retry/backend_retry_test.go b/internal/backend/retry/backend_retry_test.go index 80964fb37..a515b0b7d 100644 --- a/internal/backend/retry/backend_retry_test.go +++ b/internal/backend/retry/backend_retry_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "strings" "testing" "time" @@ -303,6 +304,57 @@ func TestBackendLoadNotExists(t *testing.T) { test.Equals(t, 1, attempt) } +func TestBackendLoadCircuitBreaker(t *testing.T) { + // retry should not retry if the error matches IsPermanentError + notFound := errors.New("not found") + otherError := errors.New("something") + attempt := 0 + + be := mock.NewBackend() + be.IsPermanentErrorFn = func(err error) bool { + return errors.Is(err, notFound) + } + be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) { + attempt++ + return nil, otherError + } + nilRd := func(rd io.Reader) (err error) { + return nil + } + + TestFastRetries(t) + retryBackend := New(be, 2, nil, nil) + // trip the circuit breaker for file "other" + err := retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd) + test.Equals(t, otherError, err, "unexpected error") + test.Equals(t, 3, attempt) + + attempt = 0 + err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd) + test.Assert(t, strings.Contains(err.Error(), "circuit breaker open for file"), "expected circuit breaker error, got %v") + test.Equals(t, 0, attempt) + + // don't trip for permanent errors + be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) { + attempt++ + return nil, notFound + } + err = retryBackend.Load(context.TODO(), backend.Handle{Name: "notfound"}, 0, 0, nilRd) + test.Equals(t, notFound, err, "expected circuit breaker to only affect other file, got %v") + err = retryBackend.Load(context.TODO(), backend.Handle{Name: "notfound"}, 0, 0, nilRd) + test.Equals(t, notFound, err, "persistent error must not trigger circuit breaker, got %v") + + // wait for circuit breaker to expire + time.Sleep(5 * time.Millisecond) + old := failedLoadExpiry + defer func() { + failedLoadExpiry = old + }() + failedLoadExpiry = 3 * time.Millisecond + err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd) + test.Equals(t, notFound, err, "expected circuit breaker to reset, got %v") +} + func TestBackendStatNotExists(t *testing.T) { // stat should not retry if the error matches IsNotExist notFound := errors.New("not found")