diff --git a/docs/configuration.md b/docs/configuration.md index 91d852316..6a1afb794 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -78,6 +78,7 @@ information about each option that appears later in this page. storage: filesystem: rootdirectory: /var/lib/registry + maxthreads: 100 azure: accountname: accountname accountkey: base64encodedaccountkey diff --git a/docs/storage-drivers/filesystem.md b/docs/storage-drivers/filesystem.md index 476edcf5a..fea7ce4a8 100644 --- a/docs/storage-drivers/filesystem.md +++ b/docs/storage-drivers/filesystem.md @@ -16,3 +16,7 @@ An implementation of the `storagedriver.StorageDriver` interface which uses the `rootdirectory`: (optional) The absolute path to a root directory tree in which to store all registry files. The registry stores all its data here so make sure there is adequate space available. Defaults to `/var/lib/registry`. +`maxthreads`: (optional) The maximum number of simultaneous blocking filesystem +operations permitted within the registry. Each operation spawns a new thread and +may cause thread exhaustion issues if many are done in parallel. Defaults to +`100`, and can be no lower than `25`. diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index b93b53433..967dcd3d2 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -132,8 +132,15 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unable to create tempdir: %s", err) } + localDriver, err := filesystem.FromParameters(map[string]interface{}{ + "rootdirectory": truthDir, + }) + if err != nil { + t.Fatalf("unable to create filesystem driver: %s", err) + } + // todo: create a tempfile area here - localRegistry, err := storage.NewRegistry(ctx, filesystem.New(truthDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) + localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -142,7 +149,14 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unexpected error getting repo: %v", err) } - truthRegistry, err := storage.NewRegistry(ctx, filesystem.New(cacheDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) + cacheDriver, err := filesystem.FromParameters(map[string]interface{}{ + "rootdirectory": cacheDir, + }) + if err != nil { + t.Fatalf("unable to create filesystem driver: %s", err) + } + + truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/storage/driver/base/regulator.go b/registry/storage/driver/base/regulator.go new file mode 100644 index 000000000..185160a4b --- /dev/null +++ b/registry/storage/driver/base/regulator.go @@ -0,0 +1,145 @@ +package base + +import ( + "io" + "sync" + + "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" +) + +type regulator struct { + storagedriver.StorageDriver + *sync.Cond + + available uint64 +} + +// NewRegulator wraps the given driver and is used to regulate concurrent calls +// to the given storage driver to a maximum of the given limit. This is useful +// for storage drivers that would otherwise create an unbounded number of OS +// threads if allowed to be called unregulated. +func NewRegulator(driver storagedriver.StorageDriver, limit uint64) storagedriver.StorageDriver { + return ®ulator{ + StorageDriver: driver, + Cond: sync.NewCond(&sync.Mutex{}), + available: limit, + } +} + +func (r *regulator) enter() { + r.L.Lock() + for r.available == 0 { + r.Wait() + } + r.available-- + r.L.Unlock() +} + +func (r *regulator) exit() { + r.L.Lock() + // We only need to signal to a waiting FS operation if we're already at the + // limit of threads used + if r.available == 0 { + r.Signal() + } + r.available++ + r.L.Unlock() +} + +// Name returns the human-readable "name" of the driver, useful in error +// messages and logging. By convention, this will just be the registration +// name, but drivers may provide other information here. +func (r *regulator) Name() string { + r.enter() + defer r.exit() + + return r.StorageDriver.Name() +} + +// GetContent retrieves the content stored at "path" as a []byte. +// This should primarily be used for small objects. +func (r *regulator) GetContent(ctx context.Context, path string) ([]byte, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.GetContent(ctx, path) +} + +// PutContent stores the []byte content at a location designated by "path". +// This should primarily be used for small objects. +func (r *regulator) PutContent(ctx context.Context, path string, content []byte) error { + r.enter() + defer r.exit() + + return r.StorageDriver.PutContent(ctx, path, content) +} + +// Reader retrieves an io.ReadCloser for the content stored at "path" +// with a given byte offset. +// May be used to resume reading a stream by providing a nonzero offset. +func (r *regulator) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.Reader(ctx, path, offset) +} + +// Writer stores the contents of the provided io.ReadCloser at a +// location designated by the given path. +// May be used to resume writing a stream by providing a nonzero offset. +// The offset must be no larger than the CurrentSize for this path. +func (r *regulator) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.Writer(ctx, path, append) +} + +// Stat retrieves the FileInfo for the given path, including the current +// size in bytes and the creation time. +func (r *regulator) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.Stat(ctx, path) +} + +// List returns a list of the objects that are direct descendants of the +//given path. +func (r *regulator) List(ctx context.Context, path string) ([]string, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.List(ctx, path) +} + +// Move moves an object stored at sourcePath to destPath, removing the +// original object. +// Note: This may be no more efficient than a copy followed by a delete for +// many implementations. +func (r *regulator) Move(ctx context.Context, sourcePath string, destPath string) error { + r.enter() + defer r.exit() + + return r.StorageDriver.Move(ctx, sourcePath, destPath) +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (r *regulator) Delete(ctx context.Context, path string) error { + r.enter() + defer r.exit() + + return r.StorageDriver.Delete(ctx, path) +} + +// URLFor returns a URL which may be used to retrieve the content stored at +// the given path, possibly using the given options. +// May return an ErrUnsupportedMethod in certain StorageDriver +// implementations. +func (r *regulator) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + r.enter() + defer r.exit() + + return r.StorageDriver.URLFor(ctx, path, options) +} diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 3bbdc6379..649e2bc23 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -8,6 +8,8 @@ import ( "io/ioutil" "os" "path" + "reflect" + "strconv" "time" "github.com/docker/distribution/context" @@ -16,8 +18,23 @@ import ( "github.com/docker/distribution/registry/storage/driver/factory" ) -const driverName = "filesystem" -const defaultRootDirectory = "/var/lib/registry" +const ( + driverName = "filesystem" + defaultRootDirectory = "/var/lib/registry" + defaultMaxThreads = uint64(100) + + // minThreads is the minimum value for the maxthreads configuration + // parameter. If the driver's parameters are less than this we set + // the parameters to minThreads + minThreads = uint64(25) +) + +// DriverParameters represents all configuration options available for the +// filesystem driver +type DriverParameters struct { + RootDirectory string + MaxThreads uint64 +} func init() { factory.Register(driverName, &filesystemDriverFactory{}) @@ -27,7 +44,7 @@ func init() { type filesystemDriverFactory struct{} func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { - return FromParameters(parameters), nil + return FromParameters(parameters) } type driver struct { @@ -47,25 +64,72 @@ type Driver struct { // FromParameters constructs a new Driver with a given parameters map // Optional Parameters: // - rootdirectory -func FromParameters(parameters map[string]interface{}) *Driver { - var rootDirectory = defaultRootDirectory +// - maxthreads +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + params, err := fromParametersImpl(parameters) + if err != nil || params == nil { + return nil, err + } + return New(*params), nil +} + +func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) { + var ( + err error + maxThreads = defaultMaxThreads + rootDirectory = defaultRootDirectory + ) + if parameters != nil { - rootDir, ok := parameters["rootdirectory"] - if ok { + if rootDir, ok := parameters["rootdirectory"]; ok { rootDirectory = fmt.Sprint(rootDir) } + + // Get maximum number of threads for blocking filesystem operations, + // if specified + threads := parameters["maxthreads"] + switch v := threads.(type) { + case string: + if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil { + return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads) + } + case uint64: + maxThreads = v + case int, int32, int64: + val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int() + // If threads is negative casting to uint64 will wrap around and + // give you the hugest thread limit ever. Let's be sensible, here + if val > 0 { + maxThreads = uint64(val) + } + case uint, uint32: + maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() + case nil: + // do nothing + default: + return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads) + } + + if maxThreads < minThreads { + maxThreads = minThreads + } } - return New(rootDirectory) + + params := &DriverParameters{ + RootDirectory: rootDirectory, + MaxThreads: maxThreads, + } + return params, nil } // New constructs a new Driver with a given rootDirectory -func New(rootDirectory string) *Driver { +func New(params DriverParameters) *Driver { + fsDriver := &driver{rootDirectory: params.RootDirectory} + return &Driver{ baseEmbed: baseEmbed{ Base: base.Base{ - StorageDriver: &driver{ - rootDirectory: rootDirectory, - }, + StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads), }, }, } diff --git a/registry/storage/driver/filesystem/driver_test.go b/registry/storage/driver/filesystem/driver_test.go index 8b48b4312..3be859239 100644 --- a/registry/storage/driver/filesystem/driver_test.go +++ b/registry/storage/driver/filesystem/driver_test.go @@ -3,6 +3,7 @@ package filesystem import ( "io/ioutil" "os" + "reflect" "testing" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -20,7 +21,93 @@ func init() { } defer os.Remove(root) + driver, err := FromParameters(map[string]interface{}{ + "rootdirectory": root, + }) + if err != nil { + panic(err) + } + testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { - return New(root), nil + return driver, nil }, testsuites.NeverSkip) } + +func TestFromParametersImpl(t *testing.T) { + + tests := []struct { + params map[string]interface{} // techincally the yaml can contain anything + expected DriverParameters + pass bool + }{ + // check we use default threads and root dirs + { + params: map[string]interface{}{}, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: defaultMaxThreads, + }, + pass: true, + }, + // Testing initiation with a string maxThreads which can't be parsed + { + params: map[string]interface{}{ + "maxthreads": "fail", + }, + expected: DriverParameters{}, + pass: false, + }, + { + params: map[string]interface{}{ + "maxthreads": "100", + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: uint64(100), + }, + pass: true, + }, + { + params: map[string]interface{}{ + "maxthreads": 100, + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: uint64(100), + }, + pass: true, + }, + // check that we use minimum thread counts + { + params: map[string]interface{}{ + "maxthreads": 1, + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: minThreads, + }, + pass: true, + }, + } + + for _, item := range tests { + params, err := fromParametersImpl(item.params) + + if !item.pass { + // We only need to assert that expected failures have an error + if err == nil { + t.Fatalf("expected error configuring filesystem driver with invalid param: %+v", item.params) + } + continue + } + + if err != nil { + t.Fatalf("unexpected error creating filesystem driver: %s", err) + } + // Note that we get a pointer to params back + if !reflect.DeepEqual(*params, item.expected) { + t.Fatalf("unexpected params from filesystem driver. expected %+v, got %+v", item.expected, params) + } + } + +}