Merge pull request #1695 from tonyhb/add-regulator-to-filesystem

Add regulator to filesystem
This commit is contained in:
Richard Scothern 2016-05-04 10:05:51 -07:00
commit c8592da977
4 changed files with 325 additions and 15 deletions

View file

@ -132,8 +132,15 @@ func makeTestEnv(t *testing.T, name string) *testEnv {
t.Fatalf("unable to create tempdir: %s", err)
}
localDriver, err := filesystem.FromParameters(map[string]interface{}{
"rootdirectory": truthDir,
})
if err != nil {
t.Fatalf("unable to create filesystem driver: %s", err)
}
// todo: create a tempfile area here
localRegistry, err := storage.NewRegistry(ctx, filesystem.New(truthDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
@ -142,7 +149,14 @@ func makeTestEnv(t *testing.T, name string) *testEnv {
t.Fatalf("unexpected error getting repo: %v", err)
}
truthRegistry, err := storage.NewRegistry(ctx, filesystem.New(cacheDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
cacheDriver, err := filesystem.FromParameters(map[string]interface{}{
"rootdirectory": cacheDir,
})
if err != nil {
t.Fatalf("unable to create filesystem driver: %s", err)
}
truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
if err != nil {
t.Fatalf("error creating registry: %v", err)
}

View file

@ -0,0 +1,145 @@
package base
import (
"io"
"sync"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
type regulator struct {
storagedriver.StorageDriver
*sync.Cond
available uint64
}
// NewRegulator wraps the given driver and is used to regulate concurrent calls
// to the given storage driver to a maximum of the given limit. This is useful
// for storage drivers that would otherwise create an unbounded number of OS
// threads if allowed to be called unregulated.
func NewRegulator(driver storagedriver.StorageDriver, limit uint64) storagedriver.StorageDriver {
return &regulator{
StorageDriver: driver,
Cond: sync.NewCond(&sync.Mutex{}),
available: limit,
}
}
func (r *regulator) enter() {
r.L.Lock()
for r.available == 0 {
r.Wait()
}
r.available--
r.L.Unlock()
}
func (r *regulator) exit() {
r.L.Lock()
// We only need to signal to a waiting FS operation if we're already at the
// limit of threads used
if r.available == 0 {
r.Signal()
}
r.available++
r.L.Unlock()
}
// Name returns the human-readable "name" of the driver, useful in error
// messages and logging. By convention, this will just be the registration
// name, but drivers may provide other information here.
func (r *regulator) Name() string {
r.enter()
defer r.exit()
return r.StorageDriver.Name()
}
// GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects.
func (r *regulator) GetContent(ctx context.Context, path string) ([]byte, error) {
r.enter()
defer r.exit()
return r.StorageDriver.GetContent(ctx, path)
}
// PutContent stores the []byte content at a location designated by "path".
// This should primarily be used for small objects.
func (r *regulator) PutContent(ctx context.Context, path string, content []byte) error {
r.enter()
defer r.exit()
return r.StorageDriver.PutContent(ctx, path, content)
}
// Reader retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset.
func (r *regulator) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
r.enter()
defer r.exit()
return r.StorageDriver.Reader(ctx, path, offset)
}
// Writer stores the contents of the provided io.ReadCloser at a
// location designated by the given path.
// May be used to resume writing a stream by providing a nonzero offset.
// The offset must be no larger than the CurrentSize for this path.
func (r *regulator) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
r.enter()
defer r.exit()
return r.StorageDriver.Writer(ctx, path, append)
}
// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
func (r *regulator) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
r.enter()
defer r.exit()
return r.StorageDriver.Stat(ctx, path)
}
// List returns a list of the objects that are direct descendants of the
//given path.
func (r *regulator) List(ctx context.Context, path string) ([]string, error) {
r.enter()
defer r.exit()
return r.StorageDriver.List(ctx, path)
}
// Move moves an object stored at sourcePath to destPath, removing the
// original object.
// Note: This may be no more efficient than a copy followed by a delete for
// many implementations.
func (r *regulator) Move(ctx context.Context, sourcePath string, destPath string) error {
r.enter()
defer r.exit()
return r.StorageDriver.Move(ctx, sourcePath, destPath)
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (r *regulator) Delete(ctx context.Context, path string) error {
r.enter()
defer r.exit()
return r.StorageDriver.Delete(ctx, path)
}
// URLFor returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
// May return an ErrUnsupportedMethod in certain StorageDriver
// implementations.
func (r *regulator) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
r.enter()
defer r.exit()
return r.StorageDriver.URLFor(ctx, path, options)
}

View file

@ -8,6 +8,8 @@ import (
"io/ioutil"
"os"
"path"
"reflect"
"strconv"
"time"
"github.com/docker/distribution/context"
@ -16,8 +18,23 @@ import (
"github.com/docker/distribution/registry/storage/driver/factory"
)
const driverName = "filesystem"
const defaultRootDirectory = "/var/lib/registry"
const (
driverName = "filesystem"
defaultRootDirectory = "/var/lib/registry"
defaultMaxThreads = uint64(100)
// minThreads is the minimum value for the maxthreads configuration
// parameter. If the driver's parameters are less than this we set
// the parameters to minThreads
minThreads = uint64(25)
)
// DriverParameters represents all configuration options available for the
// filesystem driver
type DriverParameters struct {
RootDirectory string
MaxThreads uint64
}
func init() {
factory.Register(driverName, &filesystemDriverFactory{})
@ -27,7 +44,7 @@ func init() {
type filesystemDriverFactory struct{}
func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters), nil
return FromParameters(parameters)
}
type driver struct {
@ -47,25 +64,72 @@ type Driver struct {
// FromParameters constructs a new Driver with a given parameters map
// Optional Parameters:
// - rootdirectory
func FromParameters(parameters map[string]interface{}) *Driver {
var rootDirectory = defaultRootDirectory
// - maxthreads
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
params, err := fromParametersImpl(parameters)
if err != nil || params == nil {
return nil, err
}
return New(*params), nil
}
func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
var (
err error
maxThreads = defaultMaxThreads
rootDirectory = defaultRootDirectory
)
if parameters != nil {
rootDir, ok := parameters["rootdirectory"]
if ok {
if rootDir, ok := parameters["rootdirectory"]; ok {
rootDirectory = fmt.Sprint(rootDir)
}
// Get maximum number of threads for blocking filesystem operations,
// if specified
threads := parameters["maxthreads"]
switch v := threads.(type) {
case string:
if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil {
return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads)
}
case uint64:
maxThreads = v
case int, int32, int64:
val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()
// If threads is negative casting to uint64 will wrap around and
// give you the hugest thread limit ever. Let's be sensible, here
if val > 0 {
maxThreads = uint64(val)
}
case uint, uint32:
maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint()
case nil:
// do nothing
default:
return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads)
}
if maxThreads < minThreads {
maxThreads = minThreads
}
}
return New(rootDirectory)
params := &DriverParameters{
RootDirectory: rootDirectory,
MaxThreads: maxThreads,
}
return params, nil
}
// New constructs a new Driver with a given rootDirectory
func New(rootDirectory string) *Driver {
func New(params DriverParameters) *Driver {
fsDriver := &driver{rootDirectory: params.RootDirectory}
return &Driver{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: &driver{
rootDirectory: rootDirectory,
},
StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
},
},
}

View file

@ -3,6 +3,7 @@ package filesystem
import (
"io/ioutil"
"os"
"reflect"
"testing"
storagedriver "github.com/docker/distribution/registry/storage/driver"
@ -20,7 +21,93 @@ func init() {
}
defer os.Remove(root)
driver, err := FromParameters(map[string]interface{}{
"rootdirectory": root,
})
if err != nil {
panic(err)
}
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return New(root), nil
return driver, nil
}, testsuites.NeverSkip)
}
func TestFromParametersImpl(t *testing.T) {
tests := []struct {
params map[string]interface{} // techincally the yaml can contain anything
expected DriverParameters
pass bool
}{
// check we use default threads and root dirs
{
params: map[string]interface{}{},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: defaultMaxThreads,
},
pass: true,
},
// Testing initiation with a string maxThreads which can't be parsed
{
params: map[string]interface{}{
"maxthreads": "fail",
},
expected: DriverParameters{},
pass: false,
},
{
params: map[string]interface{}{
"maxthreads": "100",
},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: uint64(100),
},
pass: true,
},
{
params: map[string]interface{}{
"maxthreads": 100,
},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: uint64(100),
},
pass: true,
},
// check that we use minimum thread counts
{
params: map[string]interface{}{
"maxthreads": 1,
},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: minThreads,
},
pass: true,
},
}
for _, item := range tests {
params, err := fromParametersImpl(item.params)
if !item.pass {
// We only need to assert that expected failures have an error
if err == nil {
t.Fatalf("expected error configuring filesystem driver with invalid param: %+v", item.params)
}
continue
}
if err != nil {
t.Fatalf("unexpected error creating filesystem driver: %s", err)
}
// Note that we get a pointer to params back
if !reflect.DeepEqual(*params, item.expected) {
t.Fatalf("unexpected params from filesystem driver. expected %+v, got %+v", item.expected, params)
}
}
}