package notifications

import (
	"container/list"
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
)

// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. Albeit,
// the tight integration with endpoint metrics should be removed.

// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
	// sinks are the destinations every block of events is written to.
	sinks []Sink
	// events is an unbuffered channel carrying blocks from Write to the run
	// goroutine.
	events chan []Event
	// closed is used by Close to hand a completion channel to the run
	// goroutine; Close closes it afterwards, which is what lets Write detect
	// shutdown.
	closed chan chan struct{}
}

// NewBroadcaster returns a Broadcaster that dispatches events to the provided
// sinks and starts its run goroutine. The broadcaster behavior will be
// affected by the properties of the sinks. Generally, a sink should accept
// all messages and deal with reliability on its own; wrapping sinks with an
// event queue and a retrying sink is the intended way to get that.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
	b := Broadcaster{
		sinks:  sinks,
		events: make(chan []Event),
		closed: make(chan chan struct{}),
	}

	// Start the broadcast loop; it exits after Close is called.
	go b.run()

	return &b
}

// Write hands a block of events to the run goroutine for dispatch to all
// sinks. It returns ErrSinkClosed once the broadcaster has been closed and
// nil otherwise. Because the events channel is unbuffered, the call waits
// until the run goroutine is ready to receive. The caller cedes the slice
// memory to the broadcaster and should not modify it after calling Write.
func (b *Broadcaster) Write(events ...Event) error {
	select {
	case b.events <- events:
	case <-b.closed:
		// NOTE(review): Close also sends its completion channel on b.closed,
		// so a Write blocked here could receive that value instead of the
		// run goroutine — confirm whether Write and Close may race.
		return ErrSinkClosed
	}
	return nil
}

// Close the broadcaster, ensuring that all messages are flushed to the
// underlying sink before returning.
func (b *Broadcaster) Close() error { logrus.Infof("broadcaster: closing") select { case <-b.closed: // already closed return fmt.Errorf("broadcaster: already closed") default: // do a little chan handoff dance to synchronize closing closed := make(chan struct{}) b.closed <- closed close(b.closed) <-closed return nil } } // run is the main broadcast loop, started when the broadcaster is created. // Under normal conditions, it waits for events on the event channel. After // Close is called, this goroutine will exit. func (b *Broadcaster) run() { for { select { case block := <-b.events: for _, sink := range b.sinks { if err := sink.Write(block...); err != nil { logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err) } } case closing := <-b.closed: // close all the underlying sinks for _, sink := range b.sinks { if err := sink.Close(); err != nil { logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err) } } closing <- struct{}{} logrus.Debugf("broadcaster: closed") return } } } // eventQueue accepts all messages into a queue for asynchronous consumption // by a sink. It is unbounded and thread safe but the sink must be reliable or // events will be dropped. type eventQueue struct { sink Sink events *list.List listeners []eventQueueListener cond *sync.Cond mu sync.Mutex closed bool } // eventQueueListener is called when various events happen on the queue. type eventQueueListener interface { ingress(events ...Event) egress(events ...Event) } // newEventQueue returns a queue to the provided sink. If the updater is non- // nil, it will be called to update pending metrics on ingress and egress. func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue { eq := eventQueue{ sink: sink, events: list.New(), listeners: listeners, } eq.cond = sync.NewCond(&eq.mu) go eq.run() return &eq } // Write accepts the events into the queue, only failing if the queue has // beend closed. 
func (eq *eventQueue) Write(events ...Event) error { eq.mu.Lock() defer eq.mu.Unlock() if eq.closed { return ErrSinkClosed } for _, listener := range eq.listeners { listener.ingress(events...) } eq.events.PushBack(events) eq.cond.Signal() // signal waiters return nil } // Close shuts down the event queue, flushing func (eq *eventQueue) Close() error { eq.mu.Lock() defer eq.mu.Unlock() if eq.closed { return fmt.Errorf("eventqueue: already closed") } // set closed flag eq.closed = true eq.cond.Signal() // signal flushes queue eq.cond.Wait() // wait for signal from last flush return eq.sink.Close() } // run is the main goroutine to flush events to the target sink. func (eq *eventQueue) run() { for { block := eq.next() if block == nil { return // nil block means event queue is closed. } if err := eq.sink.Write(block...); err != nil { logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err) } for _, listener := range eq.listeners { listener.egress(block...) } } } // next encompasses the critical section of the run loop. When the queue is // empty, it will block on the condition. If new data arrives, it will wake // and return a block. When closed, a nil slice will be returned. func (eq *eventQueue) next() []Event { eq.mu.Lock() defer eq.mu.Unlock() for eq.events.Len() < 1 { if eq.closed { eq.cond.Broadcast() return nil } eq.cond.Wait() } front := eq.events.Front() block := front.Value.([]Event) eq.events.Remove(front) return block } // ignoredSink discards events with ignored target media types and actions. // passes the rest along. 
type ignoredSink struct { Sink ignoreMediaTypes map[string]bool ignoreActions map[string]bool } func newIgnoredSink(sink Sink, ignored []string, ignoreActions []string) Sink { if len(ignored) == 0 { return sink } ignoredMap := make(map[string]bool) for _, mediaType := range ignored { ignoredMap[mediaType] = true } ignoredActionsMap := make(map[string]bool) for _, action := range ignoreActions { ignoredActionsMap[action] = true } return &ignoredSink{ Sink: sink, ignoreMediaTypes: ignoredMap, ignoreActions: ignoredActionsMap, } } // Write discards events with ignored target media types and passes the rest // along. func (imts *ignoredSink) Write(events ...Event) error { var kept []Event for _, e := range events { if !imts.ignoreMediaTypes[e.Target.MediaType] { kept = append(kept, e) } } if len(kept) == 0 { return nil } var results []Event for _, e := range kept { if !imts.ignoreActions[e.Action] { results = append(results, e) } } if len(results) == 0 { return nil } return imts.Sink.Write(results...) } // retryingSink retries the write until success or an ErrSinkClosed is // returned. Underlying sink must have p > 0 of succeeding or the sink will // block. Internally, it is a circuit breaker retries to manage reset. // Concurrent calls to a retrying sink are serialized through the sink, // meaning that if one is in-flight, another will not proceed. type retryingSink struct { mu sync.Mutex sink Sink closed bool // circuit breaker heuristics failures struct { threshold int recent int last time.Time backoff time.Duration // time after which we retry after failure. } } type retryingSinkListener interface { active(events ...Event) retry(events ...Event) } // TODO(stevvooe): We are using circuit break here, which actually doesn't // make a whole lot of sense for this use case, since we always retry. Move // this to use bounded exponential backoff. // newRetryingSink returns a sink that will retry writes to a sink, backing // off on failure. 
Parameters threshold and backoff adjust the behavior of the // circuit breaker. func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink { rs := &retryingSink{ sink: sink, } rs.failures.threshold = threshold rs.failures.backoff = backoff return rs } // Write attempts to flush the events to the downstream sink until it succeeds // or the sink is closed. func (rs *retryingSink) Write(events ...Event) error { rs.mu.Lock() defer rs.mu.Unlock() retry: if rs.closed { return ErrSinkClosed } if !rs.proceed() { logrus.Warnf("%v encountered too many errors, backing off", rs.sink) rs.wait(rs.failures.backoff) goto retry } if err := rs.write(events...); err != nil { if err == ErrSinkClosed { // terminal! return err } logrus.Errorf("retryingsink: error writing events: %v, retrying", err) goto retry } return nil } // Close closes the sink and the underlying sink. func (rs *retryingSink) Close() error { rs.mu.Lock() defer rs.mu.Unlock() if rs.closed { return fmt.Errorf("retryingsink: already closed") } rs.closed = true return rs.sink.Close() } // write provides a helper that dispatches failure and success properly. Used // by write as the single-flight write call. func (rs *retryingSink) write(events ...Event) error { if err := rs.sink.Write(events...); err != nil { rs.failure() return err } rs.reset() return nil } // wait backoff time against the sink, unlocking so others can proceed. Should // only be called by methods that currently have the mutex. func (rs *retryingSink) wait(backoff time.Duration) { rs.mu.Unlock() defer rs.mu.Lock() // backoff here time.Sleep(backoff) } // reset marks a successful call. func (rs *retryingSink) reset() { rs.failures.recent = 0 rs.failures.last = time.Time{} } // failure records a failure. func (rs *retryingSink) failure() { rs.failures.recent++ rs.failures.last = time.Now().UTC() } // proceed returns true if the call should proceed based on circuit breaker // heuristics. 
func (rs *retryingSink) proceed() bool {
	// Below the failure threshold the breaker stays closed: always go ahead.
	if rs.failures.recent < rs.failures.threshold {
		return true
	}

	// Otherwise only go ahead once the backoff period since the last
	// recorded failure has elapsed.
	retryAt := rs.failures.last.Add(rs.failures.backoff)
	return time.Now().UTC().After(retryAt)
}