From 78238ef1a0762a1f9f742882e110741b2f83d656 Mon Sep 17 00:00:00 2001 From: Andrey Kostov Date: Mon, 8 Aug 2016 15:43:12 -0700 Subject: [PATCH 1/6] Add credentials argument for GCS driver Signed-off-by: Andrey Kostov --- registry/storage/driver/gcs/gcs.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index d129bee77..d1276c256 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -17,6 +17,7 @@ package gcs import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -140,6 +141,27 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri return nil, err } ts = jwtConf.TokenSource(context.Background()) + } else if credentials, ok := parameters["credentials"]; ok { + credentialMap, ok := credentials.(map[interface{}]interface{}) + if !ok { + return nil, fmt.Errorf("The credentials were not specified in the correct format") + } + + stringMap := map[string]interface{}{} + for k, v := range credentialMap { + key, ok := k.(string) + if !ok { + return nil, fmt.Errorf("One of the credential keys was not a string") + } + stringMap[key] = v + } + + data, err := json.Marshal(stringMap) + jwtConf, err := google.JWTConfigFromJSON(data, storage.ScopeFullControl) + if err != nil { + return nil, err + } + ts = jwtConf.TokenSource(context.Background()) } else { var err error ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl) From 3f9f073cefbd5e1fd6a4f0c669acea08a5bcf935 Mon Sep 17 00:00:00 2001 From: Andrey Kostov Date: Thu, 11 Aug 2016 14:41:23 -0700 Subject: [PATCH 2/6] Edit configuration.md to add gcs credentials option Signed-off-by: Andrey Kostov --- docs/configuration.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index df3447090..fe95d5942 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -100,6 +100,17 @@ storage: gcs: bucket: bucketname keyfile: /path/to/keyfile + credentials: + type: service_account + project_id: project_id_string + private_key_id: private_key_id_string + private_key: private_key_string + client_email: client@example.com + client_id: client_id_string + auth_uri: http://example.com/auth_uri + token_uri: http://example.com/token_uri + auth_provider_x509_cert_url: http://example.com/provider_cert_url + client_x509_cert_url: http://example.com/client_cert_url rootdirectory: /gcs/object/name/prefix chunksize: 5242880 s3: @@ -389,6 +400,17 @@ storage: gcs: bucket: bucketname keyfile: /path/to/keyfile + credentials: + type: service_account + project_id: project_id_string + private_key_id: private_key_id_string + private_key: private_key_string + client_email: client@example.com + client_id: client_id_string + auth_uri: http://example.com/auth_uri + token_uri: http://example.com/token_uri + auth_provider_x509_cert_url: http://example.com/provider_cert_url + client_x509_cert_url: http://example.com/client_cert_url rootdirectory: /gcs/object/name/prefix s3: accesskey: awsaccesskey From b424c3d870e096c2652c5d62ee350b0e44bd6210 Mon Sep 17 00:00:00 2001 From: Andrey Kostov Date: Mon, 15 Aug 2016 14:35:18 -0700 Subject: [PATCH 3/6] Better error handling for GCS credential argument addition Signed-off-by: Andrey Kostov --- registry/storage/driver/gcs/gcs.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index d1276c256..a1dbd1951 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -151,12 +151,16 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri for k, v := range credentialMap { key, ok := k.(string) if !ok { - return nil, fmt.Errorf("One of the credential keys was not a string") + return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k)) } stringMap[key] = v } data, err := json.Marshal(stringMap) + if err != nil { + return nil, fmt.Errorf("Failed to marshal gcs credentials to json") + } + jwtConf, err := google.JWTConfigFromJSON(data, storage.ScopeFullControl) if err != nil { return nil, err From f9187b257296ce68254be52f829c4da9a2e2ee3c Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Tue, 6 Dec 2016 15:50:39 -0800 Subject: [PATCH 4/6] Add regulator to GCS Signed-off-by: Huu Nguyen --- registry/storage/driver/base/regulator.go | 43 +++++++++++++++++ .../storage/driver/base/regulator_test.go | 31 ++++++++++++ registry/storage/driver/filesystem/driver.go | 32 ++----------- registry/storage/driver/gcs/gcs.go | 48 ++++++++++++++----- 4 files changed, 114 insertions(+), 40 deletions(-) diff --git a/registry/storage/driver/base/regulator.go b/registry/storage/driver/base/regulator.go index 97a30ae4d..9c5e6cc41 100644 --- a/registry/storage/driver/base/regulator.go +++ b/registry/storage/driver/base/regulator.go @@ -2,7 +2,10 @@ package base import ( "context" + "fmt" "io" + "reflect" + "strconv" "sync" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -15,6 +18,46 @@ type regulator struct { available uint64 } +// GetLimitFromParameter takes an interface type as decoded from the YAML +// configuration and returns a uint64 representing the maximum number of +// concurrent calls given a minimum limit and default. +// +// If the parameter supplied is of an invalid type this returns an error. +func GetLimitFromParameter(param interface{}, min, def uint64) (uint64, error) { + limit := def + + switch v := param.(type) { + case string: + var err error + if limit, err = strconv.ParseUint(v, 0, 64); err != nil { + return limit, fmt.Errorf("parameter must be an integer, '%v' invalid", param) + } + case uint64: + limit = v + case int, int32, int64: + val := reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Int() + // if param is negative casting to uint64 will wrap around and + // give you the hugest thread limit ever. Let's be sensible, here + if val > 0 { + limit = uint64(val) + } else { + limit = min + } + case uint, uint32: + limit = reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Uint() + case nil: + // use the default + default: + return 0, fmt.Errorf("invalid value '%#v'", param) + } + + if limit < min { + return min, nil + } + + return limit, nil +} + // NewRegulator wraps the given driver and is used to regulate concurrent calls // to the given storage driver to a maximum of the given limit. This is useful // for storage drivers that would otherwise create an unbounded number of OS diff --git a/registry/storage/driver/base/regulator_test.go b/registry/storage/driver/base/regulator_test.go index e4c0ad586..e30c6a75c 100644 --- a/registry/storage/driver/base/regulator_test.go +++ b/registry/storage/driver/base/regulator_test.go @@ -1,6 +1,7 @@ package base import ( + "fmt" "sync" "testing" "time" @@ -65,3 +66,33 @@ func TestRegulatorEnterExit(t *testing.T) { } } } + +func TestGetLimitFromParameter(t *testing.T) { + tests := []struct { + Input interface{} + Expected uint64 + Min uint64 + Default uint64 + Err error + }{ + {"foo", 0, 5, 5, fmt.Errorf("parameter must be an integer, 'foo' invalid")}, + {"50", 50, 5, 5, nil}, + {"5", 25, 25, 50, nil}, // lower than Min returns Min + {nil, 50, 25, 50, nil}, // nil returns default + {812, 812, 25, 50, nil}, + } + + for _, item := range tests { + t.Run(fmt.Sprint(item.Input), func(t *testing.T) { + actual, err := GetLimitFromParameter(item.Input, item.Min, item.Default) + + if err != nil && item.Err != nil && err.Error() != item.Err.Error() { + t.Fatalf("GetLimitFromParameter error, expected %#v got %#v", item.Err, err) + } + + if actual != item.Expected { + t.Fatalf("GetLimitFromParameter result error, expected %d got %d", item.Expected, actual) + } + }) + } +} diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 9036536b9..8fc9d1ca0 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -9,8 +9,6 @@ import ( "io/ioutil" "os" "path" - "reflect" - "strconv" "time" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -85,33 +83,9 @@ func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, e rootDirectory = fmt.Sprint(rootDir) } - // Get maximum number of threads for blocking filesystem operations, - // if specified - threads := parameters["maxthreads"] - switch v := threads.(type) { - case string: - if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil { - return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads) - } - case uint64: - maxThreads = v - case int, int32, int64: - val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int() - // If threads is negative casting to uint64 will wrap around and - // give you the hugest thread limit ever. Let's be sensible, here - if val > 0 { - maxThreads = uint64(val) - } - case uint, uint32: - maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() - case nil: - // do nothing - default: - return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads) - } - - if maxThreads < minThreads { - maxThreads = minThreads + maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads) + if err != nil { + return nil, fmt.Errorf("maxthreads config error: %s", err.Error()) } } diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index a1dbd1951..b67844879 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -9,8 +9,6 @@ // // Note that the contents of incomplete uploads are not accessible even though // Stat returns their length -// -// +build include_gcs package gcs @@ -50,6 +48,8 @@ const ( uploadSessionContentType = "application/x-docker-upload-session" minChunkSize = 256 * 1024 defaultChunkSize = 20 * minChunkSize + defaultMaxConcurrency = 50 + minConcurrency = 25 maxTries = 5 ) @@ -65,6 +65,12 @@ type driverParameters struct { client *http.Client rootDirectory string chunkSize int + + // maxConcurrency limits the number of concurrent driver operations + // to GCS, which ultimately increases reliability of many simultaneous + // pushes by ensuring we aren't DoSing our own server with many + // connections. + maxConcurrency uint64 } func init() { @@ -90,6 +96,16 @@ type driver struct { chunkSize int } +// Wrapper wraps `driver` with a throttler, ensuring that no more than N +// GCS actions can occur concurrently. The default limit is 75. +type Wrapper struct { + baseEmbed +} + +type baseEmbed struct { + base.Base +} + // FromParameters constructs a new Driver with a given parameters map // Required parameters: // - bucket @@ -174,13 +190,19 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri } } + maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency) + if err != nil { + return nil, fmt.Errorf("maxconcurrency config error: %s", err) + } + params := driverParameters{ - bucket: fmt.Sprint(bucket), - rootDirectory: fmt.Sprint(rootDirectory), - email: jwtConf.Email, - privateKey: jwtConf.PrivateKey, - client: oauth2.NewClient(context.Background(), ts), - chunkSize: chunkSize, + bucket: fmt.Sprint(bucket), + rootDirectory: fmt.Sprint(rootDirectory), + email: jwtConf.Email, + privateKey: jwtConf.PrivateKey, + client: oauth2.NewClient(context.Background(), ts), + chunkSize: chunkSize, + maxConcurrency: maxConcurrency, } return New(params) @@ -204,8 +226,12 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) { chunkSize: params.chunkSize, } - return &base.Base{ - StorageDriver: d, + return &Wrapper{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: base.NewRegulator(d, params.maxConcurrency), + }, + }, }, nil } @@ -890,7 +916,7 @@ func (d *driver) context(context context.Context) context.Context { } func (d *driver) pathToKey(path string) string { - return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/") + return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")) } func (d *driver) pathToDirKey(path string) string { From 69299d93d9b324ad2a7d9c9081f280f1e74ce74f Mon Sep 17 00:00:00 2001 From: Huu Nguyen Date: Mon, 7 May 2018 15:04:06 -0700 Subject: [PATCH 5/6] Use existing jwtConf instead of creating a scoped one Signed-off-by: Huu Nguyen --- registry/storage/driver/gcs/gcs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index b67844879..07f58afa4 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -177,7 +177,7 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri return nil, fmt.Errorf("Failed to marshal gcs credentials to json") } - jwtConf, err := google.JWTConfigFromJSON(data, storage.ScopeFullControl) + jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl) if err != nil { return nil, err } From 7a195dd5ca4b9b399da0b1b36441dfa12658e660 Mon Sep 17 00:00:00 2001 From: Huu Nguyen Date: Tue, 15 May 2018 11:20:09 -0700 Subject: [PATCH 6/6] Add back include_gcs build constraint Signed-off-by: Huu Nguyen --- registry/storage/driver/gcs/gcs.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index 07f58afa4..86dc87f14 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -9,6 +9,8 @@ // // Note that the contents of incomplete uploads are not accessible even though // Stat returns their length +// +// +build include_gcs package gcs