Adds custom registry User-Agent header to s3 HTTP requests

Uses docker/goamz instead of AdRoll/goamz

Adds a registry UA string parameter to the storage parameters when
constructing the storage driver for the registry App.
This could be used by other storage drivers as well.

Signed-off-by: Brian Bland <brian.bland@docker.com>
Author: Brian Bland
Date:   2016-01-20 16:40:58 -08:00
Commit: f41a408e34
Parent: e4b654f2ce

4 changed files with 58 additions and 29 deletions
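Before the diff, a quick illustration of the mechanism: handlers.NewApp builds a User-Agent string from the registry version and the Go runtime version, hands it to the storage driver via a "useragent" storage parameter, and the S3 driver wraps its HTTP client so every outbound request carries that header. The sketch below shows the same pattern using only the standard library; the actual change uses the registry/client/transport helpers visible in the diff, and the "2.3.0" version string is only a placeholder.

package main

import (
	"fmt"
	"net/http"
	"runtime"
)

// uaTransport is a stand-in for the header-injecting transport used in the
// diff below (registry/client/transport): it stamps a fixed User-Agent on
// every request before delegating to the wrapped RoundTripper.
type uaTransport struct {
	base      http.RoundTripper
	userAgent string
}

func (t *uaTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	r2 := req.Clone(req.Context()) // don't mutate the caller's request
	r2.Header.Set("User-Agent", t.userAgent)
	return t.base.RoundTrip(r2)
}

func main() {
	// The registry builds the UA from version.Version and runtime.Version();
	// "2.3.0" is only a placeholder here.
	ua := fmt.Sprintf("docker-distribution/%s %s", "2.3.0", runtime.Version())

	client := &http.Client{
		Transport: &uaTransport{base: http.DefaultTransport, userAgent: ua},
	}
	_ = client // the S3 driver installs a client like this on its s3 object
}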

@@ -9,6 +9,7 @@ import (
 	"net/http"
 	"net/url"
 	"os"
+	"runtime"
 	"time"

 	log "github.com/Sirupsen/logrus"
@@ -30,6 +31,7 @@ import (
 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 	"github.com/docker/distribution/registry/storage/driver/factory"
 	storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
+	"github.com/docker/distribution/version"
 	"github.com/docker/libtrust"
 	"github.com/garyburd/redigo/redis"
 	"github.com/gorilla/mux"
@@ -83,12 +85,12 @@ type App struct {
 // NewApp takes a configuration and returns a configured app, ready to serve
 // requests. The app only implements ServeHTTP and can be wrapped in other
 // handlers accordingly.
-func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
+func NewApp(ctx context.Context, config *configuration.Configuration) *App {
 	app := &App{
-		Config:  configuration,
+		Config:  config,
 		Context: ctx,
-		router:  v2.RouterWithPrefix(configuration.HTTP.Prefix),
-		isCache: configuration.Proxy.RemoteURL != "",
+		router:  v2.RouterWithPrefix(config.HTTP.Prefix),
+		isCache: config.Proxy.RemoteURL != "",
 	}

 	// Register the handler dispatchers.
@@ -102,8 +104,15 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 	app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
 	app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)

+	// override the storage driver's UA string for registry outbound HTTP requests
+	storageParams := config.Storage.Parameters()
+	if storageParams == nil {
+		storageParams = make(configuration.Parameters)
+	}
+	storageParams["useragent"] = fmt.Sprintf("docker-distribution/%s %s", version.Version, runtime.Version())
+
 	var err error
-	app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
+	app.driver, err = factory.Create(config.Storage.Type(), storageParams)
 	if err != nil {
 		// TODO(stevvooe): Move the creation of a service into a protected
 		// method, where this is created lazily. Its status can be queried via
@@ -112,7 +121,7 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 	}

 	purgeConfig := uploadPurgeDefaultConfig()
-	if mc, ok := configuration.Storage["maintenance"]; ok {
+	if mc, ok := config.Storage["maintenance"]; ok {
 		if v, ok := mc["uploadpurging"]; ok {
 			purgeConfig, ok = v.(map[interface{}]interface{})
 			if !ok {
@@ -135,15 +144,15 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 	startUploadPurger(app, app.driver, ctxu.GetLogger(app), purgeConfig)

-	app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
+	app.driver, err = applyStorageMiddleware(app.driver, config.Middleware["storage"])
 	if err != nil {
 		panic(err)
 	}

-	app.configureSecret(configuration)
-	app.configureEvents(configuration)
-	app.configureRedis(configuration)
-	app.configureLogHook(configuration)
+	app.configureSecret(config)
+	app.configureEvents(config)
+	app.configureRedis(config)
+	app.configureLogHook(config)

 	// Generate an ephemeral key to be used for signing converted manifests
 	// for clients that don't support schema2.
@@ -152,8 +161,8 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 		panic(err)
 	}

-	if configuration.HTTP.Host != "" {
-		u, err := url.Parse(configuration.HTTP.Host)
+	if config.HTTP.Host != "" {
+		u, err := url.Parse(config.HTTP.Host)
 		if err != nil {
 			panic(fmt.Sprintf(`could not parse http "host" parameter: %v`, err))
 		}
@@ -167,7 +176,7 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 	}

 	// configure deletion
-	if d, ok := configuration.Storage["delete"]; ok {
+	if d, ok := config.Storage["delete"]; ok {
 		e, ok := d["enabled"]
 		if ok {
 			if deleteEnabled, ok := e.(bool); ok && deleteEnabled {
@@ -178,7 +187,7 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {

 	// configure redirects
 	var redirectDisabled bool
-	if redirectConfig, ok := configuration.Storage["redirect"]; ok {
+	if redirectConfig, ok := config.Storage["redirect"]; ok {
 		v := redirectConfig["disable"]
 		switch v := v.(type) {
 		case bool:
@@ -194,7 +203,7 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 	}

 	// configure storage caches
-	if cc, ok := configuration.Storage["cache"]; ok {
+	if cc, ok := config.Storage["cache"]; ok {
 		v, ok := cc["blobdescriptor"]
 		if !ok {
 			// Backwards compatible: "layerinfo" == "blobdescriptor"
@@ -223,7 +232,7 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 			ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
 		default:
 			if v != "" {
-				ctxu.GetLogger(app).Warnf("unknown cache type %q, caching disabled", configuration.Storage["cache"])
+				ctxu.GetLogger(app).Warnf("unknown cache type %q, caching disabled", config.Storage["cache"])
 			}
 		}
 	}
@@ -236,15 +245,15 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 		}
 	}

-	app.registry, err = applyRegistryMiddleware(app.Context, app.registry, configuration.Middleware["registry"])
+	app.registry, err = applyRegistryMiddleware(app.Context, app.registry, config.Middleware["registry"])
 	if err != nil {
 		panic(err)
 	}

-	authType := configuration.Auth.Type()
+	authType := config.Auth.Type()

 	if authType != "" {
-		accessController, err := auth.GetAccessController(configuration.Auth.Type(), configuration.Auth.Parameters())
+		accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters())
 		if err != nil {
 			panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
 		}
@@ -253,13 +262,13 @@ func NewApp(ctx context.Context, configuration *configuration.Configuration) *App {
 	}

 	// configure as a pull through cache
-	if configuration.Proxy.RemoteURL != "" {
-		app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, configuration.Proxy)
+	if config.Proxy.RemoteURL != "" {
+		app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy)
 		if err != nil {
 			panic(err.Error())
 		}
 		app.isCache = true
-		ctxu.GetLogger(app).Info("Registry configured as a proxy cache to ", configuration.Proxy.RemoteURL)
+		ctxu.GetLogger(app).Info("Registry configured as a proxy cache to ", config.Proxy.RemoteURL)
 	}

 	return app

@@ -10,10 +10,10 @@ import (
 	"io/ioutil"
 	"time"

-	"github.com/AdRoll/goamz/cloudfront"
 	"github.com/docker/distribution/context"
 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 	storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
+	"github.com/docker/goamz/cloudfront"
 )

 // cloudFrontStorageMiddleware provides an simple implementation of layerHandler that
@@ -26,11 +26,12 @@ import (
 	"sync"
 	"time"

-	"github.com/AdRoll/goamz/aws"
-	"github.com/AdRoll/goamz/s3"
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/goamz/aws"
+	"github.com/docker/goamz/s3"

 	"github.com/docker/distribution/context"
+	"github.com/docker/distribution/registry/client/transport"
 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 	"github.com/docker/distribution/registry/storage/driver/base"
 	"github.com/docker/distribution/registry/storage/driver/factory"
@@ -58,6 +59,7 @@ type DriverParameters struct {
 	V4Auth        bool
 	ChunkSize     int64
 	RootDirectory string
+	UserAgent     string
 }

 func init() {
@@ -168,7 +170,7 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
 		case int, uint, int32, uint32, uint64:
 			chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
 		default:
-			return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
+			return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam)
 		}

 		if chunkSize < minChunkSize {
@@ -181,6 +183,11 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
 		rootDirectory = ""
 	}

+	userAgent, ok := parameters["useragent"]
+	if !ok {
+		userAgent = ""
+	}
+
 	params := DriverParameters{
 		fmt.Sprint(accessKey),
 		fmt.Sprint(secretKey),
@@ -191,6 +198,7 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
 		v4AuthBool,
 		chunkSize,
 		fmt.Sprint(rootDirectory),
+		fmt.Sprint(userAgent),
 	}

 	return New(params)
@@ -209,7 +217,16 @@ func New(params DriverParameters) (*Driver, error) {
 	}

 	s3obj := s3.New(auth, params.Region)
-	bucket := s3obj.Bucket(params.Bucket)
+
+	if params.UserAgent != "" {
+		s3obj.Client = &http.Client{
+			Transport: transport.NewTransport(http.DefaultTransport,
+				transport.NewHeaderRequestModifier(http.Header{
+					http.CanonicalHeaderKey("User-Agent"): []string{params.UserAgent},
+				}),
+			),
+		}
+	}

 	if params.V4Auth {
 		s3obj.Signature = aws.V4Signature
@@ -219,6 +236,8 @@ func New(params DriverParameters) (*Driver, error) {
 		}
 	}

+	bucket := s3obj.Bucket(params.Bucket)
+
 	// TODO Currently multipart uploads have no timestamps, so this would be unwise
 	// if you initiated a new s3driver while another one is running on the same bucket.
 	// multis, _, err := bucket.ListMulti("", "")

@@ -6,10 +6,10 @@ import (
 	"strconv"
 	"testing"

-	"github.com/AdRoll/goamz/aws"
 	"github.com/docker/distribution/context"
 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 	"github.com/docker/distribution/registry/storage/driver/testsuites"
+	"github.com/docker/goamz/aws"
 	"gopkg.in/check.v1"
 )
@@ -69,6 +69,7 @@ func init() {
 			v4AuthBool,
 			minChunkSize,
 			rootDirectory,
+			"",
 		}

 		return New(parameters)
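As the commit message notes, the "useragent" parameter could be consumed by other storage drivers as well. A hypothetical driver (names below are illustrative, not part of this change) could read it in its FromParameters the same way the S3 driver does:

// Sketch of a hypothetical storage driver reading the shared "useragent"
// parameter. Names here are illustrative only.
package exampledriver

import "fmt"

// ExampleParameters mirrors the pattern of the S3 driver's DriverParameters:
// the UA string is just another constructor input.
type ExampleParameters struct {
	Endpoint  string
	UserAgent string
}

// FromParameters extracts the optional "useragent" value that
// handlers.NewApp injects into the storage parameters.
func FromParameters(parameters map[string]interface{}) (*ExampleParameters, error) {
	endpoint, ok := parameters["endpoint"]
	if !ok {
		return nil, fmt.Errorf("no endpoint parameter provided")
	}

	// A missing "useragent" is not an error; the driver simply falls back
	// to its HTTP client's default User-Agent.
	userAgent, ok := parameters["useragent"]
	if !ok {
		userAgent = ""
	}

	return &ExampleParameters{
		Endpoint:  fmt.Sprint(endpoint),
		UserAgent: fmt.Sprint(userAgent),
	}, nil
}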