a97d7c0c15
Signed-off-by: Igor Morozov <igor@adhoc05-sjc1.prod.uber.internal>
375 lines
9.2 KiB
Go
375 lines
9.2 KiB
Go
package notifications
|
|
|
|
import (
|
|
"container/list"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. However,
// their tight integration with endpoint metrics should be removed.
|
|
|
|
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
|
|
// component is to dispatch events to configured endpoints. Reliability can be
|
|
// provided by wrapping incoming sinks.
|
|
type Broadcaster struct {
|
|
sinks []Sink
|
|
events chan []Event
|
|
closed chan chan struct{}
|
|
}
|
|
|
|
// NewBroadcaster ...
|
|
// Add appends one or more sinks to the list of sinks. The broadcaster
|
|
// behavior will be affected by the properties of the sink. Generally, the
|
|
// sink should accept all messages and deal with reliability on its own. Use
|
|
// of EventQueue and RetryingSink should be used here.
|
|
func NewBroadcaster(sinks ...Sink) *Broadcaster {
|
|
b := Broadcaster{
|
|
sinks: sinks,
|
|
events: make(chan []Event),
|
|
closed: make(chan chan struct{}),
|
|
}
|
|
|
|
// Start the broadcaster
|
|
go b.run()
|
|
|
|
return &b
|
|
}
|
|
|
|
// Write accepts a block of events to be dispatched to all sinks. This method
|
|
// will never fail and should never block (hopefully!). The caller cedes the
|
|
// slice memory to the broadcaster and should not modify it after calling
|
|
// write.
|
|
func (b *Broadcaster) Write(events ...Event) error {
|
|
select {
|
|
case b.events <- events:
|
|
case <-b.closed:
|
|
return ErrSinkClosed
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close the broadcaster, ensuring that all messages are flushed to the
|
|
// underlying sink before returning.
|
|
func (b *Broadcaster) Close() error {
|
|
logrus.Infof("broadcaster: closing")
|
|
select {
|
|
case <-b.closed:
|
|
// already closed
|
|
return fmt.Errorf("broadcaster: already closed")
|
|
default:
|
|
// do a little chan handoff dance to synchronize closing
|
|
closed := make(chan struct{})
|
|
b.closed <- closed
|
|
close(b.closed)
|
|
<-closed
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// run is the main broadcast loop, started when the broadcaster is created.
|
|
// Under normal conditions, it waits for events on the event channel. After
|
|
// Close is called, this goroutine will exit.
|
|
func (b *Broadcaster) run() {
|
|
for {
|
|
select {
|
|
case block := <-b.events:
|
|
for _, sink := range b.sinks {
|
|
if err := sink.Write(block...); err != nil {
|
|
logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
|
|
}
|
|
}
|
|
case closing := <-b.closed:
|
|
|
|
// close all the underlying sinks
|
|
for _, sink := range b.sinks {
|
|
if err := sink.Close(); err != nil {
|
|
logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
|
|
}
|
|
}
|
|
closing <- struct{}{}
|
|
|
|
logrus.Debugf("broadcaster: closed")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// eventQueue accepts all messages into a queue for asynchronous consumption
|
|
// by a sink. It is unbounded and thread safe but the sink must be reliable or
|
|
// events will be dropped.
|
|
type eventQueue struct {
|
|
sink Sink
|
|
events *list.List
|
|
listeners []eventQueueListener
|
|
cond *sync.Cond
|
|
mu sync.Mutex
|
|
closed bool
|
|
}
|
|
|
|
// eventQueueListener is called when various events happen on the queue.
|
|
type eventQueueListener interface {
|
|
ingress(events ...Event)
|
|
egress(events ...Event)
|
|
}
|
|
|
|
// newEventQueue returns a queue to the provided sink. If the updater is non-
|
|
// nil, it will be called to update pending metrics on ingress and egress.
|
|
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
|
|
eq := eventQueue{
|
|
sink: sink,
|
|
events: list.New(),
|
|
listeners: listeners,
|
|
}
|
|
|
|
eq.cond = sync.NewCond(&eq.mu)
|
|
go eq.run()
|
|
return &eq
|
|
}
|
|
|
|
// Write accepts the events into the queue, only failing if the queue has
|
|
// beend closed.
|
|
func (eq *eventQueue) Write(events ...Event) error {
|
|
eq.mu.Lock()
|
|
defer eq.mu.Unlock()
|
|
|
|
if eq.closed {
|
|
return ErrSinkClosed
|
|
}
|
|
|
|
for _, listener := range eq.listeners {
|
|
listener.ingress(events...)
|
|
}
|
|
eq.events.PushBack(events)
|
|
eq.cond.Signal() // signal waiters
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close shuts down the event queue, flushing
|
|
func (eq *eventQueue) Close() error {
|
|
eq.mu.Lock()
|
|
defer eq.mu.Unlock()
|
|
|
|
if eq.closed {
|
|
return fmt.Errorf("eventqueue: already closed")
|
|
}
|
|
|
|
// set closed flag
|
|
eq.closed = true
|
|
eq.cond.Signal() // signal flushes queue
|
|
eq.cond.Wait() // wait for signal from last flush
|
|
|
|
return eq.sink.Close()
|
|
}
|
|
|
|
// run is the main goroutine to flush events to the target sink.
|
|
func (eq *eventQueue) run() {
|
|
for {
|
|
block := eq.next()
|
|
|
|
if block == nil {
|
|
return // nil block means event queue is closed.
|
|
}
|
|
|
|
if err := eq.sink.Write(block...); err != nil {
|
|
logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
|
|
}
|
|
|
|
for _, listener := range eq.listeners {
|
|
listener.egress(block...)
|
|
}
|
|
}
|
|
}
|
|
|
|
// next encompasses the critical section of the run loop. When the queue is
|
|
// empty, it will block on the condition. If new data arrives, it will wake
|
|
// and return a block. When closed, a nil slice will be returned.
|
|
func (eq *eventQueue) next() []Event {
|
|
eq.mu.Lock()
|
|
defer eq.mu.Unlock()
|
|
|
|
for eq.events.Len() < 1 {
|
|
if eq.closed {
|
|
eq.cond.Broadcast()
|
|
return nil
|
|
}
|
|
|
|
eq.cond.Wait()
|
|
}
|
|
|
|
front := eq.events.Front()
|
|
block := front.Value.([]Event)
|
|
eq.events.Remove(front)
|
|
|
|
return block
|
|
}
|
|
|
|
// ignoredMediaTypesSink discards events with ignored target media types and
|
|
// passes the rest along.
|
|
type ignoredMediaTypesSink struct {
|
|
Sink
|
|
ignored map[string]bool
|
|
}
|
|
|
|
func newIgnoredMediaTypesSink(sink Sink, ignored []string) Sink {
|
|
if len(ignored) == 0 {
|
|
return sink
|
|
}
|
|
|
|
ignoredMap := make(map[string]bool)
|
|
for _, mediaType := range ignored {
|
|
ignoredMap[mediaType] = true
|
|
}
|
|
|
|
return &ignoredMediaTypesSink{
|
|
Sink: sink,
|
|
ignored: ignoredMap,
|
|
}
|
|
}
|
|
|
|
// Write discards events with ignored target media types and passes the rest
|
|
// along.
|
|
func (imts *ignoredMediaTypesSink) Write(events ...Event) error {
|
|
var kept []Event
|
|
for _, e := range events {
|
|
if !imts.ignored[e.Target.MediaType] {
|
|
kept = append(kept, e)
|
|
}
|
|
}
|
|
if len(kept) == 0 {
|
|
return nil
|
|
}
|
|
return imts.Sink.Write(kept...)
|
|
}
|
|
|
|
// retryingSink retries the write until success or an ErrSinkClosed is
|
|
// returned. Underlying sink must have p > 0 of succeeding or the sink will
|
|
// block. Internally, it is a circuit breaker retries to manage reset.
|
|
// Concurrent calls to a retrying sink are serialized through the sink,
|
|
// meaning that if one is in-flight, another will not proceed.
|
|
type retryingSink struct {
|
|
mu sync.Mutex
|
|
sink Sink
|
|
closed bool
|
|
|
|
// circuit breaker heuristics
|
|
failures struct {
|
|
threshold int
|
|
recent int
|
|
last time.Time
|
|
backoff time.Duration // time after which we retry after failure.
|
|
}
|
|
}
|
|
|
|
type retryingSinkListener interface {
|
|
active(events ...Event)
|
|
retry(events ...Event)
|
|
}
|
|
|
|
// TODO(stevvooe): We are using circuit break here, which actually doesn't
|
|
// make a whole lot of sense for this use case, since we always retry. Move
|
|
// this to use bounded exponential backoff.
|
|
|
|
// newRetryingSink returns a sink that will retry writes to a sink, backing
|
|
// off on failure. Parameters threshold and backoff adjust the behavior of the
|
|
// circuit breaker.
|
|
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
|
|
rs := &retryingSink{
|
|
sink: sink,
|
|
}
|
|
rs.failures.threshold = threshold
|
|
rs.failures.backoff = backoff
|
|
|
|
return rs
|
|
}
|
|
|
|
// Write attempts to flush the events to the downstream sink until it succeeds
|
|
// or the sink is closed.
|
|
func (rs *retryingSink) Write(events ...Event) error {
|
|
rs.mu.Lock()
|
|
defer rs.mu.Unlock()
|
|
|
|
retry:
|
|
|
|
if rs.closed {
|
|
return ErrSinkClosed
|
|
}
|
|
|
|
if !rs.proceed() {
|
|
logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
|
|
rs.wait(rs.failures.backoff)
|
|
goto retry
|
|
}
|
|
|
|
if err := rs.write(events...); err != nil {
|
|
if err == ErrSinkClosed {
|
|
// terminal!
|
|
return err
|
|
}
|
|
|
|
logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
|
|
goto retry
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the sink and the underlying sink.
|
|
func (rs *retryingSink) Close() error {
|
|
rs.mu.Lock()
|
|
defer rs.mu.Unlock()
|
|
|
|
if rs.closed {
|
|
return fmt.Errorf("retryingsink: already closed")
|
|
}
|
|
|
|
rs.closed = true
|
|
return rs.sink.Close()
|
|
}
|
|
|
|
// write provides a helper that dispatches failure and success properly. Used
|
|
// by write as the single-flight write call.
|
|
func (rs *retryingSink) write(events ...Event) error {
|
|
if err := rs.sink.Write(events...); err != nil {
|
|
rs.failure()
|
|
return err
|
|
}
|
|
|
|
rs.reset()
|
|
return nil
|
|
}
|
|
|
|
// wait backoff time against the sink, unlocking so others can proceed. Should
|
|
// only be called by methods that currently have the mutex.
|
|
func (rs *retryingSink) wait(backoff time.Duration) {
|
|
rs.mu.Unlock()
|
|
defer rs.mu.Lock()
|
|
|
|
// backoff here
|
|
time.Sleep(backoff)
|
|
}
|
|
|
|
// reset marks a successful call.
|
|
func (rs *retryingSink) reset() {
|
|
rs.failures.recent = 0
|
|
rs.failures.last = time.Time{}
|
|
}
|
|
|
|
// failure records a failure.
|
|
func (rs *retryingSink) failure() {
|
|
rs.failures.recent++
|
|
rs.failures.last = time.Now().UTC()
|
|
}
|
|
|
|
// proceed returns true if the call should proceed based on circuit breaker
|
|
// heuristics.
|
|
func (rs *retryingSink) proceed() bool {
|
|
return rs.failures.recent < rs.failures.threshold ||
|
|
time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
|
|
}
|