package notifications import ( "bytes" "encoding/json" "fmt" "net/http" "sync" "time" ) // httpSink implements a single-flight, http notification endpoint. This is // very lightweight in that it only makes an attempt at an http request. // Reliability should be provided by the caller. type httpSink struct { url string mu sync.Mutex closed bool client *http.Client listeners []httpStatusListener // TODO(stevvooe): Allow one to configure the media type accepted by this // sink and choose the serialization based on that. } // newHTTPSink returns an unreliable, single-flight http sink. Wrap in other // sinks for increased reliability. func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink { return &httpSink{ url: u, listeners: listeners, client: &http.Client{ Transport: &headerRoundTripper{ Transport: http.DefaultTransport.(*http.Transport), headers: headers, }, Timeout: timeout, }, } } // httpStatusListener is called on various outcomes of sending notifications. type httpStatusListener interface { success(status int, events ...Event) failure(status int, events ...Event) err(err error, events ...Event) } // Accept makes an attempt to notify the endpoint, returning an error if it // fails. It is the caller's responsibility to retry on error. The events are // accepted or rejected as a group. func (hs *httpSink) Write(events ...Event) error { hs.mu.Lock() defer hs.mu.Unlock() defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections() if hs.closed { return ErrSinkClosed } envelope := Envelope{ Events: events, } // TODO(stevvooe): It is not ideal to keep re-encoding the request body on // retry but we are going to do it to keep the code simple. It is likely // we could change the event struct to manage its own buffer. p, err := json.MarshalIndent(envelope, "", " ") if err != nil { for _, listener := range hs.listeners { listener.err(err, events...) } return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err) } body := bytes.NewReader(p) resp, err := hs.client.Post(hs.url, EventsMediaType, body) if err != nil { for _, listener := range hs.listeners { listener.err(err, events...) } return fmt.Errorf("%v: error posting: %v", hs, err) } defer resp.Body.Close() // The notifier will treat any 2xx or 3xx response as accepted by the // endpoint. switch { case resp.StatusCode >= 200 && resp.StatusCode < 400: for _, listener := range hs.listeners { listener.success(resp.StatusCode, events...) } // TODO(stevvooe): This is a little accepting: we may want to support // unsupported media type responses with retries using the correct // media type. There may also be cases that will never work. return nil default: for _, listener := range hs.listeners { listener.failure(resp.StatusCode, events...) } return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status) } } // Close the endpoint func (hs *httpSink) Close() error { hs.mu.Lock() defer hs.mu.Unlock() if hs.closed { return fmt.Errorf("httpsink: already closed") } hs.closed = true return nil } func (hs *httpSink) String() string { return fmt.Sprintf("httpSink{%s}", hs.url) } type headerRoundTripper struct { *http.Transport // must be transport to support CancelRequest headers http.Header } func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { var nreq http.Request nreq = *req nreq.Header = make(http.Header) merge := func(headers http.Header) { for k, v := range headers { nreq.Header[k] = append(nreq.Header[k], v...) } } merge(req.Header) merge(hrt.headers) return hrt.Transport.RoundTrip(&nreq) }