diff --git a/docs/configuration.md b/docs/configuration.md index df3447090..fe95d5942 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -100,6 +100,17 @@ storage: gcs: bucket: bucketname keyfile: /path/to/keyfile + credentials: + type: service_account + project_id: project_id_string + private_key_id: private_key_id_string + private_key: private_key_string + client_email: client@example.com + client_id: client_id_string + auth_uri: http://example.com/auth_uri + token_uri: http://example.com/token_uri + auth_provider_x509_cert_url: http://example.com/provider_cert_url + client_x509_cert_url: http://example.com/client_cert_url rootdirectory: /gcs/object/name/prefix chunksize: 5242880 s3: @@ -389,6 +400,17 @@ storage: gcs: bucket: bucketname keyfile: /path/to/keyfile + credentials: + type: service_account + project_id: project_id_string + private_key_id: private_key_id_string + private_key: private_key_string + client_email: client@example.com + client_id: client_id_string + auth_uri: http://example.com/auth_uri + token_uri: http://example.com/token_uri + auth_provider_x509_cert_url: http://example.com/provider_cert_url + client_x509_cert_url: http://example.com/client_cert_url rootdirectory: /gcs/object/name/prefix s3: accesskey: awsaccesskey diff --git a/registry/storage/driver/base/regulator.go b/registry/storage/driver/base/regulator.go index 97a30ae4d..9c5e6cc41 100644 --- a/registry/storage/driver/base/regulator.go +++ b/registry/storage/driver/base/regulator.go @@ -2,7 +2,10 @@ package base import ( "context" + "fmt" "io" + "reflect" + "strconv" "sync" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -15,6 +18,46 @@ type regulator struct { available uint64 } +// GetLimitFromParameter takes an interface type as decoded from the YAML +// configuration and returns a uint64 representing the maximum number of +// concurrent calls given a minimum limit and default. 
+// +// If the parameter supplied is of an invalid type this returns an error. +func GetLimitFromParameter(param interface{}, min, def uint64) (uint64, error) { + limit := def + + switch v := param.(type) { + case string: + var err error + if limit, err = strconv.ParseUint(v, 0, 64); err != nil { + return limit, fmt.Errorf("parameter must be an integer, '%v' invalid", param) + } + case uint64: + limit = v + case int, int32, int64: + val := reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Int() + // if param is negative casting to uint64 will wrap around and + // give you the hugest thread limit ever. Let's be sensible, here + if val > 0 { + limit = uint64(val) + } else { + limit = min + } + case uint, uint32: + limit = reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Uint() + case nil: + // use the default + default: + return 0, fmt.Errorf("invalid value '%#v'", param) + } + + if limit < min { + return min, nil + } + + return limit, nil +} + // NewRegulator wraps the given driver and is used to regulate concurrent calls // to the given storage driver to a maximum of the given limit. 
This is useful // for storage drivers that would otherwise create an unbounded number of OS diff --git a/registry/storage/driver/base/regulator_test.go b/registry/storage/driver/base/regulator_test.go index e4c0ad586..e30c6a75c 100644 --- a/registry/storage/driver/base/regulator_test.go +++ b/registry/storage/driver/base/regulator_test.go @@ -1,6 +1,7 @@ package base import ( + "fmt" "sync" "testing" "time" @@ -65,3 +66,33 @@ func TestRegulatorEnterExit(t *testing.T) { } } } + +func TestGetLimitFromParameter(t *testing.T) { + tests := []struct { + Input interface{} + Expected uint64 + Min uint64 + Default uint64 + Err error + }{ + {"foo", 0, 5, 5, fmt.Errorf("parameter must be an integer, 'foo' invalid")}, + {"50", 50, 5, 5, nil}, + {"5", 25, 25, 50, nil}, // lower than Min returns Min + {nil, 50, 25, 50, nil}, // nil returns default + {812, 812, 25, 50, nil}, + } + + for _, item := range tests { + t.Run(fmt.Sprint(item.Input), func(t *testing.T) { + actual, err := GetLimitFromParameter(item.Input, item.Min, item.Default) + + if err != nil && item.Err != nil && err.Error() != item.Err.Error() { + t.Fatalf("GetLimitFromParameter error, expected %#v got %#v", item.Err, err) + } + + if actual != item.Expected { + t.Fatalf("GetLimitFromParameter result error, expected %d got %d", item.Expected, actual) + } + }) + } +} diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 9036536b9..8fc9d1ca0 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -9,8 +9,6 @@ import ( "io/ioutil" "os" "path" - "reflect" - "strconv" "time" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -85,33 +83,9 @@ func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, e rootDirectory = fmt.Sprint(rootDir) } - // Get maximum number of threads for blocking filesystem operations, - // if specified - threads := parameters["maxthreads"] - 
switch v := threads.(type) { - case string: - if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil { - return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads) - } - case uint64: - maxThreads = v - case int, int32, int64: - val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int() - // If threads is negative casting to uint64 will wrap around and - // give you the hugest thread limit ever. Let's be sensible, here - if val > 0 { - maxThreads = uint64(val) - } - case uint, uint32: - maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() - case nil: - // do nothing - default: - return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads) - } - - if maxThreads < minThreads { - maxThreads = minThreads + maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads) + if err != nil { + return nil, fmt.Errorf("maxthreads config error: %s", err.Error()) } } diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index d129bee77..86dc87f14 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -17,6 +17,7 @@ package gcs import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -49,6 +50,8 @@ const ( uploadSessionContentType = "application/x-docker-upload-session" minChunkSize = 256 * 1024 defaultChunkSize = 20 * minChunkSize + defaultMaxConcurrency = 50 + minConcurrency = 25 maxTries = 5 ) @@ -64,6 +67,12 @@ type driverParameters struct { client *http.Client rootDirectory string chunkSize int + + // maxConcurrency limits the number of concurrent driver operations + // to GCS, which ultimately increases reliability of many simultaneous + // pushes by ensuring we aren't DoSing our own server with many + // connections. 
+ maxConcurrency uint64 } func init() { @@ -89,6 +98,16 @@ type driver struct { chunkSize int } +// Wrapper wraps `driver` with a throttler, ensuring that no more than N +// GCS actions can occur concurrently. The default limit is 50. +type Wrapper struct { + baseEmbed +} + +type baseEmbed struct { + base.Base +} + // FromParameters constructs a new Driver with a given parameters map // Required parameters: // - bucket @@ -140,6 +159,31 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri return nil, err } ts = jwtConf.TokenSource(context.Background()) + } else if credentials, ok := parameters["credentials"]; ok { + credentialMap, ok := credentials.(map[interface{}]interface{}) + if !ok { + return nil, fmt.Errorf("The credentials were not specified in the correct format") + } + + stringMap := map[string]interface{}{} + for k, v := range credentialMap { + key, ok := k.(string) + if !ok { + return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k)) + } + stringMap[key] = v + } + + data, err := json.Marshal(stringMap) + if err != nil { + return nil, fmt.Errorf("Failed to marshal gcs credentials to json") + } + + jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl) + if err != nil { + return nil, err + } + ts = jwtConf.TokenSource(context.Background()) } else { var err error ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl) @@ -148,13 +192,19 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri } } + maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency) + if err != nil { + return nil, fmt.Errorf("maxconcurrency config error: %s", err) + } + params := driverParameters{ - bucket: fmt.Sprint(bucket), - rootDirectory: fmt.Sprint(rootDirectory), - email: jwtConf.Email, - privateKey: jwtConf.PrivateKey, - client: oauth2.NewClient(context.Background(), ts), - chunkSize: 
chunkSize, + bucket: fmt.Sprint(bucket), + rootDirectory: fmt.Sprint(rootDirectory), + email: jwtConf.Email, + privateKey: jwtConf.PrivateKey, + client: oauth2.NewClient(context.Background(), ts), + chunkSize: chunkSize, + maxConcurrency: maxConcurrency, } return New(params) @@ -178,8 +228,12 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) { chunkSize: params.chunkSize, } - return &base.Base{ - StorageDriver: d, + return &Wrapper{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: base.NewRegulator(d, params.maxConcurrency), + }, + }, }, nil } @@ -864,7 +918,7 @@ func (d *driver) context(context context.Context) context.Context { } func (d *driver) pathToKey(path string) string { - return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/") + return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")) } func (d *driver) pathToDirKey(path string) string {