Add transport field to EndpointConfig struct

The EndpointConfig struct in the notifications package has some config
fields for a notification endpoint. This commit adds the ability to pass
in an *http.Transport to use when notifying that endpoint of an event.
This is especially useful for endpoints that use self-signed CAs.

Signed-off-by: Josh Chorlton <josh.chorlton@docker.com>
This commit is contained in:
Josh Chorlton 2016-07-09 12:59:05 -07:00
parent 4e17ab5d31
commit a62f212544
3 changed files with 42 additions and 6 deletions

View file

@ -12,6 +12,7 @@ type EndpointConfig struct {
Timeout time.Duration Timeout time.Duration
Threshold int Threshold int
Backoff time.Duration Backoff time.Duration
Transport *http.Transport
} }
// defaults set any zero-valued fields to a reasonable default. // defaults set any zero-valued fields to a reasonable default.
@ -27,6 +28,10 @@ func (ec *EndpointConfig) defaults() {
if ec.Backoff <= 0 { if ec.Backoff <= 0 {
ec.Backoff = time.Second ec.Backoff = time.Second
} }
if ec.Transport == nil {
ec.Transport = http.DefaultTransport.(*http.Transport)
}
} }
// Endpoint is a reliable, queued, thread-safe sink that notify external http // Endpoint is a reliable, queued, thread-safe sink that notify external http
@ -54,7 +59,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
// Configures the inmemory queue, retry, http pipeline. // Configures the inmemory queue, retry, http pipeline.
endpoint.Sink = newHTTPSink( endpoint.Sink = newHTTPSink(
endpoint.url, endpoint.Timeout, endpoint.Headers, endpoint.url, endpoint.Timeout, endpoint.Headers,
endpoint.metrics.httpStatusListener()) endpoint.Transport, endpoint.metrics.httpStatusListener())
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())

View file

@ -26,13 +26,16 @@ type httpSink struct {
// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other // newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
// sinks for increased reliability. // sinks for increased reliability.
func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink { func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport *http.Transport, listeners ...httpStatusListener) *httpSink {
if transport == nil {
transport = http.DefaultTransport.(*http.Transport)
}
return &httpSink{ return &httpSink{
url: u, url: u,
listeners: listeners, listeners: listeners,
client: &http.Client{ client: &http.Client{
Transport: &headerRoundTripper{ Transport: &headerRoundTripper{
Transport: http.DefaultTransport.(*http.Transport), Transport: transport,
headers: headers, headers: headers,
}, },
Timeout: timeout, Timeout: timeout,

View file

@ -1,6 +1,7 @@
package notifications package notifications
import ( import (
"crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"mime" "mime"
@ -8,6 +9,7 @@ import (
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"strconv" "strconv"
"strings"
"testing" "testing"
"github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/manifest/schema1"
@ -16,7 +18,7 @@ import (
// TestHTTPSink mocks out an http endpoint and notifies it under a couple of // TestHTTPSink mocks out an http endpoint and notifies it under a couple of
// conditions, ensuring correct behavior. // conditions, ensuring correct behavior.
func TestHTTPSink(t *testing.T) { func TestHTTPSink(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() defer r.Body.Close()
if r.Method != "POST" { if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
@ -57,12 +59,38 @@ func TestHTTPSink(t *testing.T) {
} }
w.WriteHeader(status) w.WriteHeader(status)
})) })
server := httptest.NewTLSServer(serverHandler)
metrics := newSafeMetrics() metrics := newSafeMetrics()
sink := newHTTPSink(server.URL, 0, nil, sink := newHTTPSink(server.URL, 0, nil, nil,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics}) &endpointMetricsHTTPStatusListener{safeMetrics: metrics})
// first make sure that the default transport gives x509 untrusted cert error
events := []Event{}
err := sink.Write(events...)
if !strings.Contains(err.Error(), "x509") {
t.Fatal("TLS server with default transport should give unknown CA error")
}
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing http sink: %v", err)
}
// make sure that passing in the transport no longer gives this error
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
sink = newHTTPSink(server.URL, 0, nil, tr,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
err = sink.Write(events...)
if err != nil {
t.Fatalf("unexpected error writing events: %v", err)
}
// reset server to standard http server and sink to a basic sink
server = httptest.NewServer(serverHandler)
sink = newHTTPSink(server.URL, 0, nil, nil,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
var expectedMetrics EndpointMetrics var expectedMetrics EndpointMetrics
expectedMetrics.Statuses = make(map[string]int) expectedMetrics.Statuses = make(map[string]int)