Move notifications package to distribution
Since the notifications package is now decoupled from storage, we are moving it to the root package. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
286a644948
commit
d2d46fca41
12 changed files with 0 additions and 1 deletions
337
notifications/sinks.go
Normal file
337
notifications/sinks.go
Normal file
|
@ -0,0 +1,337 @@
|
|||
package notifications
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// NOTE(stevvooe): This file contains definitions for several utility sinks.
|
||||
// Typically, the broadcaster is the only sink that should be required
|
||||
// externally, but others are suitable for export if the need arises. Albeit,
|
||||
// the tight integration with endpoint metrics should be removed.
|
||||
|
||||
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
|
||||
// component is to dispatch events to configured endpoints. Reliability can be
|
||||
// provided by wrapping incoming sinks.
|
||||
type Broadcaster struct {
|
||||
sinks []Sink
|
||||
events chan []Event
|
||||
closed chan chan struct{}
|
||||
}
|
||||
|
||||
// NewBroadcaster ...
|
||||
// Add appends one or more sinks to the list of sinks. The broadcaster
|
||||
// behavior will be affected by the properties of the sink. Generally, the
|
||||
// sink should accept all messages and deal with reliability on its own. Use
|
||||
// of EventQueue and RetryingSink should be used here.
|
||||
func NewBroadcaster(sinks ...Sink) *Broadcaster {
|
||||
b := Broadcaster{
|
||||
sinks: sinks,
|
||||
events: make(chan []Event),
|
||||
closed: make(chan chan struct{}),
|
||||
}
|
||||
|
||||
// Start the broadcaster
|
||||
go b.run()
|
||||
|
||||
return &b
|
||||
}
|
||||
|
||||
// Write accepts a block of events to be dispatched to all sinks. This method
|
||||
// will never fail and should never block (hopefully!). The caller cedes the
|
||||
// slice memory to the broadcaster and should not modify it after calling
|
||||
// write.
|
||||
func (b *Broadcaster) Write(events ...Event) error {
|
||||
select {
|
||||
case b.events <- events:
|
||||
case <-b.closed:
|
||||
return ErrSinkClosed
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close the broadcaster, ensuring that all messages are flushed to the
|
||||
// underlying sink before returning.
|
||||
func (b *Broadcaster) Close() error {
|
||||
logrus.Infof("broadcaster: closing")
|
||||
select {
|
||||
case <-b.closed:
|
||||
// already closed
|
||||
return fmt.Errorf("broadcaster: already closed")
|
||||
default:
|
||||
// do a little chan handoff dance to synchronize closing
|
||||
closed := make(chan struct{})
|
||||
b.closed <- closed
|
||||
close(b.closed)
|
||||
<-closed
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// run is the main broadcast loop, started when the broadcaster is created.
|
||||
// Under normal conditions, it waits for events on the event channel. After
|
||||
// Close is called, this goroutine will exit.
|
||||
func (b *Broadcaster) run() {
|
||||
for {
|
||||
select {
|
||||
case block := <-b.events:
|
||||
for _, sink := range b.sinks {
|
||||
if err := sink.Write(block...); err != nil {
|
||||
logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
|
||||
}
|
||||
}
|
||||
case closing := <-b.closed:
|
||||
|
||||
// close all the underlying sinks
|
||||
for _, sink := range b.sinks {
|
||||
if err := sink.Close(); err != nil {
|
||||
logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
|
||||
}
|
||||
}
|
||||
closing <- struct{}{}
|
||||
|
||||
logrus.Debugf("broadcaster: closed")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// eventQueue accepts all messages into a queue for asynchronous consumption
|
||||
// by a sink. It is unbounded and thread safe but the sink must be reliable or
|
||||
// events will be dropped.
|
||||
type eventQueue struct {
|
||||
sink Sink
|
||||
events *list.List
|
||||
listeners []eventQueueListener
|
||||
cond *sync.Cond
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// eventQueueListener is called when various events happen on the queue.
|
||||
type eventQueueListener interface {
|
||||
ingress(events ...Event)
|
||||
egress(events ...Event)
|
||||
}
|
||||
|
||||
// newEventQueue returns a queue to the provided sink. If the updater is non-
|
||||
// nil, it will be called to update pending metrics on ingress and egress.
|
||||
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
|
||||
eq := eventQueue{
|
||||
sink: sink,
|
||||
events: list.New(),
|
||||
listeners: listeners,
|
||||
}
|
||||
|
||||
eq.cond = sync.NewCond(&eq.mu)
|
||||
go eq.run()
|
||||
return &eq
|
||||
}
|
||||
|
||||
// Write accepts the events into the queue, only failing if the queue has
|
||||
// beend closed.
|
||||
func (eq *eventQueue) Write(events ...Event) error {
|
||||
eq.mu.Lock()
|
||||
defer eq.mu.Unlock()
|
||||
|
||||
if eq.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
for _, listener := range eq.listeners {
|
||||
listener.ingress(events...)
|
||||
}
|
||||
eq.events.PushBack(events)
|
||||
eq.cond.Signal() // signal waiters
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close shutsdown the event queue, flushing
|
||||
func (eq *eventQueue) Close() error {
|
||||
eq.mu.Lock()
|
||||
defer eq.mu.Unlock()
|
||||
|
||||
if eq.closed {
|
||||
return fmt.Errorf("eventqueue: already closed")
|
||||
}
|
||||
|
||||
// set closed flag
|
||||
eq.closed = true
|
||||
eq.cond.Signal() // signal flushes queue
|
||||
eq.cond.Wait() // wait for signal from last flush
|
||||
|
||||
return eq.sink.Close()
|
||||
}
|
||||
|
||||
// run is the main goroutine to flush events to the target sink.
|
||||
func (eq *eventQueue) run() {
|
||||
for {
|
||||
block := eq.next()
|
||||
|
||||
if block == nil {
|
||||
return // nil block means event queue is closed.
|
||||
}
|
||||
|
||||
if err := eq.sink.Write(block...); err != nil {
|
||||
logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
|
||||
}
|
||||
|
||||
for _, listener := range eq.listeners {
|
||||
listener.egress(block...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// next encompasses the critical section of the run loop. When the queue is
|
||||
// empty, it will block on the condition. If new data arrives, it will wake
|
||||
// and return a block. When closed, a nil slice will be returned.
|
||||
func (eq *eventQueue) next() []Event {
|
||||
eq.mu.Lock()
|
||||
defer eq.mu.Unlock()
|
||||
|
||||
for eq.events.Len() < 1 {
|
||||
if eq.closed {
|
||||
eq.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
eq.cond.Wait()
|
||||
}
|
||||
|
||||
front := eq.events.Front()
|
||||
block := front.Value.([]Event)
|
||||
eq.events.Remove(front)
|
||||
|
||||
return block
|
||||
}
|
||||
|
||||
// retryingSink retries the write until success or an ErrSinkClosed is
|
||||
// returned. Underlying sink must have p > 0 of succeeding or the sink will
|
||||
// block. Internally, it is a circuit breaker retries to manage reset.
|
||||
// Concurrent calls to a retrying sink are serialized through the sink,
|
||||
// meaning that if one is in-flight, another will not proceed.
|
||||
type retryingSink struct {
|
||||
mu sync.Mutex
|
||||
sink Sink
|
||||
closed bool
|
||||
|
||||
// circuit breaker hueristics
|
||||
failures struct {
|
||||
threshold int
|
||||
recent int
|
||||
last time.Time
|
||||
backoff time.Duration // time after which we retry after failure.
|
||||
}
|
||||
}
|
||||
|
||||
type retryingSinkListener interface {
|
||||
active(events ...Event)
|
||||
retry(events ...Event)
|
||||
}
|
||||
|
||||
// TODO(stevvooe): We are using circuit break here, which actually doesn't
|
||||
// make a whole lot of sense for this use case, since we always retry. Move
|
||||
// this to use bounded exponential backoff.
|
||||
|
||||
// newRetryingSink returns a sink that will retry writes to a sink, backing
|
||||
// off on failure. Parameters threshold and backoff adjust the behavior of the
|
||||
// circuit breaker.
|
||||
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
|
||||
rs := &retryingSink{
|
||||
sink: sink,
|
||||
}
|
||||
rs.failures.threshold = threshold
|
||||
rs.failures.backoff = backoff
|
||||
|
||||
return rs
|
||||
}
|
||||
|
||||
// Write attempts to flush the events to the downstream sink until it succeeds
|
||||
// or the sink is closed.
|
||||
func (rs *retryingSink) Write(events ...Event) error {
|
||||
rs.mu.Lock()
|
||||
defer rs.mu.Unlock()
|
||||
|
||||
retry:
|
||||
|
||||
if rs.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
if !rs.proceed() {
|
||||
logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
|
||||
rs.wait(rs.failures.backoff)
|
||||
goto retry
|
||||
}
|
||||
|
||||
if err := rs.write(events...); err != nil {
|
||||
if err == ErrSinkClosed {
|
||||
// terminal!
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
|
||||
goto retry
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the sink and the underlying sink.
|
||||
func (rs *retryingSink) Close() error {
|
||||
rs.mu.Lock()
|
||||
defer rs.mu.Unlock()
|
||||
|
||||
if rs.closed {
|
||||
return fmt.Errorf("retryingsink: already closed")
|
||||
}
|
||||
|
||||
rs.closed = true
|
||||
return rs.sink.Close()
|
||||
}
|
||||
|
||||
// write provides a helper that dispatches failure and success properly. Used
|
||||
// by write as the single-flight write call.
|
||||
func (rs *retryingSink) write(events ...Event) error {
|
||||
if err := rs.sink.Write(events...); err != nil {
|
||||
rs.failure()
|
||||
return err
|
||||
}
|
||||
|
||||
rs.reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
// wait backoff time against the sink, unlocking so others can proceed. Should
|
||||
// only be called by methods that currently have the mutex.
|
||||
func (rs *retryingSink) wait(backoff time.Duration) {
|
||||
rs.mu.Unlock()
|
||||
defer rs.mu.Lock()
|
||||
|
||||
// backoff here
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
// reset marks a succesful call.
|
||||
func (rs *retryingSink) reset() {
|
||||
rs.failures.recent = 0
|
||||
rs.failures.last = time.Time{}
|
||||
}
|
||||
|
||||
// failure records a failure.
|
||||
func (rs *retryingSink) failure() {
|
||||
rs.failures.recent++
|
||||
rs.failures.last = time.Now().UTC()
|
||||
}
|
||||
|
||||
// proceed returns true if the call should proceed based on circuit breaker
|
||||
// hueristics.
|
||||
func (rs *retryingSink) proceed() bool {
|
||||
return rs.failures.recent < rs.failures.threshold ||
|
||||
time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue