diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index b93b5343..967dcd3d 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -132,8 +132,15 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unable to create tempdir: %s", err) } + localDriver, err := filesystem.FromParameters(map[string]interface{}{ + "rootdirectory": truthDir, + }) + if err != nil { + t.Fatalf("unable to create filesystem driver: %s", err) + } + // todo: create a tempfile area here - localRegistry, err := storage.NewRegistry(ctx, filesystem.New(truthDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) + localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -142,7 +149,14 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unexpected error getting repo: %v", err) } - truthRegistry, err := storage.NewRegistry(ctx, filesystem.New(cacheDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) + cacheDriver, err := filesystem.FromParameters(map[string]interface{}{ + "rootdirectory": cacheDir, + }) + if err != nil { + t.Fatalf("unable to create filesystem driver: %s", err) + } + + truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/storage/driver/base/regulator.go b/registry/storage/driver/base/regulator.go index 21ddfe57..185160a4 100644 --- a/registry/storage/driver/base/regulator.go +++ b/registry/storage/driver/base/regulator.go @@ -10,46 +10,41 @@ import ( type regulator struct { storagedriver.StorageDriver - sync.Cond + *sync.Cond - available uint + available uint64 } // NewRegulator wraps the given driver and is used to regulate concurrent calls // to the given storage driver to a maximum of the given limit. This is useful // for storage drivers that would otherwise create an unbounded number of OS // threads if allowed to be called unregulated. -func NewRegulator(driver storagedriver.StorageDriver, limit uint) storagedriver.StorageDriver { +func NewRegulator(driver storagedriver.StorageDriver, limit uint64) storagedriver.StorageDriver { return ®ulator{ StorageDriver: driver, - Cond: sync.Cond{ - L: &sync.Mutex{}, - }, - available: limit, + Cond: sync.NewCond(&sync.Mutex{}), + available: limit, } } -func (r *regulator) condition() bool { - return r.available > 0 -} - func (r *regulator) enter() { r.L.Lock() - defer r.L.Unlock() - - for !r.condition() { + for r.available == 0 { r.Wait() } - r.available-- + r.L.Unlock() } func (r *regulator) exit() { r.L.Lock() - defer r.Signal() - defer r.L.Unlock() - + // We only need to signal to a waiting FS operation if we're already at the + // limit of threads used + if r.available == 0 { + r.Signal() + } r.available++ + r.L.Unlock() } // Name returns the human-readable "name" of the driver, useful in error @@ -80,25 +75,25 @@ func (r *regulator) PutContent(ctx context.Context, path string, content []byte) return r.StorageDriver.PutContent(ctx, path, content) } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" +// Reader retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. -func (r *regulator) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (r *regulator) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { r.enter() defer r.exit() - return r.StorageDriver.ReadStream(ctx, path, offset) + return r.StorageDriver.Reader(ctx, path, offset) } -// WriteStream stores the contents of the provided io.ReadCloser at a +// Writer stores the contents of the provided io.ReadCloser at a // location designated by the given path. // May be used to resume writing a stream by providing a nonzero offset. // The offset must be no larger than the CurrentSize for this path. -func (r *regulator) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) { +func (r *regulator) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { r.enter() defer r.exit() - return r.StorageDriver.WriteStream(ctx, path, offset, reader) + return r.StorageDriver.Writer(ctx, path, append) } // Stat retrieves the FileInfo for the given path, including the current diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index e22e9809..1a897261 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -8,6 +8,8 @@ import ( "io/ioutil" "os" "path" + "reflect" + "strconv" "time" "github.com/docker/distribution/context" @@ -16,8 +18,23 @@ import ( "github.com/docker/distribution/registry/storage/driver/factory" ) -const driverName = "filesystem" -const defaultRootDirectory = "/var/lib/registry" +const ( + driverName = "filesystem" + defaultRootDirectory = "/var/lib/registry" + defaultMaxThreads = uint64(100) + + // minThreads is the minimum value for the maxthreads configuration + // parameter. If the driver's parameters are less than this we set + // the parameters to minThreads + minThreads = uint64(25) +) + +// DriverParameters represents all configuration options available for the +// filesystem driver +type DriverParameters struct { + RootDirectory string + MaxThreads uint64 +} func init() { factory.Register(driverName, &filesystemDriverFactory{}) @@ -27,7 +44,7 @@ func init() { type filesystemDriverFactory struct{} func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { - return FromParameters(parameters), nil + return FromParameters(parameters) } type driver struct { @@ -47,25 +64,67 @@ type Driver struct { // FromParameters constructs a new Driver with a given parameters map // Optional Parameters: // - rootdirectory -func FromParameters(parameters map[string]interface{}) *Driver { - var rootDirectory = defaultRootDirectory +// - maxthreads +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + params, err := fromParametersImpl(parameters) + if err != nil || params == nil { + return nil, err + } + return New(*params), nil +} + +func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) { + var ( + err error + maxThreads = defaultMaxThreads + rootDirectory = defaultRootDirectory + ) + if parameters != nil { - rootDir, ok := parameters["rootdirectory"] - if ok { + if rootDir, ok := parameters["rootdirectory"]; ok { rootDirectory = fmt.Sprint(rootDir) } + + // Get maximum number of threads for blocking filesystem operations, + // if specified + threads := parameters["maxthreads"] + switch v := threads.(type) { + case string: + if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil { + return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads) + } + case uint64: + maxThreads = v + case int, int32, int64: + maxThreads = uint64(reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()) + case uint, uint32: + maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() + case nil: + // do nothing + default: + return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads) + } + + if maxThreads < minThreads { + maxThreads = minThreads + } } - return New(rootDirectory) + + params := &DriverParameters{ + RootDirectory: rootDirectory, + MaxThreads: maxThreads, + } + return params, nil } // New constructs a new Driver with a given rootDirectory -func New(rootDirectory string) *Driver { - fsDriver := &driver{rootDirectory: rootDirectory} +func New(params DriverParameters) *Driver { + fsDriver := &driver{rootDirectory: params.RootDirectory} return &Driver{ baseEmbed: baseEmbed{ Base: base.Base{ - StorageDriver: base.NewRegulator(fsDriver, 100), + StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads), }, }, } diff --git a/registry/storage/driver/filesystem/driver_test.go b/registry/storage/driver/filesystem/driver_test.go index 8b48b431..3be85923 100644 --- a/registry/storage/driver/filesystem/driver_test.go +++ b/registry/storage/driver/filesystem/driver_test.go @@ -3,6 +3,7 @@ package filesystem import ( "io/ioutil" "os" + "reflect" "testing" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -20,7 +21,93 @@ func init() { } defer os.Remove(root) + driver, err := FromParameters(map[string]interface{}{ + "rootdirectory": root, + }) + if err != nil { + panic(err) + } + testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { - return New(root), nil + return driver, nil }, testsuites.NeverSkip) } + +func TestFromParametersImpl(t *testing.T) { + + tests := []struct { + params map[string]interface{} // techincally the yaml can contain anything + expected DriverParameters + pass bool + }{ + // check we use default threads and root dirs + { + params: map[string]interface{}{}, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: defaultMaxThreads, + }, + pass: true, + }, + // Testing initiation with a string maxThreads which can't be parsed + { + params: map[string]interface{}{ + "maxthreads": "fail", + }, + expected: DriverParameters{}, + pass: false, + }, + { + params: map[string]interface{}{ + "maxthreads": "100", + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: uint64(100), + }, + pass: true, + }, + { + params: map[string]interface{}{ + "maxthreads": 100, + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: uint64(100), + }, + pass: true, + }, + // check that we use minimum thread counts + { + params: map[string]interface{}{ + "maxthreads": 1, + }, + expected: DriverParameters{ + RootDirectory: defaultRootDirectory, + MaxThreads: minThreads, + }, + pass: true, + }, + } + + for _, item := range tests { + params, err := fromParametersImpl(item.params) + + if !item.pass { + // We only need to assert that expected failures have an error + if err == nil { + t.Fatalf("expected error configuring filesystem driver with invalid param: %+v", item.params) + } + continue + } + + if err != nil { + t.Fatalf("unexpected error creating filesystem driver: %s", err) + } + // Note that we get a pointer to params back + if !reflect.DeepEqual(*params, item.expected) { + t.Fatalf("unexpected params from filesystem driver. expected %+v, got %+v", item.expected, params) + } + } + +}