diff --git a/registry/storage/driver/base/regulator.go b/registry/storage/driver/base/regulator.go index 97a30ae4..9c5e6cc4 100644 --- a/registry/storage/driver/base/regulator.go +++ b/registry/storage/driver/base/regulator.go @@ -2,7 +2,10 @@ package base import ( "context" + "fmt" "io" + "reflect" + "strconv" "sync" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -15,6 +18,46 @@ type regulator struct { available uint64 } +// GetLimitFromParameter takes an interface type as decoded from the YAML +// configuration and returns a uint64 representing the maximum number of +// concurrent calls given a minimum limit and default. +// +// If the parameter supplied is of an invalid type this returns an error. +func GetLimitFromParameter(param interface{}, min, def uint64) (uint64, error) { + limit := def + + switch v := param.(type) { + case string: + var err error + if limit, err = strconv.ParseUint(v, 0, 64); err != nil { + return limit, fmt.Errorf("parameter must be an integer, '%v' invalid", param) + } + case uint64: + limit = v + case int, int32, int64: + val := reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Int() + // if param is negative casting to uint64 will wrap around and + // give you the hugest thread limit ever. Let's be sensible, here + if val > 0 { + limit = uint64(val) + } else { + limit = min + } + case uint, uint32: + limit = reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Uint() + case nil: + // use the default + default: + return 0, fmt.Errorf("invalid value '%#v'", param) + } + + if limit < min { + return min, nil + } + + return limit, nil +} + // NewRegulator wraps the given driver and is used to regulate concurrent calls // to the given storage driver to a maximum of the given limit. This is useful // for storage drivers that would otherwise create an unbounded number of OS diff --git a/registry/storage/driver/base/regulator_test.go b/registry/storage/driver/base/regulator_test.go index e4c0ad58..e30c6a75 100644 --- a/registry/storage/driver/base/regulator_test.go +++ b/registry/storage/driver/base/regulator_test.go @@ -1,6 +1,7 @@ package base import ( + "fmt" "sync" "testing" "time" @@ -65,3 +66,33 @@ func TestRegulatorEnterExit(t *testing.T) { } } } + +func TestGetLimitFromParameter(t *testing.T) { + tests := []struct { + Input interface{} + Expected uint64 + Min uint64 + Default uint64 + Err error + }{ + {"foo", 0, 5, 5, fmt.Errorf("parameter must be an integer, 'foo' invalid")}, + {"50", 50, 5, 5, nil}, + {"5", 25, 25, 50, nil}, // lower than Min returns Min + {nil, 50, 25, 50, nil}, // nil returns default + {812, 812, 25, 50, nil}, + } + + for _, item := range tests { + t.Run(fmt.Sprint(item.Input), func(t *testing.T) { + actual, err := GetLimitFromParameter(item.Input, item.Min, item.Default) + + if err != nil && item.Err != nil && err.Error() != item.Err.Error() { + t.Fatalf("GetLimitFromParameter error, expected %#v got %#v", item.Err, err) + } + + if actual != item.Expected { + t.Fatalf("GetLimitFromParameter result error, expected %d got %d", item.Expected, actual) + } + }) + } +} diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 9036536b..8fc9d1ca 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -9,8 +9,6 @@ import ( "io/ioutil" "os" "path" - "reflect" - "strconv" "time" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -85,33 +83,9 @@ func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, e rootDirectory = fmt.Sprint(rootDir) } - // Get maximum number of threads for blocking filesystem operations, - // if specified - threads := parameters["maxthreads"] - switch v := threads.(type) { - case string: - if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil { - return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads) - } - case uint64: - maxThreads = v - case int, int32, int64: - val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int() - // If threads is negative casting to uint64 will wrap around and - // give you the hugest thread limit ever. Let's be sensible, here - if val > 0 { - maxThreads = uint64(val) - } - case uint, uint32: - maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() - case nil: - // do nothing - default: - return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads) - } - - if maxThreads < minThreads { - maxThreads = minThreads + maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads) + if err != nil { + return nil, fmt.Errorf("maxthreads config error: %s", err.Error()) } } diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index a1dbd195..b6784487 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -9,8 +9,6 @@ // // Note that the contents of incomplete uploads are not accessible even though // Stat returns their length -// -// +build include_gcs package gcs @@ -50,6 +48,8 @@ const ( uploadSessionContentType = "application/x-docker-upload-session" minChunkSize = 256 * 1024 defaultChunkSize = 20 * minChunkSize + defaultMaxConcurrency = 50 + minConcurrency = 25 maxTries = 5 ) @@ -65,6 +65,12 @@ type driverParameters struct { client *http.Client rootDirectory string chunkSize int + + // maxConcurrency limits the number of concurrent driver operations + // to GCS, which ultimately increases reliability of many simultaneous + // pushes by ensuring we aren't DoSing our own server with many + // connections. + maxConcurrency uint64 } func init() { @@ -90,6 +96,16 @@ type driver struct { chunkSize int } +// Wrapper wraps `driver` with a throttler, ensuring that no more than N +// GCS actions can occur concurrently. The default limit is 75. +type Wrapper struct { + baseEmbed +} + +type baseEmbed struct { + base.Base +} + // FromParameters constructs a new Driver with a given parameters map // Required parameters: // - bucket @@ -174,13 +190,19 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri } } + maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency) + if err != nil { + return nil, fmt.Errorf("maxconcurrency config error: %s", err) + } + params := driverParameters{ - bucket: fmt.Sprint(bucket), - rootDirectory: fmt.Sprint(rootDirectory), - email: jwtConf.Email, - privateKey: jwtConf.PrivateKey, - client: oauth2.NewClient(context.Background(), ts), - chunkSize: chunkSize, + bucket: fmt.Sprint(bucket), + rootDirectory: fmt.Sprint(rootDirectory), + email: jwtConf.Email, + privateKey: jwtConf.PrivateKey, + client: oauth2.NewClient(context.Background(), ts), + chunkSize: chunkSize, + maxConcurrency: maxConcurrency, } return New(params) @@ -204,8 +226,12 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) { chunkSize: params.chunkSize, } - return &base.Base{ - StorageDriver: d, + return &Wrapper{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: base.NewRegulator(d, params.maxConcurrency), + }, + }, }, nil } @@ -890,7 +916,7 @@ func (d *driver) context(context context.Context) context.Context { } func (d *driver) pathToKey(path string) string { - return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/") + return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")) } func (d *driver) pathToDirKey(path string) string {