distribution/vendor/github.com/docker/go-events/retry.go

261 lines
6.6 KiB
Go
Raw Permalink Normal View History

package events
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
)
// RetryingSink retries the write until success or an ErrSinkClosed is
// returned. Underlying sink must have p > 0 of succeeding or the sink will
// block. Retry is configured with a RetryStrategy. Concurrent calls to a
// retrying sink are serialized through the sink, meaning that if one is
// in-flight, another will not proceed.
type RetryingSink struct {
sink Sink
strategy RetryStrategy
closed chan struct{}
once sync.Once
}
// NewRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of the
// circuit breaker.
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
rs := &RetryingSink{
sink: sink,
strategy: strategy,
closed: make(chan struct{}),
}
return rs
}
// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *RetryingSink) Write(event Event) error {
logger := logrus.WithField("event", event)
retry:
select {
case <-rs.closed:
return ErrSinkClosed
default:
}
if backoff := rs.strategy.Proceed(event); backoff > 0 {
select {
case <-time.After(backoff):
// TODO(stevvooe): This branch holds up the next try. Before, we
// would simply break to the "retry" label and then possibly wait
// again. However, this requires all retry strategies to have a
// large probability of probing the sync for success, rather than
// just backing off and sending the request.
case <-rs.closed:
return ErrSinkClosed
}
}
if err := rs.sink.Write(event); err != nil {
if err == ErrSinkClosed {
// terminal!
return err
}
logger := logger.WithError(err) // shadow!!
if rs.strategy.Failure(event, err) {
logger.Errorf("retryingsink: dropped event")
return nil
}
logger.Errorf("retryingsink: error writing event, retrying")
goto retry
}
rs.strategy.Success(event)
return nil
}
// Close closes the sink and the underlying sink.
func (rs *RetryingSink) Close() error {
rs.once.Do(func() {
close(rs.closed)
})
return nil
}
func (rs *RetryingSink) String() string {
// Serialize a copy of the RetryingSink without the sync.Once, to avoid
// a data race.
rs2 := map[string]interface{}{
"sink": rs.sink,
"strategy": rs.strategy,
"closed": rs.closed,
}
return fmt.Sprint(rs2)
}
// RetryStrategy defines a strategy for retrying event sink writes.
//
// All methods should be goroutine safe.
type RetryStrategy interface {
// Proceed is called before every event send. If proceed returns a
// positive, non-zero integer, the retryer will back off by the provided
// duration.
//
// An event is provided, by may be ignored.
Proceed(event Event) time.Duration
// Failure reports a failure to the strategy. If this method returns true,
// the event should be dropped.
Failure(event Event, err error) bool
// Success should be called when an event is sent successfully.
Success(event Event)
}
// Breaker implements a circuit breaker retry strategy.
//
// The current implementation never drops events.
type Breaker struct {
threshold int
recent int
last time.Time
backoff time.Duration // time after which we retry after failure.
mu sync.Mutex
}
var _ RetryStrategy = &Breaker{}
// NewBreaker returns a breaker that will backoff after the threshold has been
// tripped. A Breaker is thread safe and may be shared by many goroutines.
func NewBreaker(threshold int, backoff time.Duration) *Breaker {
return &Breaker{
threshold: threshold,
backoff: backoff,
}
}
// Proceed checks the failures against the threshold.
func (b *Breaker) Proceed(event Event) time.Duration {
b.mu.Lock()
defer b.mu.Unlock()
if b.recent < b.threshold {
return 0
}
return b.last.Add(b.backoff).Sub(time.Now())
}
// Success resets the breaker.
func (b *Breaker) Success(event Event) {
b.mu.Lock()
defer b.mu.Unlock()
b.recent = 0
b.last = time.Time{}
}
// Failure records the failure and latest failure time.
func (b *Breaker) Failure(event Event, err error) bool {
b.mu.Lock()
defer b.mu.Unlock()
b.recent++
b.last = time.Now().UTC()
return false // never drop events.
}
var (
// DefaultExponentialBackoffConfig provides a default configuration for
// exponential backoff.
DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
Base: time.Second,
Factor: time.Second,
Max: 20 * time.Second,
}
)
// ExponentialBackoffConfig configures backoff parameters.
//
// Note that these parameters operate on the upper bound for choosing a random
// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
// the backoff value.
type ExponentialBackoffConfig struct {
// Base is the minimum bound for backing off after failure.
Base time.Duration
// Factor sets the amount of time by which the backoff grows with each
// failure.
Factor time.Duration
// Max is the absolute maxiumum bound for a single backoff.
Max time.Duration
}
// ExponentialBackoff implements random backoff with exponentially increasing
// bounds as the number consecutive failures increase.
type ExponentialBackoff struct {
failures uint64 // consecutive failure counter (needs to be 64-bit aligned)
config ExponentialBackoffConfig
}
// NewExponentialBackoff returns an exponential backoff strategy with the
// desired config. If config is nil, the default is returned.
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
return &ExponentialBackoff{
config: config,
}
}
// Proceed returns the next randomly bound exponential backoff time.
func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
return b.backoff(atomic.LoadUint64(&b.failures))
}
// Success resets the failures counter.
func (b *ExponentialBackoff) Success(event Event) {
atomic.StoreUint64(&b.failures, 0)
}
// Failure increments the failure counter.
func (b *ExponentialBackoff) Failure(event Event, err error) bool {
atomic.AddUint64(&b.failures, 1)
return false
}
// backoff calculates the amount of time to wait based on the number of
// consecutive failures.
func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
if failures <= 0 {
// proceed normally when there are no failures.
return 0
}
factor := b.config.Factor
if factor <= 0 {
factor = DefaultExponentialBackoffConfig.Factor
}
backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
max := b.config.Max
if max <= 0 {
max = DefaultExponentialBackoffConfig.Max
}
if backoff > max || backoff < 0 {
backoff = max
}
// Choose a uniformly distributed value from [0, backoff).
return time.Duration(rand.Int63n(int64(backoff)))
}