179 lines
4.2 KiB
Go
179 lines
4.2 KiB
Go
|
package events
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/sirupsen/logrus"
|
||
|
)
|
||
|
|
||
|
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
|
||
|
// component is to dispatch events to configured endpoints. Reliability can be
|
||
|
// provided by wrapping incoming sinks.
|
||
|
type Broadcaster struct {
|
||
|
sinks []Sink
|
||
|
events chan Event
|
||
|
adds chan configureRequest
|
||
|
removes chan configureRequest
|
||
|
|
||
|
shutdown chan struct{}
|
||
|
closed chan struct{}
|
||
|
once sync.Once
|
||
|
}
|
||
|
|
||
|
// NewBroadcaster appends one or more sinks to the list of sinks. The
|
||
|
// broadcaster behavior will be affected by the properties of the sink.
|
||
|
// Generally, the sink should accept all messages and deal with reliability on
|
||
|
// its own. Use of EventQueue and RetryingSink should be used here.
|
||
|
func NewBroadcaster(sinks ...Sink) *Broadcaster {
|
||
|
b := Broadcaster{
|
||
|
sinks: sinks,
|
||
|
events: make(chan Event),
|
||
|
adds: make(chan configureRequest),
|
||
|
removes: make(chan configureRequest),
|
||
|
shutdown: make(chan struct{}),
|
||
|
closed: make(chan struct{}),
|
||
|
}
|
||
|
|
||
|
// Start the broadcaster
|
||
|
go b.run()
|
||
|
|
||
|
return &b
|
||
|
}
|
||
|
|
||
|
// Write accepts an event to be dispatched to all sinks. This method will never
|
||
|
// fail and should never block (hopefully!). The caller cedes the memory to the
|
||
|
// broadcaster and should not modify it after calling write.
|
||
|
func (b *Broadcaster) Write(event Event) error {
|
||
|
select {
|
||
|
case b.events <- event:
|
||
|
case <-b.closed:
|
||
|
return ErrSinkClosed
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Add the sink to the broadcaster.
|
||
|
//
|
||
|
// The provided sink must be comparable with equality. Typically, this just
|
||
|
// works with a regular pointer type.
|
||
|
func (b *Broadcaster) Add(sink Sink) error {
|
||
|
return b.configure(b.adds, sink)
|
||
|
}
|
||
|
|
||
|
// Remove the provided sink.
|
||
|
func (b *Broadcaster) Remove(sink Sink) error {
|
||
|
return b.configure(b.removes, sink)
|
||
|
}
|
||
|
|
||
|
type configureRequest struct {
|
||
|
sink Sink
|
||
|
response chan error
|
||
|
}
|
||
|
|
||
|
func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
|
||
|
response := make(chan error, 1)
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case ch <- configureRequest{
|
||
|
sink: sink,
|
||
|
response: response}:
|
||
|
ch = nil
|
||
|
case err := <-response:
|
||
|
return err
|
||
|
case <-b.closed:
|
||
|
return ErrSinkClosed
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close the broadcaster, ensuring that all messages are flushed to the
|
||
|
// underlying sink before returning.
|
||
|
func (b *Broadcaster) Close() error {
|
||
|
b.once.Do(func() {
|
||
|
close(b.shutdown)
|
||
|
})
|
||
|
|
||
|
<-b.closed
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// run is the main broadcast loop, started when the broadcaster is created.
|
||
|
// Under normal conditions, it waits for events on the event channel. After
|
||
|
// Close is called, this goroutine will exit.
|
||
|
func (b *Broadcaster) run() {
|
||
|
defer close(b.closed)
|
||
|
remove := func(target Sink) {
|
||
|
for i, sink := range b.sinks {
|
||
|
if sink == target {
|
||
|
b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case event := <-b.events:
|
||
|
for _, sink := range b.sinks {
|
||
|
if err := sink.Write(event); err != nil {
|
||
|
if err == ErrSinkClosed {
|
||
|
// remove closed sinks
|
||
|
remove(sink)
|
||
|
continue
|
||
|
}
|
||
|
logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
|
||
|
Errorf("broadcaster: dropping event")
|
||
|
}
|
||
|
}
|
||
|
case request := <-b.adds:
|
||
|
// while we have to iterate for add/remove, common iteration for
|
||
|
// send is faster against slice.
|
||
|
|
||
|
var found bool
|
||
|
for _, sink := range b.sinks {
|
||
|
if request.sink == sink {
|
||
|
found = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !found {
|
||
|
b.sinks = append(b.sinks, request.sink)
|
||
|
}
|
||
|
// b.sinks[request.sink] = struct{}{}
|
||
|
request.response <- nil
|
||
|
case request := <-b.removes:
|
||
|
remove(request.sink)
|
||
|
request.response <- nil
|
||
|
case <-b.shutdown:
|
||
|
// close all the underlying sinks
|
||
|
for _, sink := range b.sinks {
|
||
|
if err := sink.Close(); err != nil && err != ErrSinkClosed {
|
||
|
logrus.WithField("events.sink", sink).WithError(err).
|
||
|
Errorf("broadcaster: closing sink failed")
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *Broadcaster) String() string {
|
||
|
// Serialize copy of this broadcaster without the sync.Once, to avoid
|
||
|
// a data race.
|
||
|
|
||
|
b2 := map[string]interface{}{
|
||
|
"sinks": b.sinks,
|
||
|
"events": b.events,
|
||
|
"adds": b.adds,
|
||
|
"removes": b.removes,
|
||
|
|
||
|
"shutdown": b.shutdown,
|
||
|
"closed": b.closed,
|
||
|
}
|
||
|
|
||
|
return fmt.Sprint(b2)
|
||
|
}
|