From e4dd3359cc3171b98ae34edd8a0a1a19e6d61a6a Mon Sep 17 00:00:00 2001 From: Josh Hawn Date: Sat, 27 Feb 2016 15:37:07 -0800 Subject: [PATCH 1/4] Regulate filesystem driver to max of 100 calls It's easily possible for a flood of requests to trigger thousands of concurrent file accesses on the storage driver. Each file I/O call creates a new OS thread that is not reaped by the Golang runtime. By limiting it to only 100 at a time we can effectively bound the number of OS threads in use by the storage driver. Docker-DCO-1.1-Signed-off-by: Josh Hawn (github: jlhawn) Signed-off-by: Tony Holdstock-Brown --- registry/storage/driver/base/regulator.go | 150 +++++++++++++++++++ registry/storage/driver/filesystem/driver.go | 6 +- 2 files changed, 153 insertions(+), 3 deletions(-) create mode 100644 registry/storage/driver/base/regulator.go diff --git a/registry/storage/driver/base/regulator.go b/registry/storage/driver/base/regulator.go new file mode 100644 index 00000000..21ddfe57 --- /dev/null +++ b/registry/storage/driver/base/regulator.go @@ -0,0 +1,150 @@ +package base + +import ( + "io" + "sync" + + "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" +) + +type regulator struct { + storagedriver.StorageDriver + sync.Cond + + available uint +} + +// NewRegulator wraps the given driver and is used to regulate concurrent calls +// to the given storage driver to a maximum of the given limit. This is useful +// for storage drivers that would otherwise create an unbounded number of OS +// threads if allowed to be called unregulated. +func NewRegulator(driver storagedriver.StorageDriver, limit uint) storagedriver.StorageDriver { + return ®ulator{ + StorageDriver: driver, + Cond: sync.Cond{ + L: &sync.Mutex{}, + }, + available: limit, + } +} + +func (r *regulator) condition() bool { + return r.available > 0 +} + +func (r *regulator) enter() { + r.L.Lock() + defer r.L.Unlock() + + for !r.condition() { + r.Wait() + } + + r.available-- +} + +func (r *regulator) exit() { + r.L.Lock() + defer r.Signal() + defer r.L.Unlock() + + r.available++ +} + +// Name returns the human-readable "name" of the driver, useful in error +// messages and logging. By convention, this will just be the registration +// name, but drivers may provide other information here. +func (r *regulator) Name() string { + r.enter() + defer r.exit() + + return r.StorageDriver.Name() +} + +// GetContent retrieves the content stored at "path" as a []byte. +// This should primarily be used for small objects. +func (r *regulator) GetContent(ctx context.Context, path string) ([]byte, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.GetContent(ctx, path) +} + +// PutContent stores the []byte content at a location designated by "path". +// This should primarily be used for small objects. +func (r *regulator) PutContent(ctx context.Context, path string, content []byte) error { + r.enter() + defer r.exit() + + return r.StorageDriver.PutContent(ctx, path, content) +} + +// ReadStream retrieves an io.ReadCloser for the content stored at "path" +// with a given byte offset. +// May be used to resume reading a stream by providing a nonzero offset. +func (r *regulator) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.ReadStream(ctx, path, offset) +} + +// WriteStream stores the contents of the provided io.ReadCloser at a +// location designated by the given path. +// May be used to resume writing a stream by providing a nonzero offset. +// The offset must be no larger than the CurrentSize for this path. +func (r *regulator) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) { + r.enter() + defer r.exit() + + return r.StorageDriver.WriteStream(ctx, path, offset, reader) +} + +// Stat retrieves the FileInfo for the given path, including the current +// size in bytes and the creation time. +func (r *regulator) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.Stat(ctx, path) +} + +// List returns a list of the objects that are direct descendants of the +//given path. +func (r *regulator) List(ctx context.Context, path string) ([]string, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.List(ctx, path) +} + +// Move moves an object stored at sourcePath to destPath, removing the +// original object. +// Note: This may be no more efficient than a copy followed by a delete for +// many implementations. +func (r *regulator) Move(ctx context.Context, sourcePath string, destPath string) error { + r.enter() + defer r.exit() + + return r.StorageDriver.Move(ctx, sourcePath, destPath) +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (r *regulator) Delete(ctx context.Context, path string) error { + r.enter() + defer r.exit() + + return r.StorageDriver.Delete(ctx, path) +} + +// URLFor returns a URL which may be used to retrieve the content stored at +// the given path, possibly using the given options. +// May return an ErrUnsupportedMethod in certain StorageDriver +// implementations. +func (r *regulator) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.URLFor(ctx, path, options) +} diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 3bbdc637..e22e9809 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -60,12 +60,12 @@ func FromParameters(parameters map[string]interface{}) *Driver { // New constructs a new Driver with a given rootDirectory func New(rootDirectory string) *Driver { + fsDriver := &driver{rootDirectory: rootDirectory} + return &Driver{ baseEmbed: baseEmbed{ Base: base.Base{ - StorageDriver: &driver{ - rootDirectory: rootDirectory, - }, + StorageDriver: base.NewRegulator(fsDriver, 100), }, }, } From 33c448f14769c96d6702f9481831132b06e3cad6 Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Tue, 26 Apr 2016 14:36:38 -0700 Subject: [PATCH 2/4] Implement regulator in filesystem driver This commit refactors base.regulator into the 2.4 interfaces and adds a filesystem configuration option `maxthreads` to configure the regulator. By default `maxthreads` is set to 100. This means the FS driver is limited to 100 concurrent blocking file operations. Any subsequent operations will block in Go until previous filesystem operations complete. This ensures that the registry can never open thousands of simultaneous threads from os filesystem operations. Note that `maxthreads` can never be less than 25. Add test case covering parsable string maxthreads Signed-off-by: Tony Holdstock-Brown --- registry/proxy/proxyblobstore_test.go | 18 +++- registry/storage/driver/base/regulator.go | 43 ++++----- registry/storage/driver/filesystem/driver.go | 81 ++++++++++++++--- .../storage/driver/filesystem/driver_test.go | 89 ++++++++++++++++++- 4 files changed, 193 insertions(+), 38 deletions(-) diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index b93b5343..967dcd3d 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -132,8 +132,15 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unable to create tempdir: %s", err) } + localDriver, err := filesystem.FromParameters(map[string]interface{}{ + "rootdirectory": truthDir, + }) + if err != nil { + t.Fatalf("unable to create filesystem driver: %s", err) + } + // todo: create a tempfile area here - localRegistry, err := storage.NewRegistry(ctx, filesystem.New(truthDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) + localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -142,7 +149,14 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unexpected error getting repo: %v", err) } - truthRegistry, err := storage.NewRegistry(ctx, filesystem.New(cacheDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) + cacheDriver, err := filesystem.FromParameters(map[string]interface{}{ + "rootdirectory": cacheDir, + }) + if err != nil { + t.Fatalf("unable to create filesystem driver: %s", err) + } + + truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/storage/driver/base/regulator.go b/registry/storage/driver/base/regulator.go index 21ddfe57..185160a4 100644 --- a/registry/storage/driver/base/regulator.go +++ b/registry/storage/driver/base/regulator.go @@ -10,46 +10,41 @@ import ( type regulator struct { storagedriver.StorageDriver - sync.Cond + *sync.Cond - available uint + available uint64 } // NewRegulator wraps the given driver and is used to regulate concurrent calls // to the given storage driver to a maximum of the given limit. This is useful // for storage drivers that would otherwise create an unbounded number of OS // threads if allowed to be called unregulated. -func NewRegulator(driver storagedriver.StorageDriver, limit uint) storagedriver.StorageDriver { +func NewRegulator(driver storagedriver.StorageDriver, limit uint64) storagedriver.StorageDriver { return ®ulator{ StorageDriver: driver, - Cond: sync.Cond{ - L: &sync.Mutex{}, - }, - available: limit, + Cond: sync.NewCond(&sync.Mutex{}), + available: limit, } } -func (r *regulator) condition() bool { - return r.available > 0 -} - func (r *regulator) enter() { r.L.Lock() - defer r.L.Unlock() - - for !r.condition() { + for r.available == 0 { r.Wait() } - r.available-- + r.L.Unlock() } func (r *regulator) exit() { r.L.Lock() - defer r.Signal() - defer r.L.Unlock() - + // We only need to signal to a waiting FS operation if we're already at the + // limit of threads used + if r.available == 0 { + r.Signal() + } r.available++ + r.L.Unlock() } // Name returns the human-readable "name" of the driver, useful in error @@ -80,25 +75,25 @@ func (r *regulator) PutContent(ctx context.Context, path string, content []byte) return r.StorageDriver.PutContent(ctx, path, content) } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" +// Reader retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. -func (r *regulator) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (r *regulator) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { r.enter() defer r.exit() - return r.StorageDriver.ReadStream(ctx, path, offset) + return r.StorageDriver.Reader(ctx, path, offset) } -// WriteStream stores the contents of the provided io.ReadCloser at a +// Writer stores the contents of the provided io.ReadCloser at a // location designated by the given path. // May be used to resume writing a stream by providing a nonzero offset. // The offset must be no larger than the CurrentSize for this path. -func (r *regulator) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) { +func (r *regulator) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { r.enter() defer r.exit() - return r.StorageDriver.WriteStream(ctx, path, offset, reader) + return r.StorageDriver.Writer(ctx, path, append) } // Stat retrieves the FileInfo for the given path, including the current diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index e22e9809..1a897261 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -8,6 +8,8 @@ import ( "io/ioutil" "os" "path" + "reflect" + "strconv" "time" "github.com/docker/distribution/context" @@ -16,8 +18,23 @@ import ( "github.com/docker/distribution/registry/storage/driver/factory" ) -const driverName = "filesystem" -const defaultRootDirectory = "/var/lib/registry" +const ( + driverName = "filesystem" + defaultRootDirectory = "/var/lib/registry" + defaultMaxThreads = uint64(100) + + // minThreads is the minimum value for the maxthreads configuration + // parameter. If the driver's parameters are less than this we set + // the parameters to minThreads + minThreads = uint64(25) +) + +// DriverParameters represents all configuration options available for the +// filesystem driver +type DriverParameters struct { + RootDirectory string + MaxThreads uint64 +} func init() { factory.Register(driverName, &filesystemDriverFactory{}) @@ -27,7 +44,7 @@ func init() { type filesystemDriverFactory struct{} func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { - return FromParameters(parameters), nil + return FromParameters(parameters) } type driver struct { @@ -47,25 +64,67 @@ type Driver struct { // FromParameters constructs a new Driver with a given parameters map // Optional Parameters: // - rootdirectory -func FromParameters(parameters map[string]interface{}) *Driver { - var rootDirectory = defaultRootDirectory +// - maxthreads +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + params, err := fromParametersImpl(parameters) + if err != nil || params == nil { + return nil, err + } + return New(*params), nil +} + +func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) { + var ( + err error + maxThreads = defaultMaxThreads + rootDirectory = defaultRootDirectory + ) + if parameters != nil { - rootDir, ok := parameters["rootdirectory"] - if ok { + if rootDir, ok := parameters["rootdirectory"]; ok { rootDirectory = fmt.Sprint(rootDir) } + + // Get maximum number of threads for blocking filesystem operations, + // if specified + threads := parameters["maxthreads"] + switch v := threads.(type) { + case string: + if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil { + return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads) + } + case uint64: + maxThreads = v + case int, int32, int64: + maxThreads = uint64(reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()) + case uint, uint32: + maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() + case nil: + // do nothing + default: + return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads) + } + + if maxThreads < minThreads { + maxThreads = minThreads + } } - return New(rootDirectory) + + params := &DriverParameters{ + RootDirectory: rootDirectory, + MaxThreads: maxThreads, + } + return params, nil } // New constructs a new Driver with a given rootDirectory -func New(rootDirectory string) *Driver { - fsDriver := &driver{rootDirectory: rootDirectory} +func New(params DriverParameters) *Driver { + fsDriver := &driver{rootDirectory: params.RootDirectory} return &Driver{ baseEmbed: baseEmbed{ Base: base.Base{ - StorageDriver: base.NewRegulator(fsDriver, 100), + StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads), }, }, } diff --git a/registry/storage/driver/filesystem/driver_test.go b/registry/storage/driver/filesystem/driver_test.go index 8b48b431..3be85923 100644 --- a/registry/storage/driver/filesystem/driver_test.go +++ b/registry/storage/driver/filesystem/driver_test.go @@ -3,6 +3,7 @@ package filesystem import ( "io/ioutil" "os" + "reflect" "testing" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -20,7 +21,93 @@ func init() { } defer os.Remove(root) + driver, err := FromParameters(map[string]interface{}{ + "rootdirectory": root, + }) + if err != nil { + panic(err) + } + testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { - return New(root), nil + return driver, nil }, testsuites.NeverSkip) } + +func TestFromParametersImpl(t *testing.T) { + + tests := []struct { + params map[string]interface{} // techincally the yaml can contain anything + expected DriverParameters + pass bool + }{ + // check we use default threads and root dirs + { + params: map[string]interface{}{}, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: defaultMaxThreads, + }, + pass: true, + }, + // Testing initiation with a string maxThreads which can't be parsed + { + params: map[string]interface{}{ + "maxthreads": "fail", + }, + expected: DriverParameters{}, + pass: false, + }, + { + params: map[string]interface{}{ + "maxthreads": "100", + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: uint64(100), + }, + pass: true, + }, + { + params: map[string]interface{}{ + "maxthreads": 100, + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: uint64(100), + }, + pass: true, + }, + // check that we use minimum thread counts + { + params: map[string]interface{}{ + "maxthreads": 1, + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: minThreads, + }, + pass: true, + }, + } + + for _, item := range tests { + params, err := fromParametersImpl(item.params) + + if !item.pass { + // We only need to assert that expected failures have an error + if err == nil { + t.Fatalf("expected error configuring filesystem driver with invalid param: %+v", item.params) + } + continue + } + + if err != nil { + t.Fatalf("unexpected error creating filesystem driver: %s", err) + } + // Note that we get a pointer to params back + if !reflect.DeepEqual(*params, item.expected) { + t.Fatalf("unexpected params from filesystem driver. expected %+v, got %+v", item.expected, params) + } + } + +} From d0352a7448b17346ad67882cf3df3b1239481d7c Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Tue, 26 Apr 2016 15:20:40 -0700 Subject: [PATCH 3/4] Add documentation Signed-off-by: Tony Holdstock-Brown --- docs/configuration.md | 1 + docs/storage-drivers/filesystem.md | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index f9b89feb..7d3a73e3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -78,6 +78,7 @@ information about each option that appears later in this page. storage: filesystem: rootdirectory: /var/lib/registry + maxthreads: 100 azure: accountname: accountname accountkey: base64encodedaccountkey diff --git a/docs/storage-drivers/filesystem.md b/docs/storage-drivers/filesystem.md index 476edcf5..fea7ce4a 100644 --- a/docs/storage-drivers/filesystem.md +++ b/docs/storage-drivers/filesystem.md @@ -16,3 +16,7 @@ An implementation of the `storagedriver.StorageDriver` interface which uses the `rootdirectory`: (optional) The absolute path to a root directory tree in which to store all registry files. The registry stores all its data here so make sure there is adequate space available. Defaults to `/var/lib/registry`. +`maxthreads`: (optional) The maximum number of simultaneous blocking filesystem +operations permitted within the registry. Each operation spawns a new thread and +may cause thread exhaustion issues if many are done in parallel. Defaults to +`100`, and can be no lower than `25`. From c9c62380ffc1b72a6d1ea1e80cc2d806c750bd9a Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Tue, 3 May 2016 16:03:22 -0700 Subject: [PATCH 4/4] Don't wrap thead limits when using a negative int Signed-off-by: Tony Holdstock-Brown --- registry/storage/driver/filesystem/driver.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 1a897261..649e2bc2 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -96,7 +96,12 @@ func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, e case uint64: maxThreads = v case int, int32, int64: - maxThreads = uint64(reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()) + val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int() + // If threads is negative casting to uint64 will wrap around and + // give you the hugest thread limit ever. Let's be sensible, here + if val > 0 { + maxThreads = uint64(val) + } case uint, uint32: maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() case nil: